• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6123805547954176

15 Jan 2026 04:40PM UTC coverage: 62.669% (-0.02%) from 62.685%
6123805547954176

push

CirrusCI

web-flow
Merge pull request #10413 from SomberNight/202601_mpp_status_htlcs_frozenset

lnutil: change ReceivedMPPStatus.htlcs to frozenset, i.e. immutable

12 of 22 new or added lines in 4 files covered. (54.55%)

5 existing lines in 4 files now uncovered.

23905 of 38145 relevant lines covered (62.67%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.15
/electrum/util.py
1
# Electrum - lightweight Bitcoin client
2
# Copyright (C) 2011 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import concurrent.futures
1✔
24
from dataclasses import dataclass
1✔
25
import logging
1✔
26
import os
1✔
27
import sys
1✔
28
import re
1✔
29
from collections import defaultdict, OrderedDict
1✔
30
from concurrent.futures.process import ProcessPoolExecutor
1✔
31
from typing import (
1✔
32
    NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any, Sequence, Dict, Generic, TypeVar, List, Iterable,
33
    Set, Awaitable
34
)
35
from types import MappingProxyType
1✔
36
from datetime import datetime, timezone, timedelta
1✔
37
import decimal
1✔
38
from decimal import Decimal
1✔
39
import threading
1✔
40
import hmac
1✔
41
import hashlib
1✔
42
import stat
1✔
43
import asyncio
1✔
44
import builtins
1✔
45
import json
1✔
46
import time
1✔
47
import ssl
1✔
48
import ipaddress
1✔
49
from ipaddress import IPv4Address, IPv6Address
1✔
50
import random
1✔
51
import secrets
1✔
52
import functools
1✔
53
from functools import partial
1✔
54
from abc import abstractmethod, ABC
1✔
55
import enum
1✔
56
from contextlib import nullcontext, suppress
1✔
57
import traceback
1✔
58
import inspect
1✔
59

60
import aiohttp
1✔
61
from aiohttp_socks import ProxyConnector, ProxyType
1✔
62
import aiorpcx
1✔
63
import certifi
1✔
64
import dns.asyncresolver
1✔
65

66
from .i18n import _
1✔
67
from .logging import get_logger, Logger
1✔
68

69
if TYPE_CHECKING:
70
    from .network import Network, ProxySettings
71
    from .interface import Interface
72
    from .simple_config import SimpleConfig
73

74

75
_logger = get_logger(__name__)
1✔
76

77

78
def inv_dict(d):
1✔
79
    return {v: k for k, v in d.items()}
1✔
80

81

82
def all_subclasses(cls) -> Set:
1✔
83
    """Return all (transitive) subclasses of cls."""
84
    res = set(cls.__subclasses__())
1✔
85
    for sub in res.copy():
1✔
86
        res |= all_subclasses(sub)
1✔
87
    return res
1✔
88

89

90
ca_path = certifi.where()
1✔
91

92

93
base_units = {'BTC':8, 'mBTC':5, 'bits':2, 'sat':0}
1✔
94
base_units_inverse = inv_dict(base_units)
1✔
95
base_units_list = ['BTC', 'mBTC', 'bits', 'sat']  # list(dict) does not guarantee order
1✔
96

97
DECIMAL_POINT_DEFAULT = 5  # mBTC
1✔
98

99

100
class UnknownBaseUnit(Exception): pass
1✔
101

102

103
def decimal_point_to_base_unit_name(dp: int) -> str:
1✔
104
    # e.g. 8 -> "BTC"
105
    try:
1✔
106
        return base_units_inverse[dp]
1✔
107
    except KeyError:
×
108
        raise UnknownBaseUnit(dp) from None
×
109

110

111
def base_unit_name_to_decimal_point(unit_name: str) -> int:
1✔
112
    """Returns the max number of digits allowed after the decimal point."""
113
    # e.g. "BTC" -> 8
114
    try:
1✔
115
        return base_units[unit_name]
1✔
116
    except KeyError:
×
117
        raise UnknownBaseUnit(unit_name) from None
×
118

119
def parse_max_spend(amt: Any) -> Optional[int]:
1✔
120
    """Checks if given amount is "spend-max"-like.
121
    Returns None or the positive integer weight for "max". Never raises.
122

123
    When creating invoices and on-chain txs, the user can specify to send "max".
124
    This is done by setting the amount to '!'. Splitting max between multiple
125
    tx outputs is also possible, and custom weights (positive ints) can also be used.
126
    For example, to send 40% of all coins to address1, and 60% to address2:
127
    ```
128
    address1, 2!
129
    address2, 3!
130
    ```
131
    """
132
    if not (isinstance(amt, str) and amt and amt[-1] == '!'):
1✔
133
        return None
1✔
134
    if amt == '!':
1✔
135
        return 1
1✔
136
    x = amt[:-1]
1✔
137
    try:
1✔
138
        x = int(x)
1✔
139
    except ValueError:
×
140
        return None
×
141
    if x > 0:
1✔
142
        return x
1✔
143
    return None
×
144

145
class NotEnoughFunds(Exception):
1✔
146
    def __str__(self):
1✔
147
        return _("Insufficient funds")
×
148

149

150
class UneconomicFee(Exception):
1✔
151
    def __str__(self):
1✔
152
        return _("The fee for the transaction is higher than the funds gained from it.")
×
153

154

155
class NoDynamicFeeEstimates(Exception):
1✔
156
    def __str__(self):
1✔
157
        return _('Dynamic fee estimates not available')
×
158

159

160
class BelowDustLimit(Exception):
1✔
161
    pass
1✔
162

163

164
class InvalidPassword(Exception):
1✔
165
    def __init__(self, message: Optional[str] = None):
1✔
166
        self.message = message
1✔
167

168
    def __str__(self):
1✔
169
        if self.message is None:
×
170
            return _("Incorrect password")
×
171
        else:
172
            return str(self.message)
×
173

174

175
class AddTransactionException(Exception):
1✔
176
    pass
1✔
177

178

179
class UnrelatedTransactionException(AddTransactionException):
1✔
180
    def __str__(self):
1✔
181
        return _("Transaction is unrelated to this wallet.")
×
182

183

184
class FileImportFailed(Exception):
1✔
185
    def __init__(self, message=''):
1✔
186
        self.message = str(message)
×
187

188
    def __str__(self):
1✔
189
        return _("Failed to import from file.") + "\n" + self.message
×
190

191

192
class FileExportFailed(Exception):
1✔
193
    def __init__(self, message=''):
1✔
194
        self.message = str(message)
×
195

196
    def __str__(self):
1✔
197
        return _("Failed to export to file.") + "\n" + self.message
×
198

199

200
class WalletFileException(Exception):
1✔
201
    def __init__(self, message='', *, should_report_crash: bool = False):
1✔
202
        Exception.__init__(self, message)
1✔
203
        self.should_report_crash = should_report_crash
1✔
204

205

206
class BitcoinException(Exception): pass
1✔
207

208

209
class UserFacingException(Exception):
1✔
210
    """Exception that contains information intended to be shown to the user."""
211

212

213
class InvoiceError(UserFacingException): pass
1✔
214

215

216
class NetworkOfflineException(UserFacingException):
1✔
217
    """Can be raised if we are running in offline mode (--offline flag)
218
    and the user requests an operation that requires the network.
219
    """
220
    def __str__(self):
1✔
221
        return _("You are offline.")
×
222

223

224
# Throw this exception to unwind the stack like when an error occurs.
225
# However unlike other exceptions the user won't be informed.
226
class UserCancelled(Exception):
1✔
227
    '''An exception that is suppressed from the user'''
228
    pass
1✔
229

230

231
def to_decimal(x: Union[str, float, int, Decimal]) -> Decimal:
1✔
232
    # helper function mainly for float->Decimal conversion, i.e.:
233
    #   >>> Decimal(41754.681)
234
    #   Decimal('41754.680999999996856786310672760009765625')
235
    #   >>> Decimal("41754.681")
236
    #   Decimal('41754.681')
237
    if isinstance(x, Decimal):
1✔
238
        return x
1✔
239
    if isinstance(x, int):
1✔
240
        return Decimal(x)
1✔
241
    return Decimal(str(x))
1✔
242

243

244
# note: this is not a NamedTuple as then its json encoding cannot be customized
245
class Satoshis(object):
1✔
246
    __slots__ = ('value',)
1✔
247

248
    def __new__(cls, value):
1✔
249
        self = super(Satoshis, cls).__new__(cls)
1✔
250
        # note: 'value' sometimes has msat precision
251
        assert isinstance(value, (int, Decimal)), f"unexpected type for {value=!r}"
1✔
252
        self.value = value
1✔
253
        return self
1✔
254

255
    def __repr__(self):
1✔
256
        return f'Satoshis({self.value})'
×
257

258
    def __str__(self):
1✔
259
        # note: precision is truncated to satoshis here
260
        return format_satoshis(self.value)
1✔
261

262
    def __eq__(self, other):
1✔
263
        return self.value == other.value
×
264

265
    def __ne__(self, other):
1✔
266
        return not (self == other)
×
267

268
    def __add__(self, other):
1✔
269
        return Satoshis(self.value + other.value)
×
270

271

272
# note: this is not a NamedTuple as then its json encoding cannot be customized
273
class Fiat(object):
1✔
274
    __slots__ = ('value', 'ccy')
1✔
275

276
    def __new__(cls, value: Optional[Decimal], ccy: str):
1✔
277
        self = super(Fiat, cls).__new__(cls)
1✔
278
        self.ccy = ccy
1✔
279
        if not isinstance(value, (Decimal, type(None))):
1✔
280
            raise TypeError(f"value should be Decimal or None, not {type(value)}")
×
281
        self.value = value
1✔
282
        return self
1✔
283

284
    def __repr__(self):
1✔
285
        return 'Fiat(%s)'% self.__str__()
×
286

287
    def __str__(self):
1✔
288
        if self.value is None or self.value.is_nan():
1✔
289
            return _('No Data')
×
290
        else:
291
            return "{:.2f}".format(self.value)
1✔
292

293
    def to_ui_string(self):
1✔
294
        if self.value is None or self.value.is_nan():
×
295
            return _('No Data')
×
296
        else:
297
            return "{:.2f}".format(self.value) + ' ' + self.ccy
×
298

299
    def __eq__(self, other):
1✔
300
        if not isinstance(other, Fiat):
×
301
            return False
×
302
        if self.ccy != other.ccy:
×
303
            return False
×
304
        if isinstance(self.value, Decimal) and isinstance(other.value, Decimal) \
×
305
                and self.value.is_nan() and other.value.is_nan():
306
            return True
×
307
        return self.value == other.value
×
308

309
    def __ne__(self, other):
1✔
310
        return not (self == other)
×
311

312
    def __add__(self, other):
1✔
313
        assert self.ccy == other.ccy
×
314
        return Fiat(self.value + other.value, self.ccy)
×
315

316

317
class MyEncoder(json.JSONEncoder):
1✔
318
    def default(self, obj):
1✔
319
        # note: this does not get called for namedtuples :(  https://bugs.python.org/issue30343
320
        from .transaction import Transaction, TxOutput
1✔
321
        if isinstance(obj, Transaction):
1✔
322
            return obj.serialize()
1✔
323
        if isinstance(obj, TxOutput):
1✔
324
            return obj.to_legacy_tuple()
1✔
325
        if isinstance(obj, Satoshis):
1✔
326
            return str(obj)
1✔
327
        if isinstance(obj, Fiat):
1✔
328
            return str(obj)
1✔
329
        if isinstance(obj, Decimal):
1✔
330
            return str(obj)
×
331
        if isinstance(obj, datetime):
1✔
332
            # note: if there is a timezone specified, this will include the offset
333
            return obj.isoformat(' ', timespec="minutes")
1✔
334
        if isinstance(obj, (set, frozenset)):
1✔
335
            return list(obj)
1✔
336
        if isinstance(obj, bytes): # for nametuples in lnchannel
1✔
337
            return obj.hex()
1✔
338
        if hasattr(obj, 'to_json') and callable(obj.to_json):
1✔
339
            return obj.to_json()
1✔
340
        return super(MyEncoder, self).default(obj)
×
341

342

343
class ThreadJob(Logger):
1✔
344
    """A job that is run periodically from a thread's main loop.  run() is
345
    called from that thread's context.
346
    """
347

348
    def __init__(self):
1✔
349
        Logger.__init__(self)
1✔
350

351
    def run(self):
1✔
352
        """Called periodically from the thread"""
353
        pass
×
354

355
class DebugMem(ThreadJob):
1✔
356
    '''A handy class for debugging GC memory leaks'''
357
    def __init__(self, classes, interval=30):
1✔
358
        ThreadJob.__init__(self)
×
359
        self.next_time = 0
×
360
        self.classes = classes
×
361
        self.interval = interval
×
362

363
    def mem_stats(self):
1✔
364
        import gc
×
365
        self.logger.info("Start memscan")
×
366
        gc.collect()
×
367
        objmap = defaultdict(list)
×
368
        for obj in gc.get_objects():
×
369
            for class_ in self.classes:
×
370
                try:
×
371
                    _isinstance = isinstance(obj, class_)
×
372
                except AttributeError:
×
373
                    _isinstance = False
×
374
                if _isinstance:
×
375
                    objmap[class_].append(obj)
×
376
        for class_, objs in objmap.items():
×
377
            self.logger.info(f"{class_.__name__}: {len(objs)}")
×
378
        self.logger.info("Finish memscan")
×
379

380
    def run(self):
1✔
381
        if time.time() > self.next_time:
×
382
            self.mem_stats()
×
383
            self.next_time = time.time() + self.interval
×
384

385
class DaemonThread(threading.Thread, Logger):
1✔
386
    """ daemon thread that terminates cleanly """
387

388
    def __init__(self):
1✔
389
        threading.Thread.__init__(self)
1✔
390
        Logger.__init__(self)
1✔
391
        self.parent_thread = threading.current_thread()
1✔
392
        self.running = False
1✔
393
        self.running_lock = threading.Lock()
1✔
394
        self.job_lock = threading.Lock()
1✔
395
        self.jobs = []
1✔
396
        self.stopped_event = threading.Event()        # set when fully stopped
1✔
397
        self.stopped_event_async = asyncio.Event()    # set when fully stopped
1✔
398
        self.wake_up_event = threading.Event()  # for perf optimisation of polling in run()
1✔
399

400
    def add_jobs(self, jobs):
1✔
401
        with self.job_lock:
1✔
402
            self.jobs.extend(jobs)
1✔
403

404
    def run_jobs(self):
1✔
405
        # Don't let a throwing job disrupt the thread, future runs of
406
        # itself, or other jobs.  This is useful protection against
407
        # malformed or malicious server responses
408
        with self.job_lock:
1✔
409
            for job in self.jobs:
1✔
410
                try:
1✔
411
                    job.run()
1✔
412
                except Exception as e:
×
413
                    self.logger.exception('')
×
414

415
    def remove_jobs(self, jobs):
1✔
416
        with self.job_lock:
×
417
            for job in jobs:
×
418
                self.jobs.remove(job)
×
419

420
    def start(self):
1✔
421
        with self.running_lock:
1✔
422
            self.running = True
1✔
423
        return threading.Thread.start(self)
1✔
424

425
    def is_running(self):
1✔
426
        with self.running_lock:
1✔
427
            return self.running and self.parent_thread.is_alive()
1✔
428

429
    def stop(self):
1✔
430
        with self.running_lock:
1✔
431
            self.running = False
1✔
432
            self.wake_up_event.set()
1✔
433
            self.wake_up_event.clear()
1✔
434

435
    def on_stop(self):
1✔
436
        if 'ANDROID_DATA' in os.environ:
1✔
437
            import jnius
×
438
            jnius.detach()
×
439
            self.logger.info("jnius detach")
×
440
        self.logger.info("stopped")
1✔
441
        self.stopped_event.set()
1✔
442
        loop = get_asyncio_loop()
1✔
443
        loop.call_soon_threadsafe(self.stopped_event_async.set)
1✔
444

445

446
def print_stderr(*args):
1✔
447
    args = [str(item) for item in args]
×
448
    sys.stderr.write(" ".join(args) + "\n")
×
449
    sys.stderr.flush()
×
450

451

452
def print_msg(*args):
1✔
453
    # Stringify args
454
    args = [str(item) for item in args]
×
455
    sys.stdout.write(" ".join(args) + "\n")
×
456
    sys.stdout.flush()
×
457

458

459
def json_encode(obj):
1✔
460
    try:
1✔
461
        s = json.dumps(obj, sort_keys = True, indent = 4, cls=MyEncoder)
1✔
462
    except TypeError:
×
463
        s = repr(obj)
×
464
    return s
1✔
465

466

467
def json_decode(x):
1✔
468
    try:
1✔
469
        return json.loads(x, parse_float=Decimal)
1✔
470
    except Exception:
1✔
471
        return x
1✔
472

473

474
def json_normalize(x):
1✔
475
    # note: The return value of commands, when going through the JSON-RPC interface,
476
    #       is json-encoded. The encoder used there cannot handle some types, e.g. electrum.util.Satoshis.
477
    # note: We should not simply do "json_encode(x)" here, as then later x would get doubly json-encoded.
478
    # see #5868
479
    return json_decode(json_encode(x))
1✔
480

481

482
# taken from Django Source Code
483
def constant_time_compare(val1, val2):
1✔
484
    """Return True if the two strings are equal, False otherwise."""
485
    return hmac.compare_digest(to_bytes(val1, 'utf8'), to_bytes(val2, 'utf8'))
1✔
486

487

488
_profiler_logger = _logger.getChild('profiler')
1✔
489

490

491
def profiler(func=None, *, min_threshold: Union[int, float, None] = None):
1✔
492
    """Function decorator that logs execution time.
493

494
    min_threshold: if set, only log if time taken is higher than threshold
495
    """
496
    if func is None:  # to make "@profiler(...)" work. (in addition to bare "@profiler")
1✔
497
        return partial(profiler, min_threshold=min_threshold)
1✔
498
    t0 = None  # type: Optional[float]
1✔
499

500
    def timer_start():
1✔
501
        nonlocal t0
502
        t0 = time.time()
1✔
503

504
    def timer_done():
1✔
505
        t = time.time() - t0
1✔
506
        if min_threshold is None or t > min_threshold:
1✔
507
            _profiler_logger.debug(f"{func.__qualname__} {t:,.4f} sec")
1✔
508

509
    if inspect.iscoroutinefunction(func):
1✔
510
        async def do_profile(*args, **kw_args):
×
511
            timer_start()
×
512
            o = await func(*args, **kw_args)
×
513
            timer_done()
×
514
            return o
×
515
    else:
516
        def do_profile(*args, **kw_args):
1✔
517
            timer_start()
1✔
518
            o = func(*args, **kw_args)
1✔
519
            timer_done()
1✔
520
            return o
1✔
521
    return do_profile
1✔
522

523

524
class AsyncHangDetector:
1✔
525
    """Context manager that logs every `n` seconds if encapsulated context still has not exited."""
526

527
    def __init__(
1✔
528
        self,
529
        *,
530
        period_sec: int = 15,
531
        message: str,
532
        logger: logging.Logger = None,
533
    ):
534
        self.period_sec = period_sec
1✔
535
        self.message = message
1✔
536
        self.logger = logger or _logger
1✔
537

538
    async def _monitor(self):
1✔
539
        # note: this assumes that the event loop itself is not blocked
540
        t0 = time.monotonic()
1✔
541
        while True:
1✔
542
            await asyncio.sleep(self.period_sec)
1✔
543
            t1 = time.monotonic()
×
544
            self.logger.info(f"{self.message} (after {t1 - t0:.2f} sec)")
×
545

546
    async def __aenter__(self):
1✔
547
        self.mtask = asyncio.create_task(self._monitor())
1✔
548

549
    async def __aexit__(self, exc_type, exc, tb):
1✔
550
        self.mtask.cancel()
1✔
551

552

553
def android_ext_dir():
1✔
554
    from android.storage import primary_external_storage_path
×
555
    return primary_external_storage_path()
×
556

557

558
def android_backup_dir():
1✔
559
    pkgname = get_android_package_name()
×
560
    d = os.path.join(android_ext_dir(), pkgname)
×
561
    if not os.path.exists(d):
×
562
        os.mkdir(d)
×
563
    return d
×
564

565

566
def android_data_dir():
1✔
567
    import jnius
×
568
    PythonActivity = jnius.autoclass('org.kivy.android.PythonActivity')
×
569
    return PythonActivity.mActivity.getFilesDir().getPath() + '/data'
×
570

571

572
def ensure_sparse_file(filename):
1✔
573
    # On modern Linux, no need to do anything.
574
    # On Windows, need to explicitly mark file.
575
    if os.name == "nt":
1✔
576
        try:
×
577
            os.system('fsutil sparse setflag "{}" 1'.format(filename))
×
578
        except Exception as e:
×
579
            _logger.info(f'error marking file {filename} as sparse: {e}')
×
580

581

582
def get_headers_dir(config):
1✔
583
    return config.path
1✔
584

585

586
def assert_datadir_available(config_path):
1✔
587
    path = config_path
1✔
588
    if os.path.exists(path):
1✔
589
        return
1✔
590
    else:
591
        raise FileNotFoundError(
×
592
            'Electrum datadir does not exist. Was it deleted while running?' + '\n' +
593
            'Should be at {}'.format(path))
594

595

596
def assert_file_in_datadir_available(path, config_path):
1✔
597
    if os.path.exists(path):
×
598
        return
×
599
    else:
600
        assert_datadir_available(config_path)
×
601
        raise FileNotFoundError(
×
602
            'Cannot find file but datadir is there.' + '\n' +
603
            'Should be at {}'.format(path))
604

605

606
def standardize_path(path):
1✔
607
    # note: os.path.realpath() is not used, as on Windows it can return non-working paths (see #8495).
608
    #       This means that we don't resolve symlinks!
609
    return os.path.normcase(
1✔
610
                os.path.abspath(
611
                    os.path.expanduser(
612
                        path
613
    )))
614

615

616
def get_new_wallet_name(wallet_folder: str) -> str:
1✔
617
    """Returns a file basename for a new wallet to be used.
618
    Can raise OSError.
619
    """
620
    i = 1
1✔
621
    while True:
1✔
622
        filename = "wallet_%d" % i
1✔
623
        if filename in os.listdir(wallet_folder):
1✔
624
            i += 1
1✔
625
        else:
626
            break
1✔
627
    return filename
1✔
628

629

630
def is_android_debug_apk() -> bool:
1✔
631
    is_android = 'ANDROID_DATA' in os.environ
×
632
    if not is_android:
×
633
        return False
×
634
    from jnius import autoclass
×
635
    pkgname = get_android_package_name()
×
636
    build_config = autoclass(f"{pkgname}.BuildConfig")
×
637
    return bool(build_config.DEBUG)
×
638

639

640
def get_android_package_name() -> str:
1✔
641
    is_android = 'ANDROID_DATA' in os.environ
×
642
    assert is_android
×
643
    from jnius import autoclass
×
644
    from android.config import ACTIVITY_CLASS_NAME
×
645
    activity = autoclass(ACTIVITY_CLASS_NAME).mActivity
×
646
    pkgname = str(activity.getPackageName())
×
647
    return pkgname
×
648

649

650
def assert_bytes(*args):
1✔
651
    """
652
    porting helper, assert args type
653
    """
654
    try:
1✔
655
        for x in args:
1✔
656
            assert isinstance(x, (bytes, bytearray))
1✔
657
    except Exception:
×
658
        print('assert bytes failed', list(map(type, args)))
×
659
        raise
×
660

661

662
def assert_str(*args):
1✔
663
    """
664
    porting helper, assert args type
665
    """
666
    for x in args:
×
667
        assert isinstance(x, str)
×
668

669

670
def to_string(x, enc) -> str:
1✔
671
    if isinstance(x, (bytes, bytearray)):
1✔
672
        return x.decode(enc)
1✔
673
    if isinstance(x, str):
×
674
        return x
×
675
    else:
676
        raise TypeError("Not a string or bytes like object")
×
677

678

679
def to_bytes(something, encoding='utf8') -> bytes:
1✔
680
    """
681
    cast string to bytes() like object, but for python2 support it's bytearray copy
682
    """
683
    if isinstance(something, bytes):
1✔
684
        return something
1✔
685
    if isinstance(something, str):
1✔
686
        return something.encode(encoding)
1✔
687
    elif isinstance(something, bytearray):
1✔
688
        return bytes(something)
1✔
689
    else:
690
        raise TypeError("Not a string or bytes like object")
1✔
691

692

693
bfh = bytes.fromhex
1✔
694

695

696
def xor_bytes(a: bytes, b: bytes) -> bytes:
1✔
697
    size = min(len(a), len(b))
1✔
698
    return ((int.from_bytes(a[:size], "big") ^ int.from_bytes(b[:size], "big"))
1✔
699
            .to_bytes(size, "big"))
700

701

702
def user_dir():
1✔
703
    if "ELECTRUMDIR" in os.environ:
1✔
704
        return os.environ["ELECTRUMDIR"]
×
705
    elif 'ANDROID_DATA' in os.environ:
1✔
706
        return android_data_dir()
×
707
    elif os.name == 'posix':
1✔
708
        return os.path.join(os.environ["HOME"], ".electrum")
1✔
709
    elif "APPDATA" in os.environ:
×
710
        return os.path.join(os.environ["APPDATA"], "Electrum")
×
711
    elif "LOCALAPPDATA" in os.environ:
×
712
        return os.path.join(os.environ["LOCALAPPDATA"], "Electrum")
×
713
    else:
714
        #raise Exception("No home directory found in environment variables.")
715
        return
×
716

717

718
def resource_path(*parts):
1✔
719
    return os.path.join(pkg_dir, *parts)
1✔
720

721

722
# absolute path to python package folder of electrum ("lib")
723
pkg_dir = os.path.split(os.path.realpath(__file__))[0]
1✔
724

725

726
def is_valid_email(s):
1✔
727
    regexp = r"[^@]+@[^@]+\.[^@]+"
×
728
    return re.match(regexp, s) is not None
×
729

730

731
def is_valid_websocket_url(url: str) -> bool:
1✔
732
    """
733
    uses this django url validation regex:
734
    https://github.com/django/django/blob/2c6906a0c4673a7685817156576724aba13ad893/django/core/validators.py#L45C1-L52C43
735
    Note: this is not perfect, urls and their parsing can get very complex (see recent django code).
736
    however its sufficient for catching weird user input in the gui dialog
737
    """
738
    # stores the compiled regex in the function object itself to avoid recompiling it every call
739
    if not hasattr(is_valid_websocket_url, "regex"):
×
740
        is_valid_websocket_url.regex = re.compile(
×
741
            r'^(?:ws|wss)://'  # ws:// or wss://
742
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
743
            r'localhost|'  # localhost...
744
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
745
            r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
746
            r'(?::\d+)?'  # optional port
747
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
748
    try:
×
749
        return re.match(is_valid_websocket_url.regex, url) is not None
×
750
    except Exception:
×
751
        return False
×
752

753

754
def is_hash256_str(text: Any) -> bool:
1✔
755
    if not isinstance(text, str): return False
1✔
756
    if len(text) != 64: return False
1✔
757
    return is_hex_str(text)
1✔
758

759

760
def is_hex_str(text: Any) -> bool:
1✔
761
    if not isinstance(text, str): return False
1✔
762
    try:
1✔
763
        b = bytes.fromhex(text)
1✔
764
    except Exception:
1✔
765
        return False
1✔
766
    # forbid whitespaces in text:
767
    if len(text) != 2 * len(b):
1✔
768
        return False
1✔
769
    return True
1✔
770

771

772
def is_integer(val: Any) -> bool:
1✔
773
    return isinstance(val, int)
1✔
774

775

776
def is_non_negative_integer(val: Any) -> bool:
1✔
777
    if is_integer(val):
1✔
778
        return val >= 0
1✔
779
    return False
1✔
780

781

782
def is_int_or_float(val: Any) -> bool:
1✔
783
    return isinstance(val, (int, float))
1✔
784

785

786
def is_non_negative_int_or_float(val: Any) -> bool:
1✔
787
    if is_int_or_float(val):
1✔
788
        return val >= 0
1✔
789
    return False
1✔
790

791

792
def chunks(items, size: int):
1✔
793
    """Break up items, an iterable, into chunks of length size."""
794
    if size < 1:
1✔
795
        raise ValueError(f"size must be positive, not {repr(size)}")
1✔
796
    for i in range(0, len(items), size):
1✔
797
        yield items[i: i + size]
1✔
798

799

800
def format_satoshis_plain(
1✔
801
        x: Union[int, float, Decimal, str],  # amount in satoshis,
802
        *,
803
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
804
        is_max_allowed: bool = True,
805
) -> str:
806
    """Display a satoshi amount scaled.  Always uses a '.' as a decimal
807
    point and has no thousands separator"""
808
    if is_max_allowed and parse_max_spend(x):
1✔
809
        return f'max({x})'
×
810
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
1✔
811
    # TODO(ghost43) just hard-fail if x is a float. do we even use floats for money anywhere?
812
    x = to_decimal(x)
1✔
813
    scale_factor = pow(10, decimal_point)
1✔
814
    return "{:.8f}".format(x / scale_factor).rstrip('0').rstrip('.')
1✔
815

816

817
# Check that Decimal precision is sufficient.
818
# We need at the very least ~20, as we deal with msat amounts, and
819
# log10(21_000_000 * 10**8 * 1000) ~= 18.3
820
# decimal.DefaultContext.prec == 28 by default, but it is mutable.
821
# We enforce that we have at least that available.
822
assert decimal.getcontext().prec >= 28, f"PyDecimal precision too low: {decimal.getcontext().prec}"
1✔
823

824
# DECIMAL_POINT = locale.localeconv()['decimal_point']  # type: str
825
DECIMAL_POINT = "."
1✔
826
THOUSANDS_SEP = " "
1✔
827
assert len(DECIMAL_POINT) == 1, f"DECIMAL_POINT has unexpected len. {DECIMAL_POINT!r}"
1✔
828
assert len(THOUSANDS_SEP) == 1, f"THOUSANDS_SEP has unexpected len. {THOUSANDS_SEP!r}"
1✔
829

830

831
def format_satoshis(
1✔
832
        x: Union[int, float, Decimal, str, None],  # amount in satoshis
833
        *,
834
        num_zeros: int = 0,
835
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
836
        precision: int = 0,  # extra digits after satoshi precision
837
        is_diff: bool = False,  # if True, enforce a leading sign (+/-)
838
        whitespaces: bool = False,  # if True, add whitespaces, to align numbers in a column
839
        add_thousands_sep: bool = False,  # if True, add whitespaces, for better readability of the numbers
840
) -> str:
841
    if x is None:
1✔
842
        return 'unknown'
×
843
    if parse_max_spend(x):
1✔
844
        return f'max({x})'
×
845
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
1✔
846
    # TODO(ghost43) just hard-fail if x is a float. do we even use floats for money anywhere?
847
    x = to_decimal(x)
1✔
848
    # lose redundant precision
849
    x = x.quantize(Decimal(10) ** (-precision))
1✔
850
    # format string
851
    overall_precision = decimal_point + precision  # max digits after final decimal point
1✔
852
    decimal_format = "." + str(overall_precision) if overall_precision > 0 else ""
1✔
853
    if is_diff:
1✔
854
        decimal_format = '+' + decimal_format
1✔
855
    # initial result
856
    scale_factor = pow(10, decimal_point)
1✔
857
    result = ("{:" + decimal_format + "f}").format(x / scale_factor)
1✔
858
    if "." not in result: result += "."
1✔
859
    result = result.rstrip('0')
1✔
860
    # add extra decimal places (zeros)
861
    integer_part, fract_part = result.split(".")
1✔
862
    if len(fract_part) < num_zeros:
1✔
863
        fract_part += "0" * (num_zeros - len(fract_part))
1✔
864
    # add whitespaces as thousands' separator for better readability of numbers
865
    if add_thousands_sep:
1✔
866
        sign = integer_part[0] if integer_part[0] in ("+", "-") else ""
1✔
867
        if sign == "-":
1✔
868
            integer_part = integer_part[1:]
1✔
869
        integer_part = "{:,}".format(int(integer_part)).replace(',', THOUSANDS_SEP)
1✔
870
        integer_part = sign + integer_part
1✔
871
        fract_part = THOUSANDS_SEP.join(fract_part[i:i+3] for i in range(0, len(fract_part), 3))
1✔
872
    result = integer_part + DECIMAL_POINT + fract_part
1✔
873
    # add leading/trailing whitespaces so that numbers can be aligned in a column
874
    if whitespaces:
1✔
875
        target_fract_len = overall_precision
1✔
876
        target_integer_len = 14 - decimal_point  # should be enough for up to unsigned 999999 BTC
1✔
877
        if add_thousands_sep:
1✔
878
            target_fract_len += max(0, (target_fract_len - 1) // 3)
1✔
879
            target_integer_len += max(0, (target_integer_len - 1) // 3)
1✔
880
        # add trailing whitespaces
881
        result += " " * (target_fract_len - len(fract_part))
1✔
882
        # add leading whitespaces
883
        target_total_len = target_integer_len + 1 + target_fract_len
1✔
884
        result = " " * (target_total_len - len(result)) + result
1✔
885
    return result
1✔
886

887

888
FEERATE_PRECISION = 1  # num fractional decimal places for sat/byte fee rates
1✔
889
_feerate_quanta = Decimal(10) ** (-FEERATE_PRECISION)
1✔
890
UI_UNIT_NAME_FEERATE_SAT_PER_VBYTE = "sat/vbyte"
1✔
891
UI_UNIT_NAME_FEERATE_SAT_PER_VB = "sat/vB"
1✔
892
UI_UNIT_NAME_TXSIZE_VBYTES = "vbytes"
1✔
893
UI_UNIT_NAME_MEMPOOL_MB = "vMB"
1✔
894
UI_UNIT_NAME_FIXED_SAT = "sat"
1✔
895

896

897
def format_fee_satoshis(fee, *, num_zeros=0, precision=None):
1✔
898
    if precision is None:
1✔
899
        precision = FEERATE_PRECISION
1✔
900
    num_zeros = min(num_zeros, FEERATE_PRECISION)  # no more zeroes than available prec
1✔
901
    return format_satoshis(fee, num_zeros=num_zeros, decimal_point=0, precision=precision)
1✔
902

903

904
def quantize_feerate(fee) -> Union[None, Decimal, int]:
1✔
905
    """Strip sat/byte fee rate of excess precision."""
906
    if fee is None:
1✔
907
        return None
×
908
    return Decimal(fee).quantize(_feerate_quanta, rounding=decimal.ROUND_HALF_DOWN)
1✔
909

910

911
DEFAULT_TIMEZONE = None  # type: timezone | None  # None means local OS timezone
1✔
912
def timestamp_to_datetime(timestamp: Union[int, float, None], *, utc: bool = False) -> Optional[datetime]:
1✔
913
    if timestamp is None:
1✔
914
        return None
×
915
    tz = DEFAULT_TIMEZONE
1✔
916
    if utc:
1✔
917
        tz = timezone.utc
×
918
    return datetime.fromtimestamp(timestamp, tz=tz)
1✔
919

920

921
def format_time(timestamp: Union[int, float, None]) -> str:
1✔
922
    date = timestamp_to_datetime(timestamp)
×
923
    return date.isoformat(' ', timespec="minutes") if date else _("Unknown")
×
924

925

926
def age(
1✔
927
    from_date: Union[int, float, None],  # POSIX timestamp
928
    *,
929
    since_date: datetime = None,
930
    target_tz=None,
931
    include_seconds: bool = False,
932
) -> str:
933
    """Takes a timestamp and returns a string with the approximation of the age"""
934
    if from_date is None:
1✔
935
        return _("Unknown")
1✔
936
    from_date = datetime.fromtimestamp(from_date)
1✔
937
    if since_date is None:
1✔
938
        since_date = datetime.now(target_tz)
×
939
    distance_in_time = from_date - since_date
1✔
940
    is_in_past = from_date < since_date
1✔
941
    s = delta_time_str(distance_in_time, include_seconds=include_seconds)
1✔
942
    return _("{} ago").format(s) if is_in_past else _("in {}").format(s)
1✔
943

944

945
def delta_time_str(distance_in_time: timedelta, *, include_seconds: bool = False) -> str:
1✔
946
    distance_in_seconds = int(round(abs(distance_in_time.days * 86400 + distance_in_time.seconds)))
1✔
947
    distance_in_minutes = int(round(distance_in_seconds / 60))
1✔
948
    if distance_in_minutes == 0:
1✔
949
        if include_seconds:
1✔
950
            return _("{} seconds").format(distance_in_seconds)
1✔
951
        else:
952
            return _("less than a minute")
1✔
953
    elif distance_in_minutes < 45:
1✔
954
        return _("about {} minutes").format(distance_in_minutes)
1✔
955
    elif distance_in_minutes < 90:
1✔
956
        return _("about 1 hour")
1✔
957
    elif distance_in_minutes < 1440:
1✔
958
        return _("about {} hours").format(round(distance_in_minutes / 60.0))
1✔
959
    elif distance_in_minutes < 2880:
1✔
960
        return _("about 1 day")
1✔
961
    elif distance_in_minutes < 43220:
1✔
962
        return _("about {} days").format(round(distance_in_minutes / 1440))
1✔
963
    elif distance_in_minutes < 86400:
1✔
964
        return _("about 1 month")
1✔
965
    elif distance_in_minutes < 525600:
1✔
966
        return _("about {} months").format(round(distance_in_minutes / 43200))
1✔
967
    elif distance_in_minutes < 1051200:
1✔
968
        return _("about 1 year")
1✔
969
    else:
970
        return _("over {} years").format(round(distance_in_minutes / 525600))
1✔
971

972

973
mainnet_block_explorers = {
1✔
974
    '3xpl.com': ('https://3xpl.com/bitcoin/',
975
                        {'tx': 'transaction/', 'addr': 'address/'}),
976
    'Bitflyer.jp': ('https://chainflyer.bitflyer.jp/',
977
                        {'tx': 'Transaction/', 'addr': 'Address/'}),
978
    'Blockchain.info': ('https://blockchain.com/btc/',
979
                        {'tx': 'tx/', 'addr': 'address/'}),
980
    'Blockstream.info': ('https://blockstream.info/',
981
                        {'tx': 'tx/', 'addr': 'address/'}),
982
    'Bitaps.com': ('https://btc.bitaps.com/',
983
                        {'tx': '', 'addr': ''}),
984
    'BTC.com': ('https://btc.com/',
985
                        {'tx': '', 'addr': ''}),
986
    'Chain.so': ('https://www.chain.so/',
987
                        {'tx': 'tx/BTC/', 'addr': 'address/BTC/'}),
988
    'Insight.is': ('https://insight.bitpay.com/',
989
                        {'tx': 'tx/', 'addr': 'address/'}),
990
    'BlockCypher.com': ('https://live.blockcypher.com/btc/',
991
                        {'tx': 'tx/', 'addr': 'address/'}),
992
    'Blockchair.com': ('https://blockchair.com/bitcoin/',
993
                        {'tx': 'transaction/', 'addr': 'address/'}),
994
    'blockonomics.co': ('https://www.blockonomics.co/',
995
                        {'tx': 'api/tx?txid=', 'addr': '#/search?q='}),
996
    'mempool.space': ('https://mempool.space/',
997
                        {'tx': 'tx/', 'addr': 'address/'}),
998
    'mempool.emzy.de': ('https://mempool.emzy.de/',
999
                        {'tx': 'tx/', 'addr': 'address/'}),
1000
    'OXT.me': ('https://oxt.me/',
1001
                        {'tx': 'transaction/', 'addr': 'address/'}),
1002
    'mynode.local': ('http://mynode.local:3002/',
1003
                        {'tx': 'tx/', 'addr': 'address/'}),
1004
    'system default': ('blockchain:/',
1005
                        {'tx': 'tx/', 'addr': 'address/'}),
1006
}
1007

1008
testnet_block_explorers = {
1✔
1009
    'Bitaps.com': ('https://tbtc.bitaps.com/',
1010
                       {'tx': '', 'addr': ''}),
1011
    'BlockCypher.com': ('https://live.blockcypher.com/btc-testnet/',
1012
                       {'tx': 'tx/', 'addr': 'address/'}),
1013
    'Blockchain.info': ('https://www.blockchain.com/btc-testnet/',
1014
                       {'tx': 'tx/', 'addr': 'address/'}),
1015
    'Blockstream.info': ('https://blockstream.info/testnet/',
1016
                        {'tx': 'tx/', 'addr': 'address/'}),
1017
    'mempool.space': ('https://mempool.space/testnet/',
1018
                        {'tx': 'tx/', 'addr': 'address/'}),
1019
    'smartbit.com.au': ('https://testnet.smartbit.com.au/',
1020
                       {'tx': 'tx/', 'addr': 'address/'}),
1021
    'system default': ('blockchain://000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943/',
1022
                       {'tx': 'tx/', 'addr': 'address/'}),
1023
}
1024

1025
testnet4_block_explorers = {
1✔
1026
    'mempool.space': ('https://mempool.space/testnet4/',
1027
                        {'tx': 'tx/', 'addr': 'address/'}),
1028
    'wakiyamap.dev': ('https://testnet4-explorer.wakiyamap.dev/',
1029
                       {'tx': 'tx/', 'addr': 'address/'}),
1030
}
1031

1032
signet_block_explorers = {
1✔
1033
    'bc-2.jp': ('https://explorer.bc-2.jp/',
1034
                        {'tx': 'tx/', 'addr': 'address/'}),
1035
    'mempool.space': ('https://mempool.space/signet/',
1036
                        {'tx': 'tx/', 'addr': 'address/'}),
1037
    'bitcoinexplorer.org': ('https://signet.bitcoinexplorer.org/',
1038
                       {'tx': 'tx/', 'addr': 'address/'}),
1039
    'wakiyamap.dev': ('https://signet-explorer.wakiyamap.dev/',
1040
                       {'tx': 'tx/', 'addr': 'address/'}),
1041
    'ex.signet.bublina.eu.org': ('https://ex.signet.bublina.eu.org/',
1042
                       {'tx': 'tx/', 'addr': 'address/'}),
1043
    'system default': ('blockchain:/',
1044
                       {'tx': 'tx/', 'addr': 'address/'}),
1045
}
1046

1047
_block_explorer_default_api_loc = {'tx': 'tx/', 'addr': 'address/'}
1✔
1048

1049

1050
def block_explorer_info():
1✔
1051
    from . import constants
×
1052
    if constants.net.NET_NAME == "testnet":
×
1053
        return testnet_block_explorers
×
1054
    elif constants.net.NET_NAME == "testnet4":
×
1055
        return testnet4_block_explorers
×
1056
    elif constants.net.NET_NAME == "signet":
×
1057
        return signet_block_explorers
×
1058
    return mainnet_block_explorers
×
1059

1060

1061
def block_explorer(config: 'SimpleConfig') -> Optional[str]:
1✔
1062
    """Returns name of selected block explorer,
1063
    or None if a custom one (not among hardcoded ones) is configured.
1064
    """
1065
    if config.BLOCK_EXPLORER_CUSTOM is not None:
×
1066
        return None
×
1067
    be_key = config.BLOCK_EXPLORER
×
1068
    be_tuple = block_explorer_info().get(be_key)
×
1069
    if be_tuple is None:
×
1070
        be_key = config.cv.BLOCK_EXPLORER.get_default_value()
×
1071
    assert isinstance(be_key, str), f"{be_key!r} should be str"
×
1072
    return be_key
×
1073

1074

1075
def block_explorer_tuple(config: 'SimpleConfig') -> Optional[Tuple[str, dict]]:
1✔
1076
    custom_be = config.BLOCK_EXPLORER_CUSTOM
×
1077
    if custom_be:
×
1078
        if isinstance(custom_be, str):
×
1079
            return custom_be, _block_explorer_default_api_loc
×
1080
        if isinstance(custom_be, (tuple, list)) and len(custom_be) == 2:
×
1081
            return tuple(custom_be)
×
1082
        _logger.warning(f"not using {config.cv.BLOCK_EXPLORER_CUSTOM.key()!r} from config. "
×
1083
                        f"expected a str or a pair but got {custom_be!r}")
1084
        return None
×
1085
    else:
1086
        # using one of the hardcoded block explorers
1087
        return block_explorer_info().get(block_explorer(config))
×
1088

1089

1090
def block_explorer_URL(config: 'SimpleConfig', kind: str, item: str) -> Optional[str]:
1✔
1091
    be_tuple = block_explorer_tuple(config)
×
1092
    if not be_tuple:
×
1093
        return
×
1094
    explorer_url, explorer_dict = be_tuple
×
1095
    kind_str = explorer_dict.get(kind)
×
1096
    if kind_str is None:
×
1097
        return
×
1098
    if explorer_url[-1] != "/":
×
1099
        explorer_url += "/"
×
1100
    url_parts = [explorer_url, kind_str, item]
×
1101
    return ''.join(url_parts)
×
1102

1103

1104
# Python bug (http://bugs.python.org/issue1927) causes raw_input
1105
# to be redirected improperly between stdin/stderr on Unix systems
1106
#TODO: py3
1107
def raw_input(prompt=None):
1✔
1108
    if prompt:
×
1109
        sys.stdout.write(prompt)
×
1110
    return builtin_raw_input()
×
1111

1112

1113
builtin_raw_input = builtins.input
1✔
1114
builtins.input = raw_input
1✔
1115

1116

1117
def parse_json(message):
1✔
1118
    # TODO: check \r\n pattern
1119
    n = message.find(b'\n')
×
1120
    if n == -1:
×
1121
        return None, message
×
1122
    try:
×
1123
        j = json.loads(message[0:n].decode('utf8'))
×
1124
    except Exception:
×
1125
        j = None
×
1126
    return j, message[n+1:]
×
1127

1128

1129
def setup_thread_excepthook():
1✔
1130
    """
1131
    Workaround for `sys.excepthook` thread bug from:
1132
    http://bugs.python.org/issue1230540
1133

1134
    Call once from the main thread before creating any threads.
1135
    """
1136

1137
    init_original = threading.Thread.__init__
×
1138

1139
    def init(self, *args, **kwargs):
×
1140

1141
        init_original(self, *args, **kwargs)
×
1142
        run_original = self.run
×
1143

1144
        def run_with_except_hook(*args2, **kwargs2):
×
1145
            try:
×
1146
                run_original(*args2, **kwargs2)
×
1147
            except Exception:
×
1148
                sys.excepthook(*sys.exc_info())
×
1149

1150
        self.run = run_with_except_hook
×
1151

1152
    threading.Thread.__init__ = init
×
1153

1154

1155
def send_exception_to_crash_reporter(e: BaseException):
1✔
1156
    from .base_crash_reporter import send_exception_to_crash_reporter
×
1157
    send_exception_to_crash_reporter(e)
×
1158

1159

1160
def versiontuple(v):
1✔
1161
    return tuple(map(int, (v.split("."))))
1✔
1162

1163

1164
def read_json_file(path):
1✔
1165
    try:
1✔
1166
        with open(path, 'r', encoding='utf-8') as f:
1✔
1167
            data = json.loads(f.read())
1✔
1168
    except json.JSONDecodeError:
×
1169
        _logger.exception('')
×
1170
        raise FileImportFailed(_("Invalid JSON code."))
×
1171
    except BaseException as e:
×
1172
        _logger.exception('')
×
1173
        raise FileImportFailed(e)
×
1174
    return data
1✔
1175

1176

1177
def write_json_file(path, data):
1✔
1178
    try:
×
1179
        with open(path, 'w+', encoding='utf-8') as f:
×
1180
            json.dump(data, f, indent=4, sort_keys=True, cls=MyEncoder)
×
1181
    except (IOError, os.error) as e:
×
1182
        _logger.exception('')
×
1183
        raise FileExportFailed(e)
×
1184

1185

1186
def os_chmod(path, mode):
1✔
1187
    """os.chmod aware of tmpfs"""
1188
    try:
1✔
1189
        os.chmod(path, mode)
1✔
1190
    except OSError as e:
×
1191
        xdg_runtime_dir = os.environ.get("XDG_RUNTIME_DIR", None)
×
1192
        if xdg_runtime_dir and is_subpath(path, xdg_runtime_dir):
×
1193
            _logger.info(f"Tried to chmod in tmpfs. Skipping... {e!r}")
×
1194
        else:
1195
            raise
×
1196

1197

1198
def make_dir(path, *, allow_symlink=True):
1✔
1199
    """Makes directory if it does not yet exist.
1200
    Also sets sane 0700 permissions on the dir.
1201
    """
1202
    if not os.path.exists(path):
1✔
1203
        if not allow_symlink and os.path.islink(path):
1✔
1204
            raise Exception('Dangling link: ' + path)
×
1205
        try:
1✔
1206
            os.mkdir(path)
1✔
1207
        except FileExistsError:
×
1208
            # this can happen in a multiprocess race, e.g. when an electrum daemon
1209
            # and an electrum cli command are launched in rapid fire
1210
            pass
×
1211
        os_chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1✔
1212
        assert os.path.exists(path)
1✔
1213

1214

1215
def is_subpath(long_path: str, short_path: str) -> bool:
1✔
1216
    """Returns whether long_path is a sub-path of short_path."""
1217
    try:
1✔
1218
        common = os.path.commonpath([long_path, short_path])
1✔
1219
    except ValueError:
1✔
1220
        return False
1✔
1221
    short_path = standardize_path(short_path)
1✔
1222
    common     = standardize_path(common)
1✔
1223
    return short_path == common
1✔
1224

1225

1226
def log_exceptions(func):
1✔
1227
    """Decorator to log AND re-raise exceptions."""
1228
    assert inspect.iscoroutinefunction(func), 'func needs to be a coroutine'
1✔
1229

1230
    @functools.wraps(func)
1✔
1231
    async def wrapper(*args, **kwargs):
1✔
1232
        self = args[0] if len(args) > 0 else None
1✔
1233
        try:
1✔
1234
            return await func(*args, **kwargs)
1✔
1235
        except asyncio.CancelledError as e:
1✔
1236
            raise
1✔
1237
        except BaseException as e:
1✔
1238
            mylogger = self.logger if hasattr(self, 'logger') else _logger
1✔
1239
            try:
1✔
1240
                mylogger.exception(f"Exception in {func.__name__}: {repr(e)}")
1✔
1241
            except BaseException as e2:
×
1242
                print(f"logging exception raised: {repr(e2)}... orig exc: {repr(e)} in {func.__name__}")
×
1243
            raise
1✔
1244
    return wrapper
1✔
1245

1246

1247
def ignore_exceptions(func):
1✔
1248
    """Decorator to silently swallow all exceptions."""
1249
    assert inspect.iscoroutinefunction(func), 'func needs to be a coroutine'
1✔
1250

1251
    @functools.wraps(func)
1✔
1252
    async def wrapper(*args, **kwargs):
1✔
1253
        try:
1✔
1254
            return await func(*args, **kwargs)
1✔
1255
        except Exception as e:
1✔
1256
            pass
×
1257
    return wrapper
1✔
1258

1259

1260
def with_lock(func):
1✔
1261
    """Decorator to enforce a lock on a function call."""
1262
    @functools.wraps(func)
1✔
1263
    def func_wrapper(self, *args, **kwargs):
1✔
1264
        with self.lock:
1✔
1265
            return func(self, *args, **kwargs)
1✔
1266
    return func_wrapper
1✔
1267

1268

1269
@dataclass(frozen=True, kw_only=True)
1✔
1270
class TxMinedInfo:
1✔
1271
    _height: int                       # height of block that mined tx
1✔
1272
    conf: Optional[int] = None         # number of confirmations, SPV verified. >=0, or None (None means unknown)
1✔
1273
    timestamp: Optional[int] = None    # timestamp of block that mined tx
1✔
1274
    txpos: Optional[int] = None        # position of tx in serialized block
1✔
1275
    header_hash: Optional[str] = None  # hash of block that mined tx
1✔
1276
    wanted_height: Optional[int] = None  # in case of timelock, min abs block height
1✔
1277

1278
    def height(self) -> int:
1✔
1279
        """Treat unverified heights as unconfirmed."""
1280
        h = self._height
1✔
1281
        if h > 0:
1✔
1282
            if self.conf is not None and self.conf >= 1:
1✔
1283
                return h
1✔
1284
            return 0  # treat it as unconfirmed until SPV-ed
1✔
1285
        else:  # h <= 0
1286
            return h
1✔
1287

1288
    def short_id(self) -> Optional[str]:
1✔
1289
        if self.txpos is not None and self.txpos >= 0:
×
1290
            assert self.height() > 0
×
1291
            return f"{self.height()}x{self.txpos}"
×
1292
        return None
×
1293

1294
    def is_local_like(self) -> bool:
1✔
1295
        """Returns whether the tx is local-like (LOCAL/FUTURE)."""
1296
        from .address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
×
1297
        if self.height() > 0:
×
1298
            return False
×
1299
        if self.height() in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT):
×
1300
            return False
×
1301
        return True
×
1302

1303

1304
class ShortID(bytes):
1✔
1305

1306
    def __repr__(self):
1✔
1307
        return f"<ShortID: {format_short_id(self)}>"
1✔
1308

1309
    def __str__(self):
1✔
1310
        return format_short_id(self)
1✔
1311

1312
    @classmethod
1✔
1313
    def from_components(cls, block_height: int, tx_pos_in_block: int, output_index: int) -> 'ShortID':
1✔
1314
        bh = block_height.to_bytes(3, byteorder='big')
1✔
1315
        tpos = tx_pos_in_block.to_bytes(3, byteorder='big')
1✔
1316
        oi = output_index.to_bytes(2, byteorder='big')
1✔
1317
        return ShortID(bh + tpos + oi)
1✔
1318

1319
    @classmethod
1✔
1320
    def from_str(cls, scid: str) -> 'ShortID':
1✔
1321
        """Parses a formatted scid str, e.g. '643920x356x0'."""
1322
        components = scid.split("x")
1✔
1323
        if len(components) != 3:
1✔
1324
            raise ValueError(f"failed to parse ShortID: {scid!r}")
×
1325
        try:
1✔
1326
            components = [int(x) for x in components]
1✔
1327
        except ValueError:
×
1328
            raise ValueError(f"failed to parse ShortID: {scid!r}") from None
×
1329
        return ShortID.from_components(*components)
1✔
1330

1331
    @classmethod
1✔
1332
    def normalize(cls, data: Union[None, str, bytes, 'ShortID']) -> Optional['ShortID']:
1✔
1333
        if isinstance(data, ShortID) or data is None:
1✔
1334
            return data
1✔
1335
        if isinstance(data, str):
1✔
1336
            assert len(data) == 16
1✔
1337
            return ShortID.fromhex(data)
1✔
1338
        if isinstance(data, (bytes, bytearray)):
1✔
1339
            assert len(data) == 8
1✔
1340
            return ShortID(data)
1✔
1341

1342
    @property
1✔
1343
    def block_height(self) -> int:
1✔
1344
        return int.from_bytes(self[:3], byteorder='big')
1✔
1345

1346
    @property
1✔
1347
    def txpos(self) -> int:
1✔
1348
        return int.from_bytes(self[3:6], byteorder='big')
1✔
1349

1350
    @property
1✔
1351
    def output_index(self) -> int:
1✔
1352
        return int.from_bytes(self[6:8], byteorder='big')
1✔
1353

1354

1355
def format_short_id(short_channel_id: Optional[bytes]):
1✔
1356
    if not short_channel_id:
1✔
1357
        return _('Not yet available')
×
1358
    return str(int.from_bytes(short_channel_id[:3], 'big')) \
1✔
1359
        + 'x' + str(int.from_bytes(short_channel_id[3:6], 'big')) \
1360
        + 'x' + str(int.from_bytes(short_channel_id[6:], 'big'))
1361

1362

1363
def make_aiohttp_proxy_connector(proxy: 'ProxySettings', ssl_context: Optional[ssl.SSLContext] = None) -> ProxyConnector:
1✔
1364
    return ProxyConnector(
×
1365
        proxy_type=ProxyType.SOCKS5 if proxy.mode == 'socks5' else ProxyType.SOCKS4,
1366
        host=proxy.host,
1367
        port=int(proxy.port),
1368
        username=proxy.user,
1369
        password=proxy.password,
1370
        rdns=True,  # needed to prevent DNS leaks over proxy
1371
        ssl=ssl_context,
1372
    )
1373

1374

1375
def make_aiohttp_session(proxy: Optional['ProxySettings'], headers=None, timeout=None):
1✔
1376
    if headers is None:
×
1377
        headers = {'User-Agent': 'Electrum'}
×
1378
    if timeout is None:
×
1379
        # The default timeout is high intentionally.
1380
        # DNS on some systems can be really slow, see e.g. #5337
1381
        timeout = aiohttp.ClientTimeout(total=45)
×
1382
    elif isinstance(timeout, (int, float)):
×
1383
        timeout = aiohttp.ClientTimeout(total=timeout)
×
1384
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
×
1385

1386
    if proxy and proxy.enabled:
×
1387
        connector = make_aiohttp_proxy_connector(proxy, ssl_context)
×
1388
    else:
1389
        connector = aiohttp.TCPConnector(ssl=ssl_context)
×
1390

1391
    return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector)
×
1392

1393

1394
class OldTaskGroup(aiorpcx.TaskGroup):
1✔
1395
    """Automatically raises exceptions on join; as in aiorpcx prior to version 0.20.
1396
    That is, when using TaskGroup as a context manager, if any task encounters an exception,
1397
    we would like that exception to be re-raised (propagated out). For the wait=all case,
1398
    the OldTaskGroup class is emulating the following code-snippet:
1399
    ```
1400
    async with TaskGroup() as group:
1401
        await group.spawn(task1())
1402
        await group.spawn(task2())
1403

1404
        async for task in group:
1405
            if not task.cancelled():
1406
                task.result()
1407
    ```
1408
    So instead of the above, one can just write:
1409
    ```
1410
    async with OldTaskGroup() as group:
1411
        await group.spawn(task1())
1412
        await group.spawn(task2())
1413
    ```
1414
    # TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
1415
    """
1416
    async def join(self):
1✔
1417
        if self._wait is all:
1✔
1418
            exc = False
1✔
1419
            try:
1✔
1420
                async for task in self:
1✔
1421
                    if not task.cancelled():
1✔
1422
                        task.result()
1✔
1423
            except BaseException:  # including asyncio.CancelledError
1✔
1424
                exc = True
1✔
1425
                raise
1✔
1426
            finally:
1427
                if exc:
1✔
1428
                    await self.cancel_remaining()
1✔
1429
                await super().join()
1✔
1430
        else:
1431
            await super().join()
1✔
1432
            if self.completed:
1✔
1433
                self.completed.result()
1✔
1434

1435

1436
# We monkey-patch aiorpcx TimeoutAfter (used by timeout_after and ignore_after API),
1437
# to fix a timing issue present in asyncio as a whole re timing out tasks.
1438
# To see the issue we are trying to fix, consider example:
1439
#     async def outer_task():
1440
#         async with timeout_after(0.1):
1441
#             await inner_task()
1442
# When the 0.1 sec timeout expires, inner_task will get cancelled by timeout_after (=internal cancellation).
1443
# If around the same time (in terms of event loop iterations) another coroutine
1444
# cancels outer_task (=external cancellation), there will be a race.
1445
# Both cancellations work by propagating a CancelledError out to timeout_after, which then
1446
# needs to decide (in TimeoutAfter.__aexit__) whether it's due to an internal or external cancellation.
1447
# AFAICT asyncio provides no reliable way of distinguishing between the two.
1448
# This patch tries to always give priority to external cancellations.
1449
# see https://github.com/kyuupichan/aiorpcX/issues/44
1450
# see https://github.com/aio-libs/async-timeout/issues/229
1451
# see https://bugs.python.org/issue42130 and https://bugs.python.org/issue45098
1452
# TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
1453
def _aiorpcx_monkeypatched_set_new_deadline(task, deadline):
1✔
1454
    def timeout_task():
1✔
1455
        task._orig_cancel()
1✔
1456
        task._timed_out = None if getattr(task, "_externally_cancelled", False) else deadline
1✔
1457

1458
    def mycancel(*args, **kwargs):
1✔
1459
        task._orig_cancel(*args, **kwargs)
1✔
1460
        task._externally_cancelled = True
1✔
1461
        task._timed_out = None
1✔
1462

1463
    if not hasattr(task, "_orig_cancel"):
1✔
1464
        task._orig_cancel = task.cancel
1✔
1465
        task.cancel = mycancel
1✔
1466
    task._deadline_handle = task._loop.call_at(deadline, timeout_task)
1✔
1467

1468

1469
def _aiorpcx_monkeypatched_set_task_deadline(task, deadline):
1✔
1470
    ret = _aiorpcx_orig_set_task_deadline(task, deadline)
1✔
1471
    task._externally_cancelled = None
1✔
1472
    return ret
1✔
1473

1474

1475
def _aiorpcx_monkeypatched_unset_task_deadline(task):
1✔
1476
    if hasattr(task, "_orig_cancel"):
1✔
1477
        task.cancel = task._orig_cancel
1✔
1478
        del task._orig_cancel
1✔
1479
    return _aiorpcx_orig_unset_task_deadline(task)
1✔
1480

1481

1482
_aiorpcx_orig_set_task_deadline    = aiorpcx.curio._set_task_deadline
1✔
1483
_aiorpcx_orig_unset_task_deadline  = aiorpcx.curio._unset_task_deadline
1✔
1484

1485
aiorpcx.curio._set_new_deadline    = _aiorpcx_monkeypatched_set_new_deadline
1✔
1486
aiorpcx.curio._set_task_deadline   = _aiorpcx_monkeypatched_set_task_deadline
1✔
1487
aiorpcx.curio._unset_task_deadline = _aiorpcx_monkeypatched_unset_task_deadline
1✔
1488

1489

1490
async def wait_for2(fut: Awaitable, timeout: Union[int, float, None]):
1✔
1491
    """Replacement for asyncio.wait_for,
1492
     due to bugs: https://bugs.python.org/issue42130 and https://github.com/python/cpython/issues/86296 ,
1493
     which are only fixed in python 3.12+.
1494
     """
1495
    if sys.version_info[:3] >= (3, 12):
1✔
1496
        return await asyncio.wait_for(fut, timeout)
×
1497
    else:
1498
        async with async_timeout(timeout):
1✔
1499
            return await asyncio.ensure_future(fut, loop=get_running_loop())
1✔
1500

1501

1502
if hasattr(asyncio, 'timeout'):  # python 3.11+
1✔
1503
    async_timeout = asyncio.timeout
×
1504
else:
1505
    class TimeoutAfterAsynciolike(aiorpcx.curio.TimeoutAfter):
1✔
1506
        async def __aexit__(self, exc_type, exc_value, tb):
1✔
1507
            try:
1✔
1508
                await super().__aexit__(exc_type, exc_value, tb)
1✔
1509
            except (aiorpcx.TaskTimeout, aiorpcx.UncaughtTimeoutError):
1✔
1510
                raise asyncio.TimeoutError from None
1✔
1511
            except aiorpcx.TimeoutCancellationError:
×
1512
                raise asyncio.CancelledError from None
×
1513

1514
    def async_timeout(delay: Union[int, float, None]):
1✔
1515
        if delay is None:
1✔
1516
            return nullcontext()
1✔
1517
        return TimeoutAfterAsynciolike(delay)
1✔
1518

1519

1520
class NetworkJobOnDefaultServer(Logger, ABC):
1✔
1521
    """An abstract base class for a job that runs on the main network
1522
    interface. Every time the main interface changes, the job is
1523
    restarted, and some of its internals are reset.
1524
    """
1525
    def __init__(self, network: 'Network'):
1✔
1526
        Logger.__init__(self)
1✔
1527
        self.network = network
1✔
1528
        self.interface = None  # type: Interface
1✔
1529
        self._restart_lock = asyncio.Lock()
1✔
1530
        # Ensure fairness between NetworkJobs. e.g. if multiple wallets
1531
        # are open, a large wallet's Synchronizer should not starve the small wallets:
1532
        self._network_request_semaphore = asyncio.Semaphore(100)
1✔
1533

1534
        self._reset()
1✔
1535
        # every time the main interface changes, restart:
1536
        register_callback(self._restart, ['default_server_changed'])
1✔
1537
        # also schedule a one-off restart now, as there might already be a main interface:
1538
        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)
1✔
1539

1540
    def _reset(self):
1✔
1541
        """Initialise fields. Called every time the underlying
1542
        server connection changes.
1543
        """
1544
        self.taskgroup = OldTaskGroup()
1✔
1545
        self.reset_request_counters()
1✔
1546

1547
    async def _start(self, interface: 'Interface'):
1✔
1548
        self.logger.debug(f"starting. interface.server={repr(str(interface.server))}")
1✔
1549
        self.interface = interface
1✔
1550

1551
        taskgroup = self.taskgroup
1✔
1552

1553
        async def run_tasks_wrapper():
1✔
1554
            self.logger.debug(f"starting taskgroup ({hex(id(taskgroup))}).")
1✔
1555
            try:
1✔
1556
                await self._run_tasks(taskgroup=taskgroup)
1✔
1557
            except Exception as e:
1✔
1558
                self.logger.error(f"taskgroup died ({hex(id(taskgroup))}). exc={e!r}")
×
1559
                raise
×
1560
            finally:
1561
                self.logger.debug(f"taskgroup stopped ({hex(id(taskgroup))}).")
1✔
1562
        await interface.taskgroup.spawn(run_tasks_wrapper)
1✔
1563

1564
    @abstractmethod
1✔
1565
    async def _run_tasks(self, *, taskgroup: OldTaskGroup) -> None:
1✔
1566
        """Start tasks in taskgroup. Called every time the underlying
1567
        server connection changes.
1568
        """
1569
        # If self.taskgroup changed, don't start tasks. This can happen if we have
1570
        # been restarted *just now*, i.e. after the _run_tasks coroutine object was created.
1571
        if taskgroup != self.taskgroup:
1✔
1572
            raise asyncio.CancelledError()
×
1573

1574
    async def stop(self, *, full_shutdown: bool = True):
1✔
1575
        self.logger.debug(f"stopping. {full_shutdown=}")
1✔
1576
        if full_shutdown:
1✔
1577
            unregister_callback(self._restart)
×
1578
        await self.taskgroup.cancel_remaining()
1✔
1579

1580
    @log_exceptions
1✔
1581
    async def _restart(self, *args):
1✔
1582
        interface = self.network.interface
1✔
1583
        if interface is None:
1✔
1584
            return  # we should get called again soon
1✔
1585

1586
        async with self._restart_lock:
1✔
1587
            await self.stop(full_shutdown=False)
1✔
1588
            self._reset()
1✔
1589
            await self._start(interface)
1✔
1590

1591
    def reset_request_counters(self):
1✔
1592
        self._requests_sent = 0
1✔
1593
        self._requests_answered = 0
1✔
1594

1595
    def num_requests_sent_and_answered(self) -> Tuple[int, int]:
1✔
1596
        return self._requests_sent, self._requests_answered
×
1597

1598
    @property
1✔
1599
    def session(self):
1✔
1600
        s = self.interface.session
1✔
1601
        assert s is not None
1✔
1602
        return s
1✔
1603

1604

1605
async def detect_tor_socks_proxy() -> Optional[Tuple[str, int]]:
1✔
1606
    # Probable ports for Tor to listen at
1607
    candidates = [
×
1608
        ("127.0.0.1", 9050),
1609
        ("127.0.0.1", 9051),
1610
        ("127.0.0.1", 9150),
1611
    ]
1612

1613
    proxy_addr = None
×
1614

1615
    async def test_net_addr(net_addr):
×
1616
        is_tor = await is_tor_socks_port(*net_addr)
×
1617
        # set result, and cancel remaining probes
1618
        if is_tor:
×
1619
            nonlocal proxy_addr
1620
            proxy_addr = net_addr
×
1621
            await group.cancel_remaining()
×
1622

1623
    async with OldTaskGroup() as group:
×
1624
        for net_addr in candidates:
×
1625
            await group.spawn(test_net_addr(net_addr))
×
1626
    return proxy_addr
×
1627

1628

1629
@log_exceptions
1✔
1630
async def is_tor_socks_port(host: str, port: int) -> bool:
1✔
1631
    # mimic "tor-resolve 0.0.0.0".
1632
    # see https://github.com/spesmilo/electrum/issues/7317#issuecomment-1369281075
1633
    # > this is a socks5 handshake, followed by a socks RESOLVE request as defined in
1634
    # > [tor's socks extension spec](https://github.com/torproject/torspec/blob/7116c9cdaba248aae07a3f1d0e15d9dd102f62c5/socks-extensions.txt#L63),
1635
    # > resolving 0.0.0.0, which being an IP, tor resolves itself without needing to ask a relay.
1636
    writer = None
×
1637
    try:
×
1638
        async with async_timeout(10):
×
1639
            reader, writer = await asyncio.open_connection(host, port)
×
1640
            writer.write(b'\x05\x01\x00\x05\xf0\x00\x03\x070.0.0.0\x00\x00')
×
1641
            await writer.drain()
×
1642
            data = await reader.read(1024)
×
1643
            if data == b'\x05\x00\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00':
×
1644
                return True
×
1645
            return False
×
1646
    except (OSError, asyncio.TimeoutError):
×
1647
        return False
×
1648
    finally:
1649
        if writer:
×
1650
            writer.close()
×
1651

1652

1653
AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP = False  # used by unit tests
1✔
1654

1655
_asyncio_event_loop = None  # type: Optional[asyncio.AbstractEventLoop]
1✔
1656

1657

1658
def get_asyncio_loop() -> asyncio.AbstractEventLoop:
1✔
1659
    """Returns the global asyncio event loop we use."""
1660
    if loop := _asyncio_event_loop:
1✔
1661
        return loop
1✔
1662
    if AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP:
1✔
1663
        if loop := get_running_loop():
1✔
1664
            return loop
1✔
1665
    raise Exception("event loop not created yet")
×
1666

1667

1668
def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
1✔
1669
                                           asyncio.Future,
1670
                                           threading.Thread]:
1671
    global _asyncio_event_loop
1672
    if _asyncio_event_loop is not None:
×
1673
        raise Exception("there is already a running event loop")
×
1674

1675
    # asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710)
1676
    # We set a custom event loop policy purely to be compatible with code that
1677
    # relies on asyncio.get_event_loop().
1678
    # - in python 3.8-3.9, asyncio.Event.__init__, asyncio.Lock.__init__,
1679
    #   and similar, calls get_event_loop. see https://github.com/python/cpython/pull/23420
1680
    class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
×
1681
        def get_event_loop(self):
×
1682
            # In case electrum is being used as a library, there might be other
1683
            # event loops in use besides ours. To minimise interfering with those,
1684
            # if there is a loop running in the current thread, return that:
1685
            running_loop = get_running_loop()
×
1686
            if running_loop is not None:
×
1687
                return running_loop
×
1688
            # Otherwise, return our global loop:
1689
            return get_asyncio_loop()
×
1690
    asyncio.set_event_loop_policy(MyEventLoopPolicy())
×
1691

1692
    loop = asyncio.new_event_loop()
×
1693
    _asyncio_event_loop = loop
×
1694

1695
    def on_exception(loop, context):
×
1696
        """Suppress spurious messages it appears we cannot control."""
1697
        SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
×
1698
                                            'SSL error in data received')
1699
        message = context.get('message')
×
1700
        if message and SUPPRESS_MESSAGE_REGEX.match(message):
×
1701
            return
×
1702
        loop.default_exception_handler(context)
×
1703

1704
    def run_event_loop():
×
1705
        try:
×
1706
            loop.run_until_complete(stopping_fut)
×
1707
        finally:
1708
            # clean-up
1709
            try:
×
1710
                pending_tasks = asyncio.gather(*asyncio.all_tasks(loop), return_exceptions=True)
×
1711
                pending_tasks.cancel()
×
1712
                with suppress(asyncio.CancelledError):
×
1713
                    loop.run_until_complete(pending_tasks)
×
1714
                loop.run_until_complete(loop.shutdown_asyncgens())
×
1715
                if isinstance(loop, asyncio.BaseEventLoop):
×
1716
                    loop.run_until_complete(loop.shutdown_default_executor())
×
1717
            except Exception as e:
×
1718
                _logger.debug(f"exception when cleaning up asyncio event loop: {e}")
×
1719

1720
            global _asyncio_event_loop
1721
            _asyncio_event_loop = None
×
1722
            loop.close()
×
1723

1724
    loop.set_exception_handler(on_exception)
×
1725
    _set_custom_task_factory(loop)
×
1726
    # loop.set_debug(True)
1727
    stopping_fut = loop.create_future()
×
1728
    loop_thread = threading.Thread(
×
1729
        target=run_event_loop,
1730
        name='EventLoop',
1731
    )
1732
    loop_thread.start()
×
1733
    # Wait until the loop actually starts.
1734
    # On a slow PC, or with a debugger attached, this can take a few dozens of ms,
1735
    # and if we returned without a running loop, weird things can happen...
1736
    t0 = time.monotonic()
×
1737
    while not loop.is_running():
×
1738
        time.sleep(0.01)
×
1739
        if time.monotonic() - t0 > 5:
×
1740
            raise Exception("been waiting for 5 seconds but asyncio loop would not start!")
×
1741
    return loop, stopping_fut, loop_thread
×
1742

1743

1744
_running_asyncio_tasks = set()  # type: Set[asyncio.Future]
1✔
1745

1746

1747
def _set_custom_task_factory(loop: asyncio.AbstractEventLoop):
1✔
1748
    """Wrap task creation to track pending and running tasks.
1749
    When tasks are created, asyncio only maintains a weak reference to them.
1750
    Hence, the garbage collector might destroy the task mid-execution.
1751
    To avoid this, we store a strong reference for the task until it completes.
1752

1753
    Without this, a lot of APIs are basically Heisenbug-generators... e.g.:
1754
    - "asyncio.create_task"
1755
    - "loop.create_task"
1756
    - "asyncio.ensure_future"
1757
    - "asyncio.run_coroutine_threadsafe"
1758

1759
    related:
1760
        - https://bugs.python.org/issue44665
1761
        - https://github.com/python/cpython/issues/88831
1762
        - https://github.com/python/cpython/issues/91887
1763
        - https://textual.textualize.io/blog/2023/02/11/the-heisenbug-lurking-in-your-async-code/
1764
        - https://github.com/python/cpython/issues/91887#issuecomment-1434816045
1765
        - "Task was destroyed but it is pending!"
1766
    """
1767

1768
    platform_task_factory = loop.get_task_factory()
1✔
1769

1770
    def factory(loop_, coro, **kwargs):
1✔
1771
        if platform_task_factory is not None:
1✔
1772
            task = platform_task_factory(loop_, coro, **kwargs)
×
1773
        else:
1774
            task = asyncio.Task(coro, loop=loop_, **kwargs)
1✔
1775
        _running_asyncio_tasks.add(task)
1✔
1776
        task.add_done_callback(_running_asyncio_tasks.discard)
1✔
1777
        return task
1✔
1778

1779
    loop.set_task_factory(factory)
1✔
1780

1781

1782
def run_sync_function_on_asyncio_thread(func: Callable[[], Any], *, block: bool) -> None:
1✔
1783
    """Run a non-async fn on the asyncio thread. Can be called from any thread.
1784

1785
    If the current thread is already the asyncio thread, func is guaranteed
1786
    to have been completed when this method returns.
1787

1788
    For any other thread, we only wait for completion if `block` is True.
1789
    """
1790
    assert not inspect.iscoroutinefunction(func), "func must be a non-async function"
1✔
1791
    asyncio_loop = get_asyncio_loop()
1✔
1792
    if get_running_loop() == asyncio_loop:  # we are running on the asyncio thread
1✔
1793
        func()
1✔
1794
    else:  # non-asyncio thread
1795
        async def wrapper():
×
1796
            return func()
×
1797
        fut = asyncio.run_coroutine_threadsafe(wrapper(), loop=asyncio_loop)
×
1798
        if block:
×
1799
            fut.result()
×
1800
        else:
1801
            # add explicit logging of exceptions, otherwise they might get lost
1802
            tb1 = traceback.format_stack()[:-1]
×
1803
            tb1_str = "".join(tb1)
×
1804

1805
            def on_done(fut_: concurrent.futures.Future):
×
1806
                assert fut_.done()
×
1807
                if fut_.cancelled():
×
1808
                    _logger.debug(f"func cancelled. {func=}.")
×
1809
                elif exc := fut_.exception():
×
1810
                    # note: We explicitly log the first part of the traceback, tb1_str.
1811
                    #       The second part gets logged by setting "exc_info".
1812
                    _logger.error(
×
1813
                        f"func errored. {func=}. {exc=}"
1814
                        f"\n{tb1_str}", exc_info=exc)
1815
            fut.add_done_callback(on_done)
×
1816

1817

1818
class OrderedDictWithIndex(OrderedDict):
1✔
1819
    """An OrderedDict that keeps track of the positions of keys.
1820

1821
    Note: very inefficient to modify contents, except to add new items.
1822
    """
1823

1824
    def __init__(self):
1✔
1825
        super().__init__()
1✔
1826
        self._key_to_pos = {}
1✔
1827
        self._pos_to_key = {}
1✔
1828

1829
    def _recalc_index(self):
1✔
1830
        self._key_to_pos = {key: pos for (pos, key) in enumerate(self.keys())}
×
1831
        self._pos_to_key = {pos: key for (pos, key) in enumerate(self.keys())}
×
1832

1833
    def pos_from_key(self, key):
1✔
1834
        return self._key_to_pos[key]
×
1835

1836
    def value_from_pos(self, pos):
1✔
1837
        key = self._pos_to_key[pos]
×
1838
        return self[key]
×
1839

1840
    def popitem(self, *args, **kwargs):
1✔
1841
        ret = super().popitem(*args, **kwargs)
×
1842
        self._recalc_index()
×
1843
        return ret
×
1844

1845
    def move_to_end(self, *args, **kwargs):
1✔
1846
        ret = super().move_to_end(*args, **kwargs)
×
1847
        self._recalc_index()
×
1848
        return ret
×
1849

1850
    def clear(self):
1✔
1851
        ret = super().clear()
×
1852
        self._recalc_index()
×
1853
        return ret
×
1854

1855
    def pop(self, *args, **kwargs):
1✔
1856
        ret = super().pop(*args, **kwargs)
×
1857
        self._recalc_index()
×
1858
        return ret
×
1859

1860
    def update(self, *args, **kwargs):
1✔
1861
        ret = super().update(*args, **kwargs)
×
1862
        self._recalc_index()
×
1863
        return ret
×
1864

1865
    def __delitem__(self, *args, **kwargs):
1✔
1866
        ret = super().__delitem__(*args, **kwargs)
×
1867
        self._recalc_index()
×
1868
        return ret
×
1869

1870
    def __setitem__(self, key, *args, **kwargs):
1✔
1871
        is_new_key = key not in self
1✔
1872
        ret = super().__setitem__(key, *args, **kwargs)
1✔
1873
        if is_new_key:
1✔
1874
            pos = len(self) - 1
1✔
1875
            self._key_to_pos[key] = pos
1✔
1876
            self._pos_to_key[pos] = key
1✔
1877
        return ret
1✔
1878

1879

1880
def make_object_immutable(obj):
1✔
1881
    """Makes the passed object immutable recursively."""
1882
    allowed_types = (
1✔
1883
        dict, MappingProxyType, list, tuple, set, frozenset, str, int, float, bool, bytes, type(None)
1884
    )
1885
    assert isinstance(obj, allowed_types), f"{type(obj)=} cannot be made immutable"
1✔
1886
    if isinstance(obj, (dict, MappingProxyType)):
1✔
1887
        return MappingProxyType({k: make_object_immutable(v) for k, v in obj.items()})
1✔
1888
    elif isinstance(obj, (list, tuple)):
1✔
1889
        return tuple(make_object_immutable(item) for item in obj)
1✔
1890
    elif isinstance(obj, (set, frozenset)):
1✔
1891
        return frozenset(make_object_immutable(item) for item in obj)
×
1892
    return obj
1✔
1893

1894

1895
def multisig_type(wallet_type):
1✔
1896
    """If wallet_type is mofn multi-sig, return [m, n],
1897
    otherwise return None."""
1898
    if not wallet_type:
1✔
1899
        return None
×
1900
    match = re.match(r'(\d+)of(\d+)', wallet_type)
1✔
1901
    if match:
1✔
1902
        match = [int(x) for x in match.group(1, 2)]
1✔
1903
    return match
1✔
1904

1905

1906
def is_ip_address(x: Union[str, bytes]) -> bool:
1✔
1907
    if isinstance(x, bytes):
1✔
1908
        x = x.decode("utf-8")
×
1909
    try:
1✔
1910
        ipaddress.ip_address(x)
1✔
1911
        return True
1✔
1912
    except ValueError:
1✔
1913
        return False
1✔
1914

1915

1916
def is_localhost(host: str) -> bool:
1✔
1917
    if str(host) in ('localhost', 'localhost.',):
1✔
1918
        return True
1✔
1919
    if host[0] == '[' and host[-1] == ']':  # IPv6
1✔
1920
        host = host[1:-1]
1✔
1921
    try:
1✔
1922
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
1✔
1923
        return ip_addr.is_loopback
1✔
1924
    except ValueError:
1✔
1925
        pass  # not an IP
1✔
1926
    return False
1✔
1927

1928

1929
def is_private_netaddress(host: str) -> bool:
1✔
1930
    if is_localhost(host):
1✔
1931
        return True
1✔
1932
    if host[0] == '[' and host[-1] == ']':  # IPv6
1✔
1933
        host = host[1:-1]
1✔
1934
    try:
1✔
1935
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
1✔
1936
        return ip_addr.is_private
1✔
1937
    except ValueError:
1✔
1938
        pass  # not an IP
1✔
1939
    return False
1✔
1940

1941

1942
def list_enabled_bits(x: int) -> Sequence[int]:
1✔
1943
    """e.g. 77 (0b1001101) --> (0, 2, 3, 6)"""
1944
    binary = bin(x)[2:]
1✔
1945
    rev_bin = reversed(binary)
1✔
1946
    return tuple(i for i, b in enumerate(rev_bin) if b == '1')
1✔
1947

1948

1949
async def resolve_dns_srv(host: str):
1✔
1950
    # FIXME this method is not using the network proxy. (although the proxy might not support UDP?)
1951
    srv_records = await dns.asyncresolver.resolve(host, 'SRV')
×
1952
    # priority: prefer lower
1953
    # weight: tie breaker; prefer higher
1954
    srv_records = sorted(srv_records, key=lambda x: (x.priority, -x.weight))
×
1955

1956
    def dict_from_srv_record(srv):
×
1957
        return {
×
1958
            'host': str(srv.target),
1959
            'port': srv.port,
1960
        }
1961
    return [dict_from_srv_record(srv) for srv in srv_records]
×
1962

1963

1964
def randrange(bound: int) -> int:
1✔
1965
    """Return a random integer k such that 1 <= k < bound, uniformly
1966
    distributed across that range.
1967
    This is guaranteed to be cryptographically strong.
1968
    """
1969
    # secrets.randbelow(bound) returns a random int: 0 <= r < bound,
1970
    # hence transformations:
1971
    return secrets.randbelow(bound - 1) + 1
1✔
1972

1973

1974
class CallbackManager(Logger):
1✔
1975
    # callbacks set by the GUI or any thread
1976
    # guarantee: the callbacks will always get triggered from the asyncio thread.
1977

1978
    # FIXME: There should be a way to prevent circular callbacks.
1979
    # At the very least, we need a distinction between callbacks that
1980
    # are for the GUI and callbacks between wallet components
1981

1982
    def __init__(self):
1✔
1983
        Logger.__init__(self)
1✔
1984
        self.callback_lock = threading.Lock()
1✔
1985
        self.callbacks = defaultdict(list)  # type: Dict[str, List[Callable]]  # note: needs self.callback_lock
1✔
1986

1987
    def register_callback(self, func: Callable, events: Sequence[str]) -> None:
1✔
1988
        with self.callback_lock:
1✔
1989
            for event in events:
1✔
1990
                self.callbacks[event].append(func)
1✔
1991

1992
    def unregister_callback(self, callback: Callable) -> None:
1✔
1993
        with self.callback_lock:
1✔
1994
            for callbacks in self.callbacks.values():
1✔
1995
                if callback in callbacks:
1✔
1996
                    callbacks.remove(callback)
1✔
1997

1998
    def clear_all_callbacks(self) -> None:
1✔
1999
        with self.callback_lock:
1✔
2000
            self.callbacks.clear()
1✔
2001

2002
    def trigger_callback(self, event: str, *args) -> None:
1✔
2003
        """Trigger a callback with given arguments.
2004
        Can be called from any thread. The callback itself will get scheduled
2005
        on the event loop.
2006
        """
2007
        loop = get_asyncio_loop()
1✔
2008
        assert loop.is_running(), "event loop not running"
1✔
2009
        with self.callback_lock:
1✔
2010
            callbacks = self.callbacks[event][:]
1✔
2011
        for callback in callbacks:
1✔
2012
            if inspect.iscoroutinefunction(callback):  # async cb
1✔
2013
                fut = asyncio.run_coroutine_threadsafe(callback(*args), loop)
1✔
2014

2015
                def on_done(fut_: concurrent.futures.Future):
1✔
2016
                    assert fut_.done()
1✔
2017
                    if fut_.cancelled():
1✔
UNCOV
2018
                        self.logger.debug(f"cb cancelled. {event=}.")
×
2019
                    elif exc := fut_.exception():
1✔
2020
                        self.logger.error(f"cb errored. {event=}. {exc=}", exc_info=exc)
×
2021
                fut.add_done_callback(on_done)
1✔
2022
            else:  # non-async cb
2023
                run_sync_function_on_asyncio_thread(partial(callback, *args), block=False)
1✔
2024

2025

2026
callback_mgr = CallbackManager()
1✔
2027
trigger_callback = callback_mgr.trigger_callback
1✔
2028
register_callback = callback_mgr.register_callback
1✔
2029
unregister_callback = callback_mgr.unregister_callback
1✔
2030
_event_listeners = defaultdict(set)  # type: Dict[str, Set[str]]
1✔
2031

2032

2033
class EventListener:
1✔
2034
    """Use as a mixin for a class that has methods to be triggered on events.
2035
    - Methods that receive the callbacks should be named "on_event_*" and decorated with @event_listener.
2036
    - register_callbacks() should be called exactly once per instance of EventListener, e.g. in __init__
2037
    - unregister_callbacks() should be called at least once, e.g. when the instance is destroyed
2038
    """
2039

2040
    def _list_callbacks(self):
1✔
2041
        for c in self.__class__.__mro__:
1✔
2042
            classpath = f"{c.__module__}.{c.__name__}"
1✔
2043
            for method_name in _event_listeners[classpath]:
1✔
2044
                method = getattr(self, method_name)
1✔
2045
                assert callable(method)
1✔
2046
                assert method_name.startswith('on_event_')
1✔
2047
                yield method_name[len('on_event_'):], method
1✔
2048

2049
    def register_callbacks(self):
1✔
2050
        for name, method in self._list_callbacks():
1✔
2051
            #_logger.debug(f'registering callback {method}')
2052
            register_callback(method, [name])
1✔
2053

2054
    def unregister_callbacks(self):
1✔
2055
        for name, method in self._list_callbacks():
1✔
2056
            #_logger.debug(f'unregistering callback {method}')
2057
            unregister_callback(method)
1✔
2058

2059

2060
def event_listener(func):
1✔
2061
    """To be used in subclasses of EventListener only. (how to enforce this programmatically?)"""
2062
    classname, method_name = func.__qualname__.split('.')
1✔
2063
    assert method_name.startswith('on_event_')
1✔
2064
    classpath = f"{func.__module__}.{classname}"
1✔
2065
    _event_listeners[classpath].add(method_name)
1✔
2066
    return func
1✔
2067

2068

2069
_NetAddrType = TypeVar("_NetAddrType")
1✔
2070
# requirements for _NetAddrType:
2071
# - reasonable __hash__() implementation (e.g. based on host/port of remote endpoint)
2072

2073

2074
class NetworkRetryManager(Generic[_NetAddrType]):
1✔
2075
    """Truncated Exponential Backoff for network connections."""
2076

2077
    def __init__(
1✔
2078
            self, *,
2079
            max_retry_delay_normal: float,
2080
            init_retry_delay_normal: float,
2081
            max_retry_delay_urgent: float = None,
2082
            init_retry_delay_urgent: float = None,
2083
    ):
2084
        self._last_tried_addr = {}  # type: Dict[_NetAddrType, Tuple[float, int]]  # (unix ts, num_attempts)
1✔
2085

2086
        # note: these all use "seconds" as unit
2087
        if max_retry_delay_urgent is None:
1✔
2088
            max_retry_delay_urgent = max_retry_delay_normal
×
2089
        if init_retry_delay_urgent is None:
1✔
2090
            init_retry_delay_urgent = init_retry_delay_normal
×
2091
        self._max_retry_delay_normal = max_retry_delay_normal
1✔
2092
        self._init_retry_delay_normal = init_retry_delay_normal
1✔
2093
        self._max_retry_delay_urgent = max_retry_delay_urgent
1✔
2094
        self._init_retry_delay_urgent = init_retry_delay_urgent
1✔
2095

2096
    def _trying_addr_now(self, addr: _NetAddrType) -> None:
1✔
2097
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
×
2098
        # we add up to 1 second of noise to the time, so that clients are less likely
2099
        # to get synchronised and bombard the remote in connection waves:
2100
        cur_time = time.time() + random.random()
×
2101
        self._last_tried_addr[addr] = cur_time, num_attempts + 1
×
2102

2103
    def _on_connection_successfully_established(self, addr: _NetAddrType) -> None:
1✔
2104
        self._last_tried_addr[addr] = time.time(), 0
×
2105

2106
    def _can_retry_addr(self, addr: _NetAddrType, *,
1✔
2107
                        now: float = None, urgent: bool = False) -> bool:
2108
        if now is None:
×
2109
            now = time.time()
×
2110
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
×
2111
        if urgent:
×
2112
            max_delay = self._max_retry_delay_urgent
×
2113
            init_delay = self._init_retry_delay_urgent
×
2114
        else:
2115
            max_delay = self._max_retry_delay_normal
×
2116
            init_delay = self._init_retry_delay_normal
×
2117
        delay = self.__calc_delay(multiplier=init_delay, max_delay=max_delay, num_attempts=num_attempts)
×
2118
        next_time = last_time + delay
×
2119
        return next_time < now
×
2120

2121
    @classmethod
1✔
2122
    def __calc_delay(cls, *, multiplier: float, max_delay: float,
1✔
2123
                     num_attempts: int) -> float:
2124
        num_attempts = min(num_attempts, 100_000)
×
2125
        try:
×
2126
            res = multiplier * 2 ** num_attempts
×
2127
        except OverflowError:
×
2128
            return max_delay
×
2129
        return max(0, min(max_delay, res))
×
2130

2131
    def _clear_addr_retry_times(self) -> None:
1✔
2132
        self._last_tried_addr.clear()
1✔
2133

2134

2135
class ESocksProxy(aiorpcx.SOCKSProxy):
1✔
2136
    # note: proxy will not leak DNS as create_connection()
2137
    # sets (local DNS) resolve=False by default
2138

2139
    async def open_connection(self, host=None, port=None, **kwargs):
1✔
2140
        loop = asyncio.get_running_loop()
×
2141
        reader = asyncio.StreamReader(loop=loop)
×
2142
        protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
×
2143
        transport, _ = await self.create_connection(
×
2144
            lambda: protocol, host, port, **kwargs)
2145
        writer = asyncio.StreamWriter(transport, protocol, reader, loop)
×
2146
        return reader, writer
×
2147

2148
    @classmethod
1✔
2149
    def from_network_settings(cls, network: Optional['Network']) -> Optional['ESocksProxy']:
1✔
2150
        if not network or not network.proxy or not network.proxy.enabled:
1✔
2151
            return None
1✔
2152
        proxy = network.proxy
×
2153
        username, pw = proxy.user, proxy.password
×
2154
        if not username or not pw:
×
2155
            # is_proxy_tor is tri-state; None indicates it is still probing the proxy to test for TOR
2156
            if network.is_proxy_tor:
×
2157
                auth = aiorpcx.socks.SOCKSRandomAuth()
×
2158
            else:
2159
                auth = None
×
2160
        else:
2161
            auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
×
2162
        addr = aiorpcx.NetAddress(proxy.host, proxy.port)
×
2163
        if proxy.mode == "socks4":
×
2164
            ret = cls(addr, aiorpcx.socks.SOCKS4a, auth)
×
2165
        elif proxy.mode == "socks5":
×
2166
            ret = cls(addr, aiorpcx.socks.SOCKS5, auth)
×
2167
        else:
2168
            raise NotImplementedError  # http proxy not available with aiorpcx
×
2169
        return ret
×
2170

2171

2172
class JsonRPCError(Exception):
1✔
2173

2174
    class Codes(enum.IntEnum):
1✔
2175
        # application-specific error codes
2176
        USERFACING = 1
1✔
2177
        INTERNAL = 2
1✔
2178

2179
    def __init__(self, *, code: int, message: str, data: Optional[dict] = None):
1✔
2180
        Exception.__init__(self)
×
2181
        self.code = code
×
2182
        self.message = message
×
2183
        self.data = data
×
2184

2185

2186
class JsonRPCClient:
1✔
2187

2188
    def __init__(self, session: aiohttp.ClientSession, url: str):
1✔
2189
        self.session = session
×
2190
        self.url = url
×
2191
        self._id = 0
×
2192

2193
    async def request(self, endpoint, *args):
1✔
2194
        """Send request to server, parse and return result.
2195
        note: parsing code is naive, the server is assumed to be well-behaved.
2196
              Up to the caller to handle exceptions, including those arising from parsing errors.
2197
        """
2198
        self._id += 1
×
2199
        data = ('{"jsonrpc": "2.0", "id":"%d", "method": "%s", "params": %s }'
×
2200
                % (self._id, endpoint, json.dumps(args)))
2201
        async with self.session.post(self.url, data=data) as resp:
×
2202
            if resp.status == 200:
×
2203
                r = await resp.json()
×
2204
                result = r.get('result')
×
2205
                error = r.get('error')
×
2206
                if error:
×
2207
                    raise JsonRPCError(code=error["code"], message=error["message"], data=error.get("data"))
×
2208
                else:
2209
                    return result
×
2210
            else:
2211
                text = await resp.text()
×
2212
                return 'Error: ' + str(text)
×
2213

2214
    def add_method(self, endpoint):
1✔
2215
        async def coro(*args):
×
2216
            return await self.request(endpoint, *args)
×
2217
        setattr(self, endpoint, coro)
×
2218

2219

2220
T = TypeVar('T')
1✔
2221

2222

2223
def random_shuffled_copy(x: Iterable[T]) -> List[T]:
1✔
2224
    """Returns a shuffled copy of the input."""
2225
    x_copy = list(x)  # copy
1✔
2226
    random.shuffle(x_copy)  # shuffle in-place
1✔
2227
    return x_copy
1✔
2228

2229

2230
def test_read_write_permissions(path) -> None:
1✔
2231
    # note: There might already be a file at 'path'.
2232
    #       Make sure we do NOT overwrite/corrupt that!
2233
    temp_path = "%s.tmptest.%s" % (path, os.getpid())
1✔
2234
    echo = "fs r/w test"
1✔
2235
    try:
1✔
2236
        # test READ permissions for actual path
2237
        if os.path.exists(path):
1✔
2238
            with open(path, "rb") as f:
1✔
2239
                f.read(1)  # read 1 byte
1✔
2240
        # test R/W sanity for "similar" path
2241
        with open(temp_path, "w", encoding='utf-8') as f:
1✔
2242
            f.write(echo)
1✔
2243
        with open(temp_path, "r", encoding='utf-8') as f:
1✔
2244
            echo2 = f.read()
1✔
2245
        os.remove(temp_path)
1✔
2246
    except Exception as e:
×
2247
        raise IOError(e) from e
×
2248
    if echo != echo2:
1✔
2249
        raise IOError('echo sanity-check failed')
×
2250

2251

2252
class classproperty(property):
1✔
2253
    """~read-only class-level @property
2254
    from https://stackoverflow.com/a/13624858 by denis-ryzhkov
2255
    """
2256
    def __get__(self, owner_self, owner_cls):
1✔
2257
        return self.fget(owner_cls)
1✔
2258

2259

2260
def sticky_property(val):
1✔
2261
    """Creates a 'property' whose value cannot be changed and that cannot be deleted.
2262
    Attempts to change the value are silently ignored.
2263

2264
    >>> class C: pass
2265
    ...
2266
    >>> setattr(C, 'x', sticky_property(3))
2267
    >>> c = C()
2268
    >>> c.x
2269
    3
2270
    >>> c.x = 2
2271
    >>> c.x
2272
    3
2273
    >>> del c.x
2274
    >>> c.x
2275
    3
2276
    """
2277
    return property(
1✔
2278
        fget=lambda self: val,
2279
        fset=lambda *args, **kwargs: None,
2280
        fdel=lambda *args, **kwargs: None,
2281
    )
2282

2283

2284
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
1✔
2285
    """Returns the asyncio event loop that is *running in this thread*, if any."""
2286
    try:
1✔
2287
        return asyncio.get_running_loop()
1✔
2288
    except RuntimeError:
×
2289
        return None
×
2290

2291

2292
def error_text_str_to_safe_str(err: str, *, max_len: Optional[int] = 500) -> str:
1✔
2293
    """Converts an untrusted error string to a sane printable ascii str.
2294
    Never raises.
2295
    """
2296
    text = error_text_bytes_to_safe_str(
1✔
2297
        err.encode("ascii", errors='backslashreplace'),
2298
        max_len=None)
2299
    return truncate_text(text, max_len=max_len)
1✔
2300

2301

2302
def error_text_bytes_to_safe_str(err: bytes, *, max_len: Optional[int] = 500) -> str:
1✔
2303
    """Converts an untrusted error bytes text to a sane printable ascii str.
2304
    Never raises.
2305

2306
    Note that naive ascii conversion would be insufficient. Fun stuff:
2307
    >>> b = b"my_long_prefix_blabla" + 21 * b"\x08" + b"malicious_stuff"
2308
    >>> s = b.decode("ascii")
2309
    >>> print(s)
2310
    malicious_stuffblabla
2311
    """
2312
    # convert to ascii, to get rid of unicode stuff
2313
    ascii_text = err.decode("ascii", errors='backslashreplace')
1✔
2314
    # do repr to handle ascii special chars (especially when printing/logging the str)
2315
    text = repr(ascii_text)
1✔
2316
    return truncate_text(text, max_len=max_len)
1✔
2317

2318

2319
def truncate_text(text: str, *, max_len: Optional[int]) -> str:
1✔
2320
    if max_len is None or len(text) <= max_len:
1✔
2321
        return text
1✔
2322
    else:
2323
        return text[:max_len] + f"... (truncated. orig_len={len(text)})"
1✔
2324

2325

2326
def nostr_pow_worker(nonce, nostr_pubk, target_bits, hash_function, hash_len_bits, shutdown):
1✔
2327
    """Function to generate PoW for Nostr, to be spawned in a ProcessPoolExecutor."""
2328
    hash_preimage = b'electrum-' + nostr_pubk
×
2329
    while True:
×
2330
        # we cannot check is_set on each iteration as it has a lot of overhead, this way we can check
2331
        # it with low overhead (just the additional range counter)
2332
        for i in range(1000000):
×
2333
            digest = hash_function(hash_preimage + nonce.to_bytes(32, 'big')).digest()
×
2334
            if int.from_bytes(digest, 'big') < (1 << (hash_len_bits - target_bits)):
×
2335
                shutdown.set()
×
2336
                return hash, nonce
×
2337
            nonce += 1
×
2338
        if shutdown.is_set():
×
2339
            return None, None
×
2340

2341

2342
async def gen_nostr_ann_pow(nostr_pubk: bytes, target_bits: int) -> Tuple[int, int]:
1✔
2343
    """Generate a PoW for a Nostr announcement. The PoW is hash[b'electrum-'+pubk+nonce]"""
2344
    import multiprocessing  # not available on Android, so we import it here
×
2345
    hash_function = hashlib.sha256
×
2346
    hash_len_bits = 256
×
2347
    max_nonce: int = (1 << (32 * 8)) - 1  # 32-byte nonce
×
2348
    start_nonce = 0
×
2349

2350
    max_workers = max(multiprocessing.cpu_count() - 1, 1)  # use all but one CPU
×
2351
    manager = multiprocessing.Manager()
×
2352
    shutdown = manager.Event()
×
2353
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
×
2354
        tasks = []
×
2355
        loop = asyncio.get_running_loop()
×
2356
        for task in range(0, max_workers):
×
2357
            task = loop.run_in_executor(
×
2358
                executor,
2359
                nostr_pow_worker,
2360
                start_nonce,
2361
                nostr_pubk,
2362
                target_bits,
2363
                hash_function,
2364
                hash_len_bits,
2365
                shutdown
2366
            )
2367
            tasks.append(task)
×
2368
            start_nonce += max_nonce // max_workers  # split the nonce range between the processes
×
2369
            if start_nonce > max_nonce:  # make sure we don't go over the max_nonce
×
2370
                start_nonce = random.randint(0, int(max_nonce * 0.75))
×
2371

2372
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
×
2373
        hash_res, nonce_res = done.pop().result()
×
2374
        executor.shutdown(wait=False, cancel_futures=True)
×
2375

2376
    return nonce_res, get_nostr_ann_pow_amount(nostr_pubk, nonce_res)
×
2377

2378

2379
def get_nostr_ann_pow_amount(nostr_pubk: bytes, nonce: Optional[int]) -> int:
1✔
2380
    """Return the amount of leading zero bits for a nostr announcement PoW."""
2381
    if not nonce:
×
2382
        return 0
×
2383
    hash_function = hashlib.sha256
×
2384
    hash_len_bits = 256
×
2385
    hash_preimage = b'electrum-' + nostr_pubk
×
2386

2387
    digest = hash_function(hash_preimage + nonce.to_bytes(32, 'big')).digest()
×
2388
    digest = int.from_bytes(digest, 'big')
×
2389
    return hash_len_bits - digest.bit_length()
×
2390

2391

2392
class OnchainHistoryItem(NamedTuple):
1✔
2393
    txid: str
1✔
2394
    amount_sat: int
1✔
2395
    fee_sat: int
1✔
2396
    balance_sat: int
1✔
2397
    tx_mined_status: TxMinedInfo
1✔
2398
    group_id: Optional[str]
1✔
2399
    label: Optional[str]
1✔
2400
    monotonic_timestamp: int
1✔
2401
    group_id: Optional[str]
1✔
2402
    def to_dict(self):
1✔
2403
        return {
1✔
2404
            'txid': self.txid,
2405
            'amount_sat': self.amount_sat,
2406
            'fee_sat': self.fee_sat,
2407
            'height': self.tx_mined_status.height(),
2408
            'confirmations': self.tx_mined_status.conf,
2409
            'timestamp': self.tx_mined_status.timestamp,
2410
            'monotonic_timestamp': self.monotonic_timestamp,
2411
            'incoming': True if self.amount_sat>0 else False,
2412
            'bc_value': Satoshis(self.amount_sat),
2413
            'bc_balance': Satoshis(self.balance_sat),
2414
            'date': timestamp_to_datetime(self.tx_mined_status.timestamp),
2415
            'txpos_in_block': self.tx_mined_status.txpos,
2416
            'wanted_height': self.tx_mined_status.wanted_height,
2417
            'label': self.label,
2418
            'group_id': self.group_id,
2419
        }
2420

2421

2422
class LightningHistoryItem(NamedTuple):
1✔
2423
    payment_hash: Optional[str]
1✔
2424
    preimage: Optional[str]
1✔
2425
    amount_msat: int
1✔
2426
    fee_msat: Optional[int]
1✔
2427
    type: str
1✔
2428
    group_id: Optional[str]
1✔
2429
    timestamp: int
1✔
2430
    label: Optional[str]
1✔
2431
    direction: Optional[int]
1✔
2432
    def to_dict(self):
1✔
2433
        return {
×
2434
            'type': self.type,
2435
            'label': self.label,
2436
            'timestamp': self.timestamp or 0,
2437
            'date': timestamp_to_datetime(self.timestamp),
2438
            'amount_msat': self.amount_msat,
2439
            'fee_msat': self.fee_msat,
2440
            'payment_hash': self.payment_hash,
2441
            'preimage': self.preimage,
2442
            'group_id': self.group_id,
2443
            'ln_value': Satoshis(Decimal(self.amount_msat) / 1000),
2444
            'direction': self.direction,
2445
        }
2446

2447

2448
@dataclass(kw_only=True, slots=True)
1✔
2449
class ChoiceItem:
1✔
2450
    key: Any
1✔
2451
    label: str  # user facing string
1✔
2452
    extra_data: Any = None
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc