• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5318526929469440

07 Oct 2025 05:45PM UTC coverage: 61.351% (-0.006%) from 61.357%
5318526929469440

push

CirrusCI

SomberNight
wallet: (trivial) mark calc_unused_change_addresses as private

4 of 4 new or added lines in 1 file covered. (100.0%)

2 existing lines in 2 files now uncovered.

22881 of 37295 relevant lines covered (61.35%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.65
/electrum/util.py
1
# Electrum - lightweight Bitcoin client
2
# Copyright (C) 2011 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import concurrent.futures
1✔
24
from dataclasses import dataclass
1✔
25
import logging
1✔
26
import os
1✔
27
import sys
1✔
28
import re
1✔
29
from collections import defaultdict, OrderedDict
1✔
30
from concurrent.futures.process import ProcessPoolExecutor
1✔
31
from typing import (
1✔
32
    NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any, Sequence, Dict, Generic, TypeVar, List, Iterable,
33
    Set, Awaitable
34
)
35
from datetime import datetime, timezone, timedelta
1✔
36
import decimal
1✔
37
from decimal import Decimal
1✔
38
import threading
1✔
39
import hmac
1✔
40
import hashlib
1✔
41
import stat
1✔
42
import asyncio
1✔
43
import builtins
1✔
44
import json
1✔
45
import time
1✔
46
import ssl
1✔
47
import ipaddress
1✔
48
from ipaddress import IPv4Address, IPv6Address
1✔
49
import random
1✔
50
import secrets
1✔
51
import functools
1✔
52
from functools import partial
1✔
53
from abc import abstractmethod, ABC
1✔
54
import enum
1✔
55
from contextlib import nullcontext
1✔
56
import traceback
1✔
57
import inspect
1✔
58

59
import aiohttp
1✔
60
from aiohttp_socks import ProxyConnector, ProxyType
1✔
61
import aiorpcx
1✔
62
import certifi
1✔
63
import dns.asyncresolver
1✔
64

65
from .i18n import _
1✔
66
from .logging import get_logger, Logger
1✔
67

68
if TYPE_CHECKING:
69
    from .network import Network, ProxySettings
70
    from .interface import Interface
71
    from .simple_config import SimpleConfig
72

73

74
_logger = get_logger(__name__)
1✔
75

76

77
def inv_dict(d):
    """Return a new dict mapping each value of d back to its key."""
    return dict((value, key) for key, value in d.items())
79

80

81
def all_subclasses(cls) -> Set:
    """Return all (transitive) subclasses of cls."""
    result = set()
    for sub in cls.__subclasses__():
        result.add(sub)
        result |= all_subclasses(sub)
    return result
87

88

89
ca_path = certifi.where()
1✔
90

91

92
base_units = {'BTC':8, 'mBTC':5, 'bits':2, 'sat':0}
1✔
93
base_units_inverse = inv_dict(base_units)
1✔
94
base_units_list = ['BTC', 'mBTC', 'bits', 'sat']  # list(dict) does not guarantee order
1✔
95

96
DECIMAL_POINT_DEFAULT = 5  # mBTC
1✔
97

98

99
class UnknownBaseUnit(Exception):
    """Raised for a base-unit name / decimal-point value not present in base_units."""
    pass
100

101

102
def decimal_point_to_base_unit_name(dp: int) -> str:
    """Map a decimal-point shift to its unit name, e.g. 8 -> "BTC".

    Raises UnknownBaseUnit for unmapped values.
    """
    if dp not in base_units_inverse:
        raise UnknownBaseUnit(dp)
    return base_units_inverse[dp]
108

109

110
def base_unit_name_to_decimal_point(unit_name: str) -> int:
    """Returns the max number of digits allowed after the decimal point."""
    # e.g. "BTC" -> 8; unmapped names raise UnknownBaseUnit
    if unit_name not in base_units:
        raise UnknownBaseUnit(unit_name)
    return base_units[unit_name]
117

118
def parse_max_spend(amt: Any) -> Optional[int]:
    """Checks if given amount is "spend-max"-like.
    Returns None or the positive integer weight for "max". Never raises.

    When creating invoices and on-chain txs, the user can specify to send "max".
    This is done by setting the amount to '!'. Splitting max between multiple
    tx outputs is also possible, and custom weights (positive ints) can also be used.
    For example, to send 40% of all coins to address1, and 60% to address2:
    ```
    address1, 2!
    address2, 3!
    ```
    """
    if not isinstance(amt, str) or not amt.endswith('!'):
        return None
    weight_str = amt[:-1]
    if weight_str == '':  # bare '!' means weight 1
        return 1
    try:
        weight = int(weight_str)
    except ValueError:
        return None
    return weight if weight > 0 else None
143

144
class NotEnoughFunds(Exception):
    """Raised when the wallet balance cannot cover a payment."""
    def __str__(self):
        return _("Insufficient funds")


class UneconomicFee(Exception):
    """Raised when the tx fee would exceed the funds gained by the tx."""
    def __str__(self):
        return _("The fee for the transaction is higher than the funds gained from it.")


class NoDynamicFeeEstimates(Exception):
    """Raised when dynamic fee estimates are requested but unavailable."""
    def __str__(self):
        return _('Dynamic fee estimates not available')


class BelowDustLimit(Exception):
    """Raised when an output value is below the dust threshold."""
    pass
161

162

163
class InvalidPassword(Exception):
    """Raised when a password fails to decrypt a wallet/keystore."""
    def __init__(self, message: Optional[str] = None):
        # message: optional custom text; falls back to a generic translated string
        self.message = message

    def __str__(self):
        if self.message is None:
            return _("Incorrect password")
        else:
            return str(self.message)


class AddTransactionException(Exception):
    """Base class for errors when adding a tx to the wallet history."""
    pass


class UnrelatedTransactionException(AddTransactionException):
    """Raised when a tx does not touch any address of this wallet."""
    def __str__(self):
        return _("Transaction is unrelated to this wallet.")


class FileImportFailed(Exception):
    """Raised when importing data from a file fails; message carries detail."""
    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        return _("Failed to import from file.") + "\n" + self.message


class FileExportFailed(Exception):
    """Raised when exporting data to a file fails; message carries detail."""
    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        return _("Failed to export to file.") + "\n" + self.message
197

198

199
class WalletFileException(Exception):
    """Raised for problems reading/parsing the wallet file."""
    def __init__(self, message='', *, should_report_crash: bool = False):
        Exception.__init__(self, message)
        # if True, the GUI may offer the user to send a crash report
        self.should_report_crash = should_report_crash


class BitcoinException(Exception):
    """Generic bitcoin-protocol-level error."""
    pass


class UserFacingException(Exception):
    """Exception that contains information intended to be shown to the user."""


class InvoiceError(UserFacingException):
    """Raised for malformed/unusable invoices; message is user-facing."""
    pass


class NetworkOfflineException(UserFacingException):
    """Can be raised if we are running in offline mode (--offline flag)
    and the user requests an operation that requires the network.
    """
    def __str__(self):
        return _("You are offline.")


# Throw this exception to unwind the stack like when an error occurs.
# However unlike other exceptions the user won't be informed.
class UserCancelled(Exception):
    '''An exception that is suppressed from the user'''
    pass
228

229

230
def to_decimal(x: Union[str, float, int, Decimal]) -> Decimal:
    """Convert x to Decimal, going through str for floats to avoid
    binary-float artifacts:
        >>> Decimal(41754.681)
        Decimal('41754.680999999996856786310672760009765625')
        >>> Decimal("41754.681")
        Decimal('41754.681')
    """
    if isinstance(x, Decimal):
        return x
    return Decimal(x) if isinstance(x, int) else Decimal(str(x))
241

242

243
# note: this is not a NamedTuple as then its json encoding cannot be customized
244
class Satoshis(object):
1✔
245
    __slots__ = ('value',)
1✔
246

247
    def __new__(cls, value):
1✔
248
        self = super(Satoshis, cls).__new__(cls)
1✔
249
        # note: 'value' sometimes has msat precision
250
        assert isinstance(value, (int, Decimal)), f"unexpected type for {value=!r}"
1✔
251
        self.value = value
1✔
252
        return self
1✔
253

254
    def __repr__(self):
1✔
255
        return f'Satoshis({self.value})'
×
256

257
    def __str__(self):
1✔
258
        # note: precision is truncated to satoshis here
259
        return format_satoshis(self.value)
1✔
260

261
    def __eq__(self, other):
1✔
262
        return self.value == other.value
×
263

264
    def __ne__(self, other):
1✔
265
        return not (self == other)
×
266

267
    def __add__(self, other):
1✔
268
        return Satoshis(self.value + other.value)
×
269

270

271
# note: this is not a NamedTuple as then its json encoding cannot be customized
272
class Fiat(object):
1✔
273
    __slots__ = ('value', 'ccy')
1✔
274

275
    def __new__(cls, value: Optional[Decimal], ccy: str):
1✔
276
        self = super(Fiat, cls).__new__(cls)
1✔
277
        self.ccy = ccy
1✔
278
        if not isinstance(value, (Decimal, type(None))):
1✔
279
            raise TypeError(f"value should be Decimal or None, not {type(value)}")
×
280
        self.value = value
1✔
281
        return self
1✔
282

283
    def __repr__(self):
1✔
284
        return 'Fiat(%s)'% self.__str__()
×
285

286
    def __str__(self):
1✔
287
        if self.value is None or self.value.is_nan():
1✔
288
            return _('No Data')
×
289
        else:
290
            return "{:.2f}".format(self.value)
1✔
291

292
    def to_ui_string(self):
1✔
293
        if self.value is None or self.value.is_nan():
×
294
            return _('No Data')
×
295
        else:
296
            return "{:.2f}".format(self.value) + ' ' + self.ccy
×
297

298
    def __eq__(self, other):
1✔
299
        if not isinstance(other, Fiat):
×
300
            return False
×
301
        if self.ccy != other.ccy:
×
302
            return False
×
303
        if isinstance(self.value, Decimal) and isinstance(other.value, Decimal) \
×
304
                and self.value.is_nan() and other.value.is_nan():
305
            return True
×
306
        return self.value == other.value
×
307

308
    def __ne__(self, other):
1✔
309
        return not (self == other)
×
310

311
    def __add__(self, other):
1✔
312
        assert self.ccy == other.ccy
×
313
        return Fiat(self.value + other.value, self.ccy)
×
314

315

316
class MyEncoder(json.JSONEncoder):
    """JSON encoder aware of electrum's value types (Transaction, TxOutput,
    Satoshis, Fiat, ...) plus a generic duck-typed `to_json()` protocol.
    """
    def default(self, obj):
        # note: this does not get called for namedtuples :(  https://bugs.python.org/issue30343
        from .transaction import Transaction, TxOutput
        if isinstance(obj, Transaction):
            return obj.serialize()
        if isinstance(obj, TxOutput):
            return obj.to_legacy_tuple()
        if isinstance(obj, Satoshis):
            return str(obj)
        if isinstance(obj, Fiat):
            return str(obj)
        if isinstance(obj, Decimal):
            return str(obj)
        if isinstance(obj, datetime):
            # note: if there is a timezone specified, this will include the offset
            return obj.isoformat(' ', timespec="minutes")
        if isinstance(obj, set):
            return list(obj)
        if isinstance(obj, bytes): # for nametuples in lnchannel
            return obj.hex()
        if hasattr(obj, 'to_json') and callable(obj.to_json):
            return obj.to_json()
        # unhandled type: base class raises TypeError
        return super(MyEncoder, self).default(obj)
340

341

342
class ThreadJob(Logger):
    """A job that is run periodically from a thread's main loop.  run() is
    called from that thread's context.
    """

    def __init__(self):
        Logger.__init__(self)

    def run(self):
        """Called periodically from the thread.  Subclasses override this."""
        pass
353

354
class DebugMem(ThreadJob):
    '''A handy class for debugging GC memory leaks'''
    def __init__(self, classes, interval=30):
        # classes: iterable of types whose live instances to count per scan
        # interval: minimum seconds between scans
        ThreadJob.__init__(self)
        self.next_time = 0
        self.classes = classes
        self.interval = interval

    def mem_stats(self):
        """Force a GC pass, then log the live-instance count of each tracked class."""
        import gc
        self.logger.info("Start memscan")
        gc.collect()
        objmap = defaultdict(list)
        for obj in gc.get_objects():
            for class_ in self.classes:
                try:
                    _isinstance = isinstance(obj, class_)
                except AttributeError:
                    # some exotic objects raise from isinstance(); treat as non-match
                    _isinstance = False
                if _isinstance:
                    objmap[class_].append(obj)
        for class_, objs in objmap.items():
            self.logger.info(f"{class_.__name__}: {len(objs)}")
        self.logger.info("Finish memscan")

    def run(self):
        # rate-limit scans to at most once per `interval` seconds
        if time.time() > self.next_time:
            self.mem_stats()
            self.next_time = time.time() + self.interval
383

384
class DaemonThread(threading.Thread, Logger):
    """ daemon thread that terminates cleanly """

    def __init__(self):
        threading.Thread.__init__(self)
        Logger.__init__(self)
        self.parent_thread = threading.current_thread()  # so we can stop if parent dies
        self.running = False
        self.running_lock = threading.Lock()
        self.job_lock = threading.Lock()
        self.jobs = []
        self.stopped_event = threading.Event()        # set when fully stopped
        self.stopped_event_async = asyncio.Event()    # set when fully stopped
        self.wake_up_event = threading.Event()  # for perf optimisation of polling in run()

    def add_jobs(self, jobs):
        # jobs: iterable of objects with a run() method (see ThreadJob)
        with self.job_lock:
            self.jobs.extend(jobs)

    def run_jobs(self):
        # Don't let a throwing job disrupt the thread, future runs of
        # itself, or other jobs.  This is useful protection against
        # malformed or malicious server responses
        with self.job_lock:
            for job in self.jobs:
                try:
                    job.run()
                except Exception as e:
                    self.logger.exception('')

    def remove_jobs(self, jobs):
        # note: raises ValueError if a job was never added
        with self.job_lock:
            for job in jobs:
                self.jobs.remove(job)

    def start(self):
        with self.running_lock:
            self.running = True
        return threading.Thread.start(self)

    def is_running(self):
        # also reports not-running once the parent thread has died
        with self.running_lock:
            return self.running and self.parent_thread.is_alive()

    def stop(self):
        with self.running_lock:
            self.running = False
            # pulse the event so a run() loop blocked on wait() re-checks is_running()
            self.wake_up_event.set()
            self.wake_up_event.clear()

    def on_stop(self):
        # to be called by the thread itself just before exiting
        if 'ANDROID_DATA' in os.environ:
            import jnius
            jnius.detach()  # detach this thread from the JVM on Android
            self.logger.info("jnius detach")
        self.logger.info("stopped")
        self.stopped_event.set()
        loop = get_asyncio_loop()
        loop.call_soon_threadsafe(self.stopped_event_async.set)
443

444

445
def print_stderr(*args):
    """Write space-joined stringified args to stderr, newline-terminated, and flush."""
    sys.stderr.write(" ".join(str(item) for item in args) + "\n")
    sys.stderr.flush()
449

450

451
def print_msg(*args):
    """Write space-joined stringified args to stdout, newline-terminated, and flush."""
    sys.stdout.write(" ".join(str(item) for item in args) + "\n")
    sys.stdout.flush()
456

457

458
def json_encode(obj):
    """JSON-encode obj using MyEncoder; fall back to repr() for unencodable input."""
    try:
        return json.dumps(obj, sort_keys=True, indent=4, cls=MyEncoder)
    except TypeError:
        return repr(obj)
464

465

466
def json_decode(x):
    """Parse x as JSON (floats become Decimal); return x unchanged if parsing fails."""
    try:
        return json.loads(x, parse_float=Decimal)
    except Exception:
        return x
471

472

473
def json_normalize(x):
    """Round-trip x through our JSON encoder/decoder, so that it only
    contains types the plain JSON-RPC encoder can handle later."""
    # note: The return value of commands, when going through the JSON-RPC interface,
    #       is json-encoded. The encoder used there cannot handle some types, e.g. electrum.util.Satoshis.
    # note: We should not simply do "json_encode(x)" here, as then later x would get doubly json-encoded.
    # see #5868
    return json_decode(json_encode(x))
479

480

481
# taken from Django Source Code
def constant_time_compare(val1, val2):
    """Return True if the two strings are equal, False otherwise."""
    # hmac.compare_digest runs in constant time, avoiding timing side-channels
    return hmac.compare_digest(to_bytes(val1, 'utf8'), to_bytes(val2, 'utf8'))
485

486

487
_profiler_logger = _logger.getChild('profiler')
1✔
488

489

490
def profiler(func=None, *, min_threshold: Union[int, float, None] = None):
    """Function decorator that logs execution time.

    min_threshold: if set, only log if time taken is higher than threshold
    """
    if func is None:  # to make "@profiler(...)" work. (in addition to bare "@profiler")
        return partial(profiler, min_threshold=min_threshold)
    t0 = None  # type: Optional[float]  # start time, shared with the helpers via closure

    def timer_start():
        nonlocal t0
        t0 = time.time()

    def timer_done():
        t = time.time() - t0
        if min_threshold is None or t > min_threshold:
            _profiler_logger.debug(f"{func.__qualname__} {t:,.4f} sec")

    # preserve async-ness: a coroutine function gets a coroutine wrapper
    if inspect.iscoroutinefunction(func):
        async def do_profile(*args, **kw_args):
            timer_start()
            o = await func(*args, **kw_args)
            timer_done()
            return o
    else:
        def do_profile(*args, **kw_args):
            timer_start()
            o = func(*args, **kw_args)
            timer_done()
            return o
    return do_profile
521

522

523
class AsyncHangDetector:
    """Context manager that logs every `n` seconds if encapsulated context still has not exited."""

    def __init__(
        self,
        *,
        period_sec: int = 15,
        message: str,
        logger: logging.Logger = None,
    ):
        self.period_sec = period_sec
        self.message = message
        self.logger = logger if logger is not None else _logger

    async def _monitor(self):
        # note: this assumes that the event loop itself is not blocked
        started = time.monotonic()
        while True:
            await asyncio.sleep(self.period_sec)
            elapsed = time.monotonic() - started
            self.logger.info(f"{self.message} (after {elapsed:.2f} sec)")

    async def __aenter__(self):
        self._watchdog = asyncio.create_task(self._monitor())

    async def __aexit__(self, exc_type, exc, tb):
        self._watchdog.cancel()
550

551

552
def android_ext_dir():
    """Path of the shared external storage. Android only."""
    from android.storage import primary_external_storage_path
    return primary_external_storage_path()


def android_backup_dir():
    """Per-app backup directory on external storage, created on first use. Android only."""
    pkgname = get_android_package_name()
    d = os.path.join(android_ext_dir(), pkgname)
    if not os.path.exists(d):
        os.mkdir(d)
    return d


def android_data_dir():
    """App-private data directory. Android only."""
    import jnius
    PythonActivity = jnius.autoclass('org.kivy.android.PythonActivity')
    return PythonActivity.mActivity.getFilesDir().getPath() + '/data'
569

570

571
def ensure_sparse_file(filename):
    """Mark filename as a sparse file on Windows; no-op elsewhere.

    On modern Linux, no need to do anything. Best-effort: failures are
    logged, never raised.
    """
    if os.name == "nt":
        import subprocess
        try:
            # use an argument list (shell=False) instead of os.system with a
            # quoted format string, so a filename containing quotes or shell
            # metacharacters cannot inject into the command line
            subprocess.run(["fsutil", "sparse", "setflag", filename, "1"], check=True)
        except Exception as e:
            # include the actual filename in the log (previously lost)
            _logger.info(f'error marking file {filename!r} as sparse: {e}')
579

580

581
def get_headers_dir(config):
    """Directory where blockchain headers are stored: the config's base path."""
    return config.path
583

584

585
def assert_datadir_available(config_path):
    """Raise FileNotFoundError if the datadir at config_path has vanished."""
    if not os.path.exists(config_path):
        raise FileNotFoundError(
            'Electrum datadir does not exist. Was it deleted while running?' + '\n' +
            'Should be at {}'.format(config_path))
593

594

595
def assert_file_in_datadir_available(path, config_path):
    """Raise FileNotFoundError if path is missing, blaming the datadir if that is gone too."""
    if os.path.exists(path):
        return
    # distinguish "whole datadir gone" from "just this file gone"
    assert_datadir_available(config_path)
    raise FileNotFoundError(
        'Cannot find file but datadir is there.' + '\n' +
        'Should be at {}'.format(path))
603

604

605
def standardize_path(path):
    """Normalize path: expand '~', absolutize, normalize case (on Windows)."""
    # note: os.path.realpath() is not used, as on Windows it can return non-working paths (see #8495).
    #       This means that we don't resolve symlinks!
    expanded = os.path.expanduser(path)
    absolute = os.path.abspath(expanded)
    return os.path.normcase(absolute)
613

614

615
def get_new_wallet_name(wallet_folder: str) -> str:
1✔
616
    """Returns a file basename for a new wallet to be used.
617
    Can raise OSError.
618
    """
619
    i = 1
1✔
620
    while True:
1✔
621
        filename = "wallet_%d" % i
1✔
622
        if filename in os.listdir(wallet_folder):
1✔
623
            i += 1
1✔
624
        else:
625
            break
1✔
626
    return filename
1✔
627

628

629
def is_android_debug_apk() -> bool:
    """True iff running inside an Android APK built with BuildConfig.DEBUG."""
    is_android = 'ANDROID_DATA' in os.environ
    if not is_android:
        return False
    from jnius import autoclass
    pkgname = get_android_package_name()
    build_config = autoclass(f"{pkgname}.BuildConfig")
    return bool(build_config.DEBUG)


def get_android_package_name() -> str:
    """Return the Java package name of the running app. Must only be called on Android."""
    is_android = 'ANDROID_DATA' in os.environ
    assert is_android
    from jnius import autoclass
    from android.config import ACTIVITY_CLASS_NAME
    activity = autoclass(ACTIVITY_CLASS_NAME).mActivity
    pkgname = str(activity.getPackageName())
    return pkgname
647

648

649
def assert_bytes(*args):
    """
    porting helper, assert args type
    """
    # kept assert-based so behavior under `python -O` (checks stripped) is unchanged
    try:
        for item in args:
            assert isinstance(item, (bytes, bytearray))
    except Exception:
        print('assert bytes failed', list(map(type, args)))
        raise
659

660

661
def assert_str(*args):
    """
    porting helper, assert args type
    """
    for item in args:
        assert isinstance(item, str)
667

668

669
def to_string(x, enc) -> str:
    """Decode bytes-like x using enc; pass str through unchanged; raise TypeError otherwise."""
    if isinstance(x, str):
        return x
    if isinstance(x, (bytes, bytearray)):
        return x.decode(enc)
    raise TypeError("Not a string or bytes like object")
676

677

678
def to_bytes(something, encoding='utf8') -> bytes:
    """
    cast string to bytes() like object, but for python2 support it's bytearray copy
    """
    if isinstance(something, bytes):
        return something
    if isinstance(something, str):
        return something.encode(encoding)
    if isinstance(something, bytearray):
        return bytes(something)
    raise TypeError("Not a string or bytes like object")
690

691

692
bfh = bytes.fromhex
1✔
693

694

695
def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR two byte strings, truncating to the length of the shorter one."""
    # zip stops at the shorter input, matching the old min(len(a), len(b)) slicing
    return bytes(x ^ y for x, y in zip(a, b))
699

700

701
def user_dir():
    """Return the electrum user directory for this platform, or None if undeterminable."""
    env = os.environ
    if "ELECTRUMDIR" in env:
        return env["ELECTRUMDIR"]
    if 'ANDROID_DATA' in env:
        return android_data_dir()
    if os.name == 'posix':
        return os.path.join(env["HOME"], ".electrum")
    if "APPDATA" in env:
        return os.path.join(env["APPDATA"], "Electrum")
    if "LOCALAPPDATA" in env:
        return os.path.join(env["LOCALAPPDATA"], "Electrum")
    #raise Exception("No home directory found in environment variables.")
    return
715

716

717
def resource_path(*parts):
    """Join path components onto the electrum package directory (pkg_dir below)."""
    return os.path.join(pkg_dir, *parts)


# absolute path to python package folder of electrum ("lib")
pkg_dir = os.path.split(os.path.realpath(__file__))[0]
723

724

725
def is_valid_email(s):
    """Loose email validity check: something@something.something"""
    return bool(re.match(r"[^@]+@[^@]+\.[^@]+", s))
728

729

730
def is_valid_websocket_url(url: str) -> bool:
    """
    uses this django url validation regex:
    https://github.com/django/django/blob/2c6906a0c4673a7685817156576724aba13ad893/django/core/validators.py#L45C1-L52C43
    Note: this is not perfect, urls and their parsing can get very complex (see recent django code).
    however its sufficient for catching weird user input in the gui dialog
    """
    # stores the compiled regex in the function object itself to avoid recompiling it every call
    regex = getattr(is_valid_websocket_url, "regex", None)
    if regex is None:
        regex = re.compile(
            r'^(?:ws|wss)://'  # ws:// or wss://
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
            r'localhost|'  # localhost...
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
            r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
            r'(?::\d+)?'  # optional port
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        is_valid_websocket_url.regex = regex
    try:
        return regex.match(url) is not None
    except Exception:
        # e.g. url is not even a str
        return False
751

752

753
def is_hash256_str(text: Any) -> bool:
    """True iff text is a 64-char hex string (e.g. a txid / sha256 digest)."""
    return isinstance(text, str) and len(text) == 64 and is_hex_str(text)
757

758

759
def is_hex_str(text: Any) -> bool:
    """True iff text is a str of even-length hex with no whitespace."""
    if not isinstance(text, str):
        return False
    try:
        raw = bytes.fromhex(text)
    except Exception:
        return False
    # bytes.fromhex tolerates whitespace; reject it by comparing lengths
    return len(text) == 2 * len(raw)
769

770

771
def is_integer(val: Any) -> bool:
    """True iff val is an int (note: bools are ints in Python)."""
    return isinstance(val, int)
773

774

775
def is_non_negative_integer(val: Any) -> bool:
    """True iff val is an int and >= 0."""
    return is_integer(val) and val >= 0
779

780

781
def is_int_or_float(val: Any) -> bool:
    """True iff val is an int or a float."""
    return isinstance(val, (int, float))
783

784

785
def is_non_negative_int_or_float(val: Any) -> bool:
    """True iff val is an int/float and >= 0."""
    return is_int_or_float(val) and val >= 0
789

790

791
def chunks(items, size: int):
    """Break up items, a sequence, into chunks of length size (last one may be shorter).

    Chunks preserve the sequence type (e.g. str in -> str chunks out).
    """
    # note: generator — the ValueError is raised lazily, on first consumption
    if size < 1:
        raise ValueError(f"size must be positive, not {repr(size)}")
    start = 0
    while start < len(items):
        yield items[start: start + size]
        start += size
797

798

799
def format_satoshis_plain(
        x: Union[int, float, Decimal, str],  # amount in satoshis,
        *,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
        is_max_allowed: bool = True,
) -> str:
    """Display a satoshi amount scaled.  Always uses a '.' as a decimal
    point and has no thousands separator"""
    if is_max_allowed and parse_max_spend(x):
        # "!"-style spend-max amounts are rendered symbolically, e.g. "max(2!)"
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    # TODO(ghost43) just hard-fail if x is a float. do we even use floats for money anywhere?
    x = to_decimal(x)
    scale_factor = pow(10, decimal_point)
    # strip trailing zeros and a dangling decimal point
    return "{:.8f}".format(x / scale_factor).rstrip('0').rstrip('.')
814

815

816
# Check that Decimal precision is sufficient.
817
# We need at the very least ~20, as we deal with msat amounts, and
818
# log10(21_000_000 * 10**8 * 1000) ~= 18.3
819
# decimal.DefaultContext.prec == 28 by default, but it is mutable.
820
# We enforce that we have at least that available.
821
assert decimal.getcontext().prec >= 28, f"PyDecimal precision too low: {decimal.getcontext().prec}"
1✔
822

823
# DECIMAL_POINT = locale.localeconv()['decimal_point']  # type: str
824
DECIMAL_POINT = "."
1✔
825
THOUSANDS_SEP = " "
1✔
826
assert len(DECIMAL_POINT) == 1, f"DECIMAL_POINT has unexpected len. {DECIMAL_POINT!r}"
1✔
827
assert len(THOUSANDS_SEP) == 1, f"THOUSANDS_SEP has unexpected len. {THOUSANDS_SEP!r}"
1✔
828

829

830
def format_satoshis(
        x: Union[int, float, Decimal, str, None],  # amount in satoshis
        *,
        num_zeros: int = 0,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
        precision: int = 0,  # extra digits after satoshi precision
        is_diff: bool = False,  # if True, enforce a leading sign (+/-)
        whitespaces: bool = False,  # if True, add whitespaces, to align numbers in a column
        add_thousands_sep: bool = False,  # if True, add whitespaces, for better readability of the numbers
) -> str:
    """Render a satoshi amount as a decimal string in the unit implied by decimal_point."""
    if x is None:
        return 'unknown'
    if parse_max_spend(x):
        # "!"-style spend-max amounts are rendered symbolically, e.g. "max(2!)"
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    # TODO(ghost43) just hard-fail if x is a float. do we even use floats for money anywhere?
    x = to_decimal(x)
    # lose redundant precision
    x = x.quantize(Decimal(10) ** (-precision))
    # format string
    overall_precision = decimal_point + precision  # max digits after final decimal point
    decimal_format = "." + str(overall_precision) if overall_precision > 0 else ""
    if is_diff:
        decimal_format = '+' + decimal_format
    # initial result
    scale_factor = pow(10, decimal_point)
    result = ("{:" + decimal_format + "f}").format(x / scale_factor)
    if "." not in result: result += "."
    result = result.rstrip('0')
    # add extra decimal places (zeros)
    integer_part, fract_part = result.split(".")
    if len(fract_part) < num_zeros:
        fract_part += "0" * (num_zeros - len(fract_part))
    # add whitespaces as thousands' separator for better readability of numbers
    if add_thousands_sep:
        sign = integer_part[0] if integer_part[0] in ("+", "-") else ""
        if sign == "-":
            integer_part = integer_part[1:]
        integer_part = "{:,}".format(int(integer_part)).replace(',', THOUSANDS_SEP)
        integer_part = sign + integer_part
        # group the fractional digits in threes as well
        fract_part = THOUSANDS_SEP.join(fract_part[i:i+3] for i in range(0, len(fract_part), 3))
    result = integer_part + DECIMAL_POINT + fract_part
    # add leading/trailing whitespaces so that numbers can be aligned in a column
    if whitespaces:
        target_fract_len = overall_precision
        target_integer_len = 14 - decimal_point  # should be enough for up to unsigned 999999 BTC
        if add_thousands_sep:
            # widen targets to account for the separator characters themselves
            target_fract_len += max(0, (target_fract_len - 1) // 3)
            target_integer_len += max(0, (target_integer_len - 1) // 3)
        # add trailing whitespaces
        result += " " * (target_fract_len - len(fract_part))
        # add leading whitespaces
        target_total_len = target_integer_len + 1 + target_fract_len
        result = " " * (target_total_len - len(result)) + result
    return result
885

886

887
FEERATE_PRECISION = 1  # num fractional decimal places for sat/byte fee rates
1✔
888
_feerate_quanta = Decimal(10) ** (-FEERATE_PRECISION)
1✔
889
UI_UNIT_NAME_FEERATE_SAT_PER_VBYTE = "sat/vbyte"
1✔
890
UI_UNIT_NAME_FEERATE_SAT_PER_VB = "sat/vB"
1✔
891
UI_UNIT_NAME_TXSIZE_VBYTES = "vbytes"
1✔
892
UI_UNIT_NAME_MEMPOOL_MB = "vMB"
1✔
893

894

895
def format_fee_satoshis(fee, *, num_zeros=0, precision=None):
    """Format a sat/byte fee rate as a string via format_satoshis.

    Defaults to FEERATE_PRECISION fractional places when no precision is given.
    """
    # Fall back to the module-wide fee-rate precision when not specified.
    effective_precision = FEERATE_PRECISION if precision is None else precision
    # Never pad with more zeroes than the available precision.
    capped_zeros = min(num_zeros, FEERATE_PRECISION)
    return format_satoshis(fee, num_zeros=capped_zeros, decimal_point=0, precision=effective_precision)
def quantize_feerate(fee) -> Union[None, Decimal, int]:
    """Strip a sat/byte fee rate of excess precision (ROUND_HALF_DOWN).

    None passes through unchanged.
    """
    return None if fee is None else Decimal(fee).quantize(
        _feerate_quanta, rounding=decimal.ROUND_HALF_DOWN)
DEFAULT_TIMEZONE = None  # type: timezone | None  # None means local OS timezone


def timestamp_to_datetime(timestamp: Union[int, float, None], *, utc: bool = False) -> Optional[datetime]:
    """Convert a POSIX timestamp to a datetime, or None for a missing timestamp.

    Uses UTC when requested; otherwise DEFAULT_TIMEZONE (None = local OS tz).
    """
    if timestamp is None:
        return None
    # DEFAULT_TIMEZONE is read at call time so later reconfiguration takes effect.
    tz = timezone.utc if utc else DEFAULT_TIMEZONE
    return datetime.fromtimestamp(timestamp, tz=tz)
def format_time(timestamp: Union[int, float, None]) -> str:
    """Render a POSIX timestamp as 'YYYY-MM-DD HH:MM', or a translated 'Unknown'."""
    dt = timestamp_to_datetime(timestamp)
    if not dt:
        return _("Unknown")
    return dt.isoformat(' ', timespec="minutes")
def age(
    from_date: Union[int, float, None],  # POSIX timestamp
    *,
    since_date: Optional[datetime] = None,  # reference point; defaults to "now"
    target_tz=None,  # tzinfo passed to datetime.now() when since_date is None
    include_seconds: bool = False,
) -> str:
    """Takes a timestamp and returns a string with the approximation of the age"""
    if from_date is None:
        return _("Unknown")
    # Converted as a *naive* local datetime.
    # NOTE(review): if target_tz (or an aware since_date) is given, the
    # subtraction below mixes naive and aware datetimes and would raise
    # TypeError — presumably callers only pass naive values; confirm.
    from_date = datetime.fromtimestamp(from_date)
    if since_date is None:
        since_date = datetime.now(target_tz)
    distance_in_time = from_date - since_date
    is_in_past = from_date < since_date
    # Humanize the magnitude, then phrase it as past ("… ago") or future ("in …").
    s = delta_time_str(distance_in_time, include_seconds=include_seconds)
    return _("{} ago").format(s) if is_in_past else _("in {}").format(s)
def delta_time_str(distance_in_time: timedelta, *, include_seconds: bool = False) -> str:
    """Return a human-readable approximation of a time delta's magnitude.

    The sign of the delta is ignored; phrasing (ago / in) is the caller's job.
    """
    # Collapse the delta to absolute whole seconds, then minutes.
    total_seconds = int(round(abs(distance_in_time.days * 86400 + distance_in_time.seconds)))
    total_minutes = int(round(total_seconds / 60))
    if total_minutes == 0:
        # Below one minute: optionally report the exact seconds.
        if include_seconds:
            return _("{} seconds").format(total_seconds)
        return _("less than a minute")
    # Successive guard clauses, coarsening the unit as the delta grows.
    if total_minutes < 45:
        return _("about {} minutes").format(total_minutes)
    if total_minutes < 90:
        return _("about 1 hour")
    if total_minutes < 1440:
        return _("about {} hours").format(round(total_minutes / 60.0))
    if total_minutes < 2880:
        return _("about 1 day")
    if total_minutes < 43220:
        return _("about {} days").format(round(total_minutes / 1440))
    if total_minutes < 86400:
        return _("about 1 month")
    if total_minutes < 525600:
        return _("about {} months").format(round(total_minutes / 43200))
    if total_minutes < 1051200:
        return _("about 1 year")
    return _("over {} years").format(round(total_minutes / 525600))
# Hardcoded block explorers, keyed by display name.
# Each value is (base_url, {kind: path_prefix}); URLs are built by
# block_explorer_URL as base_url + path_prefix + item.
mainnet_block_explorers = {
    '3xpl.com': ('https://3xpl.com/bitcoin/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'Bitflyer.jp': ('https://chainflyer.bitflyer.jp/',
                        {'tx': 'Transaction/', 'addr': 'Address/'}),
    'Blockchain.info': ('https://blockchain.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Bitaps.com': ('https://btc.bitaps.com/',
                        {'tx': '', 'addr': ''}),
    'BTC.com': ('https://btc.com/',
                        {'tx': '', 'addr': ''}),
    'Chain.so': ('https://www.chain.so/',
                        {'tx': 'tx/BTC/', 'addr': 'address/BTC/'}),
    'Insight.is': ('https://insight.bitpay.com/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchair.com': ('https://blockchair.com/bitcoin/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'blockonomics.co': ('https://www.blockonomics.co/',
                        {'tx': 'api/tx?txid=', 'addr': '#/search?q='}),
    'mempool.space': ('https://mempool.space/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.emzy.de': ('https://mempool.emzy.de/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'OXT.me': ('https://oxt.me/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'mynode.local': ('http://mynode.local:3002/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    # 'system default' delegates to the OS handler for the blockchain: URI scheme.
    'system default': ('blockchain:/',
                        {'tx': 'tx/', 'addr': 'address/'}),
}

# Same structure for testnet3.
testnet_block_explorers = {
    'Bitaps.com': ('https://tbtc.bitaps.com/',
                       {'tx': '', 'addr': ''}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchain.info': ('https://www.blockchain.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'smartbit.com.au': ('https://testnet.smartbit.com.au/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain://000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

# Same structure for testnet4.
testnet4_block_explorers = {
    'mempool.space': ('https://mempool.space/testnet4/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://testnet4-explorer.wakiyamap.dev/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

# Same structure for signet.
signet_block_explorers = {
    'bc-2.jp': ('https://explorer.bc-2.jp/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/signet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'bitcoinexplorer.org': ('https://signet.bitcoinexplorer.org/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://signet-explorer.wakiyamap.dev/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'ex.signet.bublina.eu.org': ('https://ex.signet.bublina.eu.org/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

# Path mapping assumed when the user configures a custom explorer as a bare URL.
_block_explorer_default_api_loc = {'tx': 'tx/', 'addr': 'address/'}
def block_explorer_info():
    """Return the hardcoded block-explorer dict for the currently active network."""
    # Imported lazily to avoid a circular import at module load time.
    from . import constants
    if constants.net.NET_NAME == "testnet":
        return testnet_block_explorers
    elif constants.net.NET_NAME == "testnet4":
        return testnet4_block_explorers
    elif constants.net.NET_NAME == "signet":
        return signet_block_explorers
    # Anything else (including mainnet) falls back to the mainnet explorers.
    return mainnet_block_explorers
def block_explorer(config: 'SimpleConfig') -> Optional[str]:
    """Returns name of selected block explorer,
    or None if a custom one (not among hardcoded ones) is configured.
    """
    if config.BLOCK_EXPLORER_CUSTOM is not None:
        return None
    be_key = config.BLOCK_EXPLORER
    be_tuple = block_explorer_info().get(be_key)
    # Unknown key in config -> fall back to the config's default explorer name.
    if be_tuple is None:
        be_key = config.cv.BLOCK_EXPLORER.get_default_value()
    assert isinstance(be_key, str), f"{be_key!r} should be str"
    return be_key
def block_explorer_tuple(config: 'SimpleConfig') -> Optional[Tuple[str, dict]]:
    """Return (base_url, {kind: path}) for the configured explorer, or None.

    A custom explorer may be configured as a bare base-URL string (default
    paths are assumed) or as an explicit (base_url, paths) pair.
    """
    custom_be = config.BLOCK_EXPLORER_CUSTOM
    if custom_be:
        if isinstance(custom_be, str):
            # Bare URL: assume the conventional tx/address path layout.
            return custom_be, _block_explorer_default_api_loc
        if isinstance(custom_be, (tuple, list)) and len(custom_be) == 2:
            return tuple(custom_be)
        # Malformed custom config: warn and behave as if nothing is configured.
        _logger.warning(f"not using {config.cv.BLOCK_EXPLORER_CUSTOM.key()!r} from config. "
                        f"expected a str or a pair but got {custom_be!r}")
        return None
    else:
        # using one of the hardcoded block explorers
        return block_explorer_info().get(block_explorer(config))
def block_explorer_URL(config: 'SimpleConfig', kind: str, item: str) -> Optional[str]:
    """Build an explorer URL for *item* ('tx' or 'addr'), or None if unavailable."""
    be_tuple = block_explorer_tuple(config)
    if not be_tuple:
        return
    base_url, api_locations = be_tuple
    path_prefix = api_locations.get(kind)
    if path_prefix is None:
        # This explorer does not support the requested kind of lookup.
        return
    # Ensure the base URL ends with a slash before appending path components.
    if base_url[-1] != "/":
        base_url += "/"
    return base_url + path_prefix + item
# Python bug (http://bugs.python.org/issue1927) causes raw_input
# to be redirected improperly between stdin/stderr on Unix systems
#TODO: py3
def raw_input(prompt=None):
    # Write the prompt explicitly to stdout, then delegate to the saved builtin.
    if prompt:
        sys.stdout.write(prompt)
    return builtin_raw_input()


# Save the original builtin and globally replace input() with the workaround
# above. NOTE: this is a process-wide side effect performed at import time;
# the assignment order matters (save first, then patch).
builtin_raw_input = builtins.input
builtins.input = raw_input
def parse_json(message):
    """Parse one newline-terminated JSON value from a bytes buffer.

    Returns (obj, remainder): obj is None when no complete line is buffered
    yet or the line is not valid UTF-8 JSON; remainder is the unconsumed tail.
    """
    # TODO: check \r\n pattern
    newline_pos = message.find(b'\n')
    if newline_pos == -1:
        # No complete line yet; hand the buffer back untouched.
        return None, message
    line, remainder = message[:newline_pos], message[newline_pos + 1:]
    try:
        parsed = json.loads(line.decode('utf8'))
    except Exception:
        # Malformed line: consume it but report no value.
        parsed = None
    return parsed, remainder
def setup_thread_excepthook():
    """
    Workaround for `sys.excepthook` thread bug from:
    http://bugs.python.org/issue1230540

    Call once from the main thread before creating any threads.
    """

    # Patch Thread.__init__ so every subsequently created thread gets a
    # run() wrapper that forwards uncaught exceptions to sys.excepthook.
    init_original = threading.Thread.__init__

    def init(self, *args, **kwargs):

        init_original(self, *args, **kwargs)
        run_original = self.run

        def run_with_except_hook(*args2, **kwargs2):
            try:
                run_original(*args2, **kwargs2)
            except Exception:
                # Route the exception through the process-wide hook instead
                # of the default per-thread stderr traceback.
                sys.excepthook(*sys.exc_info())

        self.run = run_with_except_hook

    threading.Thread.__init__ = init
def send_exception_to_crash_reporter(e: BaseException):
    """Forward *e* to the crash reporter (lazy import avoids a module cycle)."""
    # The local import deliberately shadows this function's own name.
    from .base_crash_reporter import send_exception_to_crash_reporter
    send_exception_to_crash_reporter(e)
def versiontuple(v):
    """Convert a dotted version string such as '4.1.5' into a tuple of ints.

    Raises ValueError for non-numeric components (e.g. '4.1.5rc1').
    """
    return tuple(int(component) for component in v.split("."))
def read_json_file(path):
    """Deserialize the JSON document stored at *path*.

    Raises FileImportFailed — with a translated message for malformed JSON,
    or wrapping the underlying error otherwise.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except json.JSONDecodeError:
        _logger.exception('')
        raise FileImportFailed(_("Invalid JSON code."))
    except BaseException as e:
        _logger.exception('')
        raise FileImportFailed(e)
    return data
def write_json_file(path, data):
    """Serialize *data* as pretty-printed, key-sorted JSON to *path*.

    Uses MyEncoder for project-specific types. Raises FileExportFailed on
    filesystem errors.
    """
    try:
        with open(path, 'w+', encoding='utf-8') as f:
            json.dump(data, f, indent=4, sort_keys=True, cls=MyEncoder)
    except (IOError, os.error) as e:
        _logger.exception('')
        raise FileExportFailed(e)
def os_chmod(path, mode):
    """os.chmod aware of tmpfs

    chmod can fail for files under some tmpfs mounts (e.g. XDG_RUNTIME_DIR);
    such failures are logged and ignored rather than raised.
    """
    try:
        os.chmod(path, mode)
    except OSError as exc:
        runtime_dir = os.environ.get("XDG_RUNTIME_DIR", None)
        if not (runtime_dir and is_subpath(path, runtime_dir)):
            raise
        _logger.info(f"Tried to chmod in tmpfs. Skipping... {exc!r}")
def make_dir(path, *, allow_symlink=True):
    """Create *path* if it does not yet exist and restrict it to 0700.

    Raises if *path* is a dangling symlink and allow_symlink is False.
    """
    if os.path.exists(path):
        return
    if not allow_symlink and os.path.islink(path):
        raise Exception('Dangling link: ' + path)
    try:
        os.mkdir(path)
    except FileExistsError:
        # Benign multiprocess race, e.g. when an electrum daemon
        # and an electrum cli command are launched in rapid fire.
        pass
    os_chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
    assert os.path.exists(path)
def is_subpath(long_path: str, short_path: str) -> bool:
    """Returns whether long_path is a sub-path of short_path."""
    try:
        common = os.path.commonpath([long_path, short_path])
    except ValueError:
        # e.g. paths on different drives, or a mix of absolute and relative.
        return False
    # long_path is inside short_path iff their common prefix IS short_path.
    return standardize_path(short_path) == standardize_path(common)
def log_exceptions(func):
    """Decorator to log AND re-raise exceptions."""
    assert inspect.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # If this decorates a method, args[0] is the instance; use its logger.
        self = args[0] if len(args) > 0 else None
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError as e:
            # Cancellation is normal control flow — propagate without logging.
            raise
        except BaseException as e:
            mylogger = self.logger if hasattr(self, 'logger') else _logger
            try:
                mylogger.exception(f"Exception in {func.__name__}: {repr(e)}")
            except BaseException as e2:
                # Last resort: even logging failed; fall back to print so the
                # original exception is not silently lost.
                print(f"logging exception raised: {repr(e2)}... orig exc: {repr(e)} in {func.__name__}")
            raise
    return wrapper
def ignore_exceptions(func):
    """Decorator to silently swallow all exceptions.

    Only `Exception` subclasses are suppressed; BaseExceptions such as
    asyncio.CancelledError still propagate, so cancellation works normally.
    The wrapped coroutine returns None when an exception was swallowed.
    """
    assert inspect.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception:
            # Deliberately best-effort; the previously unused `as e` binding
            # was removed.
            pass
    return wrapper
def with_lock(func):
    """Decorator that runs the wrapped method while holding ``self.lock``."""
    @functools.wraps(func)
    def locked_call(self, *args, **kwargs):
        with self.lock:
            return func(self, *args, **kwargs)
    return locked_call
@dataclass(frozen=True, kw_only=True)
1✔
1268
class TxMinedInfo:
1✔
1269
    _height: int                       # height of block that mined tx
1✔
1270
    conf: Optional[int] = None         # number of confirmations, SPV verified. >=0, or None (None means unknown)
1✔
1271
    timestamp: Optional[int] = None    # timestamp of block that mined tx
1✔
1272
    txpos: Optional[int] = None        # position of tx in serialized block
1✔
1273
    header_hash: Optional[str] = None  # hash of block that mined tx
1✔
1274
    wanted_height: Optional[int] = None  # in case of timelock, min abs block height
1✔
1275

1276
    def height(self) -> int:
1✔
1277
        """Treat unverified heights as unconfirmed."""
1278
        h = self._height
1✔
1279
        if h > 0:
1✔
1280
            if self.conf is not None and self.conf >= 1:
1✔
1281
                return h
1✔
1282
            return 0  # treat it as unconfirmed until SPV-ed
1✔
1283
        else:  # h <= 0
1284
            return h
1✔
1285

1286
    def short_id(self) -> Optional[str]:
1✔
1287
        if self.txpos is not None and self.txpos >= 0:
×
1288
            assert self.height() > 0
×
1289
            return f"{self.height()}x{self.txpos}"
×
1290
        return None
×
1291

1292
    def is_local_like(self) -> bool:
1✔
1293
        """Returns whether the tx is local-like (LOCAL/FUTURE)."""
1294
        from .address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
×
1295
        if self.height() > 0:
×
1296
            return False
×
1297
        if self.height() in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT):
×
1298
            return False
×
1299
        return True
×
1300

1301

1302
class ShortID(bytes):
    """8-byte short channel id: 3 bytes block height | 3 bytes tx pos | 2 bytes output index."""

    def __repr__(self):
        return f"<ShortID: {format_short_id(self)}>"

    def __str__(self):
        return format_short_id(self)

    @classmethod
    def from_components(cls, block_height: int, tx_pos_in_block: int, output_index: int) -> 'ShortID':
        """Pack the three components big-endian into an 8-byte ShortID."""
        packed = (block_height.to_bytes(3, byteorder='big')
                  + tx_pos_in_block.to_bytes(3, byteorder='big')
                  + output_index.to_bytes(2, byteorder='big'))
        return ShortID(packed)

    @classmethod
    def from_str(cls, scid: str) -> 'ShortID':
        """Parses a formatted scid str, e.g. '643920x356x0'."""
        parts = scid.split("x")
        if len(parts) != 3:
            raise ValueError(f"failed to parse ShortID: {scid!r}")
        try:
            numbers = [int(part) for part in parts]
        except ValueError:
            raise ValueError(f"failed to parse ShortID: {scid!r}") from None
        return ShortID.from_components(*numbers)

    @classmethod
    def normalize(cls, data: Union[None, str, bytes, 'ShortID']) -> Optional['ShortID']:
        """Coerce a 16-char hex str / 8 raw bytes / ShortID / None into Optional[ShortID]."""
        if data is None or isinstance(data, ShortID):
            return data
        if isinstance(data, str):
            assert len(data) == 16
            return ShortID.fromhex(data)
        if isinstance(data, (bytes, bytearray)):
            assert len(data) == 8
            return ShortID(data)

    @property
    def block_height(self) -> int:
        return int.from_bytes(self[:3], byteorder='big')

    @property
    def txpos(self) -> int:
        return int.from_bytes(self[3:6], byteorder='big')

    @property
    def output_index(self) -> int:
        return int.from_bytes(self[6:8], byteorder='big')
def format_short_id(short_channel_id: Optional[bytes]):
    """Render an 8-byte short channel id as 'HEIGHTxTXPOSxOUTPUT'."""
    if not short_channel_id:
        return _('Not yet available')
    fields = (short_channel_id[:3], short_channel_id[3:6], short_channel_id[6:])
    return 'x'.join(str(int.from_bytes(field, 'big')) for field in fields)
def make_aiohttp_proxy_connector(proxy: 'ProxySettings', ssl_context: Optional[ssl.SSLContext] = None) -> ProxyConnector:
    """Build an aiohttp connector that routes traffic through the given SOCKS proxy."""
    return ProxyConnector(
        # Only mode == 'socks5' selects SOCKS5; any other mode falls back to SOCKS4.
        proxy_type=ProxyType.SOCKS5 if proxy.mode == 'socks5' else ProxyType.SOCKS4,
        host=proxy.host,
        port=int(proxy.port),
        username=proxy.user,
        password=proxy.password,
        rdns=True,  # needed to prevent DNS leaks over proxy
        ssl=ssl_context,
    )
def make_aiohttp_session(proxy: Optional['ProxySettings'], headers=None, timeout=None):
    """Create an aiohttp ClientSession with Electrum defaults.

    timeout may be None (45 s default), a number of seconds, or an
    aiohttp.ClientTimeout. Traffic goes through *proxy* when it is enabled.
    """
    if headers is None:
        headers = {'User-Agent': 'Electrum'}
    if timeout is None:
        # The default timeout is high intentionally.
        # DNS on some systems can be really slow, see e.g. #5337
        timeout = aiohttp.ClientTimeout(total=45)
    elif isinstance(timeout, (int, float)):
        timeout = aiohttp.ClientTimeout(total=timeout)
    # Certificate verification uses the bundled CA file (ca_path).
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)

    if proxy and proxy.enabled:
        connector = make_aiohttp_proxy_connector(proxy, ssl_context)
    else:
        connector = aiohttp.TCPConnector(ssl=ssl_context)

    return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector)
class OldTaskGroup(aiorpcx.TaskGroup):
    """Automatically raises exceptions on join; as in aiorpcx prior to version 0.20.
    That is, when using TaskGroup as a context manager, if any task encounters an exception,
    we would like that exception to be re-raised (propagated out). For the wait=all case,
    the OldTaskGroup class is emulating the following code-snippet:
    ```
    async with TaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())

        async for task in group:
            if not task.cancelled():
                task.result()
    ```
    So instead of the above, one can just write:
    ```
    async with OldTaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())
    ```
    # TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
    """
    async def join(self):
        if self._wait is all:
            exc = False
            try:
                # Iterating re-raises each finished task's exception (if any).
                async for task in self:
                    if not task.cancelled():
                        task.result()
            except BaseException:  # including asyncio.CancelledError
                exc = True
                raise
            finally:
                # On failure, tear down the still-running siblings before
                # letting the base class finish joining.
                if exc:
                    await self.cancel_remaining()
                await super().join()
        else:
            # wait=any/first-style semantics: surface the completed task's result.
            await super().join()
            if self.completed:
                self.completed.result()
# We monkey-patch aiorpcx TimeoutAfter (used by timeout_after and ignore_after API),
# to fix a timing issue present in asyncio as a whole re timing out tasks.
# To see the issue we are trying to fix, consider example:
#     async def outer_task():
#         async with timeout_after(0.1):
#             await inner_task()
# When the 0.1 sec timeout expires, inner_task will get cancelled by timeout_after (=internal cancellation).
# If around the same time (in terms of event loop iterations) another coroutine
# cancels outer_task (=external cancellation), there will be a race.
# Both cancellations work by propagating a CancelledError out to timeout_after, which then
# needs to decide (in TimeoutAfter.__aexit__) whether it's due to an internal or external cancellation.
# AFAICT asyncio provides no reliable way of distinguishing between the two.
# This patch tries to always give priority to external cancellations.
# see https://github.com/kyuupichan/aiorpcX/issues/44
# see https://github.com/aio-libs/async-timeout/issues/229
# see https://bugs.python.org/issue42130 and https://bugs.python.org/issue45098
# TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
def _aiorpcx_monkeypatched_set_new_deadline(task, deadline):
    def timeout_task():
        # Internal (deadline-driven) cancellation: record the deadline unless
        # the task was already cancelled externally.
        task._orig_cancel()
        task._timed_out = None if getattr(task, "_externally_cancelled", False) else deadline

    def mycancel(*args, **kwargs):
        # External cancellation wrapper: cancel, and flag it so a concurrent
        # deadline expiry is not misattributed to a timeout.
        task._orig_cancel(*args, **kwargs)
        task._externally_cancelled = True
        task._timed_out = None

    # Wrap task.cancel only once, keeping a handle to the original.
    if not hasattr(task, "_orig_cancel"):
        task._orig_cancel = task.cancel
        task.cancel = mycancel
    task._deadline_handle = task._loop.call_at(deadline, timeout_task)


def _aiorpcx_monkeypatched_set_task_deadline(task, deadline):
    # Delegate to the original, then reset the external-cancellation flag.
    ret = _aiorpcx_orig_set_task_deadline(task, deadline)
    task._externally_cancelled = None
    return ret


def _aiorpcx_monkeypatched_unset_task_deadline(task):
    # Restore the un-wrapped cancel before delegating to the original.
    if hasattr(task, "_orig_cancel"):
        task.cancel = task._orig_cancel
        del task._orig_cancel
    return _aiorpcx_orig_unset_task_deadline(task)


# Keep references to the originals (needed by the wrappers above), then
# install the patched versions. Order matters: save first, patch second.
_aiorpcx_orig_set_task_deadline    = aiorpcx.curio._set_task_deadline
_aiorpcx_orig_unset_task_deadline  = aiorpcx.curio._unset_task_deadline

aiorpcx.curio._set_new_deadline    = _aiorpcx_monkeypatched_set_new_deadline
aiorpcx.curio._set_task_deadline   = _aiorpcx_monkeypatched_set_task_deadline
aiorpcx.curio._unset_task_deadline = _aiorpcx_monkeypatched_unset_task_deadline
async def wait_for2(fut: Awaitable, timeout: Union[int, float, None]):
    """Replacement for asyncio.wait_for,
     due to bugs: https://bugs.python.org/issue42130 and https://github.com/python/cpython/issues/86296 ,
     which are only fixed in python 3.12+.
     """
    if sys.version_info[:3] >= (3, 12):
        # Fixed upstream: use the stdlib directly.
        return await asyncio.wait_for(fut, timeout)
    else:
        # Older pythons: emulate wait_for via our async_timeout context manager.
        async with async_timeout(timeout):
            return await asyncio.ensure_future(fut, loop=get_running_loop())
# Provide async_timeout: the stdlib asyncio.timeout where available,
# otherwise an aiorpcx-backed shim with asyncio-compatible exceptions.
if hasattr(asyncio, 'timeout'):  # python 3.11+
    async_timeout = asyncio.timeout
else:
    class TimeoutAfterAsynciolike(aiorpcx.curio.TimeoutAfter):
        # Translate aiorpcx timeout exceptions into their asyncio equivalents
        # so callers can handle asyncio.TimeoutError uniformly.
        async def __aexit__(self, exc_type, exc_value, tb):
            try:
                await super().__aexit__(exc_type, exc_value, tb)
            except (aiorpcx.TaskTimeout, aiorpcx.UncaughtTimeoutError):
                raise asyncio.TimeoutError from None
            except aiorpcx.TimeoutCancellationError:
                raise asyncio.CancelledError from None

    def async_timeout(delay: Union[int, float, None]):
        # delay=None means "no timeout": return a do-nothing context manager.
        if delay is None:
            return nullcontext()
        return TimeoutAfterAsynciolike(delay)
class NetworkJobOnDefaultServer(Logger, ABC):
    """An abstract base class for a job that runs on the main network
    interface. Every time the main interface changes, the job is
    restarted, and some of its internals are reset.
    """
    def __init__(self, network: 'Network'):
        Logger.__init__(self)
        self.network = network
        self.interface = None  # type: Interface
        # Serializes concurrent _restart() calls.
        self._restart_lock = asyncio.Lock()
        # Ensure fairness between NetworkJobs. e.g. if multiple wallets
        # are open, a large wallet's Synchronizer should not starve the small wallets:
        self._network_request_semaphore = asyncio.Semaphore(100)

        self._reset()
        # every time the main interface changes, restart:
        register_callback(self._restart, ['default_server_changed'])
        # also schedule a one-off restart now, as there might already be a main interface:
        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)

    def _reset(self):
        """Initialise fields. Called every time the underlying
        server connection changes.
        """
        # A fresh taskgroup per connection; the old one is cancelled in stop().
        self.taskgroup = OldTaskGroup()
        self.reset_request_counters()

    async def _start(self, interface: 'Interface'):
        """Bind to *interface* and spawn this job's tasks inside its taskgroup."""
        self.logger.debug(f"starting. interface.server={repr(str(interface.server))}")
        self.interface = interface

        # Capture the current taskgroup: _run_tasks compares it against
        # self.taskgroup to detect a restart that happened in the meantime.
        taskgroup = self.taskgroup

        async def run_tasks_wrapper():
            self.logger.debug(f"starting taskgroup ({hex(id(taskgroup))}).")
            try:
                await self._run_tasks(taskgroup=taskgroup)
            except Exception as e:
                self.logger.error(f"taskgroup died ({hex(id(taskgroup))}). exc={e!r}")
                raise
            finally:
                self.logger.debug(f"taskgroup stopped ({hex(id(taskgroup))}).")
        await interface.taskgroup.spawn(run_tasks_wrapper)

    @abstractmethod
    async def _run_tasks(self, *, taskgroup: OldTaskGroup) -> None:
        """Start tasks in taskgroup. Called every time the underlying
        server connection changes.
        """
        # If self.taskgroup changed, don't start tasks. This can happen if we have
        # been restarted *just now*, i.e. after the _run_tasks coroutine object was created.
        if taskgroup != self.taskgroup:
            raise asyncio.CancelledError()

    async def stop(self, *, full_shutdown: bool = True):
        """Cancel running tasks; on full shutdown also unhook the restart callback."""
        self.logger.debug(f"stopping. {full_shutdown=}")
        if full_shutdown:
            unregister_callback(self._restart)
        await self.taskgroup.cancel_remaining()

    @log_exceptions
    async def _restart(self, *args):
        """Stop, reset and start again on the network's current main interface."""
        interface = self.network.interface
        if interface is None:
            return  # we should get called again soon

        async with self._restart_lock:
            await self.stop(full_shutdown=False)
            self._reset()
            await self._start(interface)

    def reset_request_counters(self):
        # Counters for requests sent to / answered by the server.
        self._requests_sent = 0
        self._requests_answered = 0

    def num_requests_sent_and_answered(self) -> Tuple[int, int]:
        return self._requests_sent, self._requests_answered

    @property
    def session(self):
        # The session of the currently bound interface; must exist when used.
        s = self.interface.session
        assert s is not None
        return s
async def detect_tor_socks_proxy() -> Optional[Tuple[str, int]]:
    """Probe common localhost ports concurrently; return the first that
    speaks Tor's SOCKS protocol, or None if none does.
    """
    # Probable ports for Tor to listen at
    candidates = [
        ("127.0.0.1", 9050),
        ("127.0.0.1", 9051),
        ("127.0.0.1", 9150),
    ]

    proxy_addr = None

    async def test_net_addr(net_addr):
        is_tor = await is_tor_socks_port(*net_addr)
        # set result, and cancel remaining probes
        if is_tor:
            nonlocal proxy_addr
            proxy_addr = net_addr
            await group.cancel_remaining()

    # All probes run concurrently; the first positive result wins.
    async with OldTaskGroup() as group:
        for net_addr in candidates:
            await group.spawn(test_net_addr(net_addr))
    return proxy_addr
@log_exceptions
async def is_tor_socks_port(host: str, port: int) -> bool:
    """Return whether (host, port) answers like a Tor SOCKS port.

    Connection errors and timeouts map to False; other exceptions are logged
    by the @log_exceptions decorator.
    """
    # mimic "tor-resolve 0.0.0.0".
    # see https://github.com/spesmilo/electrum/issues/7317#issuecomment-1369281075
    # > this is a socks5 handshake, followed by a socks RESOLVE request as defined in
    # > [tor's socks extension spec](https://github.com/torproject/torspec/blob/7116c9cdaba248aae07a3f1d0e15d9dd102f62c5/socks-extensions.txt#L63),
    # > resolving 0.0.0.0, which being an IP, tor resolves itself without needing to ask a relay.
    writer = None
    try:
        async with async_timeout(10):
            reader, writer = await asyncio.open_connection(host, port)
            # socks5 handshake + RESOLVE("0.0.0.0") request, as one write
            writer.write(b'\x05\x01\x00\x05\xf0\x00\x03\x070.0.0.0\x00\x00')
            await writer.drain()
            data = await reader.read(1024)
            # expected reply: handshake ack followed by a successful RESOLVE of 0.0.0.0
            if data == b'\x05\x00\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00':
                return True
            return False
    except (OSError, asyncio.TimeoutError):
        return False
    finally:
        if writer:
            writer.close()
×
1649

1650

1651
AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP = False  # used by unit tests

# The single global event loop electrum runs on; set by create_and_start_event_loop().
_asyncio_event_loop = None  # type: Optional[asyncio.AbstractEventLoop]
1✔
1654

1655

1656
def get_asyncio_loop() -> asyncio.AbstractEventLoop:
    """Returns the global asyncio event loop we use."""
    global_loop = _asyncio_event_loop
    if global_loop:
        return global_loop
    if AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP:
        # library users may run their own loop instead of our global one
        thread_loop = get_running_loop()
        if thread_loop:
            return thread_loop
    raise Exception("event loop not created yet")
×
1664

1665

1666
def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
                                           asyncio.Future,
                                           threading.Thread]:
    """Create the global event loop and run it on a dedicated thread.

    Returns (loop, stopping_fut, loop_thread). Setting a result on
    stopping_fut makes the loop finish and the thread exit.
    Raises if the global loop already exists.
    """
    global _asyncio_event_loop
    if _asyncio_event_loop is not None:
        raise Exception("there is already a running event loop")

    # asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710)
    # We set a custom event loop policy purely to be compatible with code that
    # relies on asyncio.get_event_loop().
    # - in python 3.8-3.9, asyncio.Event.__init__, asyncio.Lock.__init__,
    #   and similar, calls get_event_loop. see https://github.com/python/cpython/pull/23420
    class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
        def get_event_loop(self):
            # In case electrum is being used as a library, there might be other
            # event loops in use besides ours. To minimise interfering with those,
            # if there is a loop running in the current thread, return that:
            running_loop = get_running_loop()
            if running_loop is not None:
                return running_loop
            # Otherwise, return our global loop:
            return get_asyncio_loop()
    asyncio.set_event_loop_policy(MyEventLoopPolicy())

    loop = asyncio.new_event_loop()
    _asyncio_event_loop = loop

    def on_exception(loop, context):
        """Suppress spurious messages it appears we cannot control."""
        SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
                                            'SSL error in data received')
        message = context.get('message')
        if message and SUPPRESS_MESSAGE_REGEX.match(message):
            return
        loop.default_exception_handler(context)

    def run_event_loop():
        # runs on loop_thread; blocks until stopping_fut is resolved
        try:
            loop.run_until_complete(stopping_fut)
        finally:
            # clean-up: allow a future create_and_start_event_loop() call
            global _asyncio_event_loop
            _asyncio_event_loop = None

    loop.set_exception_handler(on_exception)
    _set_custom_task_factory(loop)
    # loop.set_debug(True)
    stopping_fut = loop.create_future()
    loop_thread = threading.Thread(
        target=run_event_loop,
        name='EventLoop',
    )
    loop_thread.start()
    # Wait until the loop actually starts.
    # On a slow PC, or with a debugger attached, this can take a few dozens of ms,
    # and if we returned without a running loop, weird things can happen...
    t0 = time.monotonic()
    while not loop.is_running():
        time.sleep(0.01)
        if time.monotonic() - t0 > 5:
            raise Exception("been waiting for 5 seconds but asyncio loop would not start!")
    return loop, stopping_fut, loop_thread
×
1728

1729

1730
# Strong references to in-flight tasks, so the GC cannot collect them mid-run.
_running_asyncio_tasks = set()  # type: Set[asyncio.Future]
1✔
1731

1732

1733
def _set_custom_task_factory(loop: asyncio.AbstractEventLoop):
    """Wrap task creation to track pending and running tasks.
    When tasks are created, asyncio only maintains a weak reference to them.
    Hence, the garbage collector might destroy the task mid-execution.
    To avoid this, we store a strong reference for the task until it completes.

    Without this, a lot of APIs are basically Heisenbug-generators... e.g.:
    - "asyncio.create_task"
    - "loop.create_task"
    - "asyncio.ensure_future"
    - "asyncio.run_coroutine_threadsafe"

    related:
        - https://bugs.python.org/issue44665
        - https://github.com/python/cpython/issues/88831
        - https://github.com/python/cpython/issues/91887
        - https://textual.textualize.io/blog/2023/02/11/the-heisenbug-lurking-in-your-async-code/
        - https://github.com/python/cpython/issues/91887#issuecomment-1434816045
        - "Task was destroyed but it is pending!"
    """

    # preserve any factory the platform / embedder installed before us
    platform_task_factory = loop.get_task_factory()

    def factory(loop_, coro, **kwargs):
        if platform_task_factory is not None:
            task = platform_task_factory(loop_, coro, **kwargs)
        else:
            task = asyncio.Task(coro, loop=loop_, **kwargs)
        # keep a strong ref until the task completes, then drop it
        _running_asyncio_tasks.add(task)
        task.add_done_callback(_running_asyncio_tasks.discard)
        return task

    loop.set_task_factory(factory)
1✔
1766

1767

1768
def run_sync_function_on_asyncio_thread(func: Callable[[], Any], *, block: bool) -> None:
    """Run a non-async fn on the asyncio thread. Can be called from any thread.

    If the current thread is already the asyncio thread, func is guaranteed
    to have been completed when this method returns.

    For any other thread, we only wait for completion if `block` is True.
    """
    assert not inspect.iscoroutinefunction(func), "func must be a non-async function"
    asyncio_loop = get_asyncio_loop()
    if get_running_loop() == asyncio_loop:  # we are running on the asyncio thread
        func()
    else:  # non-asyncio thread
        async def wrapper():
            return func()
        fut = asyncio.run_coroutine_threadsafe(wrapper(), loop=asyncio_loop)
        if block:
            fut.result()
        else:
            # add explicit logging of exceptions, otherwise they might get lost
            # (capture the *submitting* thread's stack now, while it is available)
            tb1 = traceback.format_stack()[:-1]
            tb1_str = "".join(tb1)

            def on_done(fut_: concurrent.futures.Future):
                assert fut_.done()
                if fut_.cancelled():
                    _logger.debug(f"func cancelled. {func=}.")
                elif exc := fut_.exception():
                    # note: We explicitly log the first part of the traceback, tb1_str.
                    #       The second part gets logged by setting "exc_info".
                    _logger.error(
                        f"func errored. {func=}. {exc=}"
                        f"\n{tb1_str}", exc_info=exc)
            fut.add_done_callback(on_done)
×
1802

1803

1804
class OrderedDictWithIndex(OrderedDict):
    """An OrderedDict that keeps track of the positions of keys.

    Note: very inefficient to modify contents, except to add new items.
    """

    def __init__(self):
        super().__init__()
        self._key_to_pos = {}  # key -> 0-based insertion position
        self._pos_to_key = {}  # inverse map: position -> key

    def _recalc_index(self):
        # Rebuild both maps in a single pass over the current key order
        # (previously the keys were iterated twice, once per map).
        key_to_pos = {}
        pos_to_key = {}
        for pos, key in enumerate(self.keys()):
            key_to_pos[key] = pos
            pos_to_key[pos] = key
        self._key_to_pos = key_to_pos
        self._pos_to_key = pos_to_key

    def pos_from_key(self, key):
        """Return the position of `key`. Raises KeyError if missing."""
        return self._key_to_pos[key]

    def value_from_pos(self, pos):
        """Return the value stored at position `pos`. Raises KeyError if out of range."""
        key = self._pos_to_key[pos]
        return self[key]

    # Every mutating operation below (except insertion of a brand-new key)
    # can shift positions arbitrarily, so the whole index is rebuilt after it.

    def popitem(self, *args, **kwargs):
        ret = super().popitem(*args, **kwargs)
        self._recalc_index()
        return ret

    def move_to_end(self, *args, **kwargs):
        ret = super().move_to_end(*args, **kwargs)
        self._recalc_index()
        return ret

    def clear(self):
        ret = super().clear()
        self._recalc_index()
        return ret

    def pop(self, *args, **kwargs):
        ret = super().pop(*args, **kwargs)
        self._recalc_index()
        return ret

    def update(self, *args, **kwargs):
        ret = super().update(*args, **kwargs)
        self._recalc_index()
        return ret

    def __delitem__(self, *args, **kwargs):
        ret = super().__delitem__(*args, **kwargs)
        self._recalc_index()
        return ret

    def __setitem__(self, key, *args, **kwargs):
        # Fast path: appending a new key only needs to record its position.
        is_new_key = key not in self
        ret = super().__setitem__(key, *args, **kwargs)
        if is_new_key:
            pos = len(self) - 1
            self._key_to_pos[key] = pos
            self._pos_to_key[pos] = key
        return ret
1✔
1864

1865

1866
def multisig_type(wallet_type):
    """If wallet_type is mofn multi-sig, return [m, n],
    otherwise return None."""
    if not wallet_type:
        return None
    m = re.match(r'(\d+)of(\d+)', wallet_type)
    if m is None:
        return None
    return [int(m.group(1)), int(m.group(2))]
1✔
1875

1876

1877
def is_ip_address(x: Union[str, bytes]) -> bool:
    """Return whether `x` parses as an IPv4 or IPv6 address literal."""
    text = x.decode("utf-8") if isinstance(x, bytes) else x
    try:
        ipaddress.ip_address(text)
    except ValueError:
        return False
    return True
1✔
1885

1886

1887
def is_localhost(host: str) -> bool:
    """Return whether `host` refers to the local loopback.

    Accepts the 'localhost' hostnames, IP literals, and bracketed IPv6
    (e.g. '[::1]'). Other hostnames return False.
    """
    if not host:
        return False  # guard: empty string previously raised IndexError below
    if str(host) in ('localhost', 'localhost.',):
        return True
    if host[0] == '[' and host[-1] == ']':  # IPv6
        host = host[1:-1]
    try:
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
        return ip_addr.is_loopback
    except ValueError:
        pass  # not an IP
    return False
1✔
1898

1899

1900
def is_private_netaddress(host: str) -> bool:
    """Return whether `host` is a loopback or private-range network address.

    Accepts IP literals and bracketed IPv6 (e.g. '[::1]'); non-IP hostnames
    other than localhost return False.
    """
    if not host:
        return False  # guard: empty string previously raised IndexError below
    if is_localhost(host):
        return True
    if host[0] == '[' and host[-1] == ']':  # IPv6
        host = host[1:-1]
    try:
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
        return ip_addr.is_private
    except ValueError:
        pass  # not an IP
    return False
1✔
1911

1912

1913
def list_enabled_bits(x: int) -> Sequence[int]:
    """e.g. 77 (0b1001101) --> (0, 2, 3, 6)"""
    # reverse the binary string so index 0 corresponds to the least significant bit
    bits_lsb_first = bin(x)[2:][::-1]
    return tuple(i for i in range(len(bits_lsb_first)) if bits_lsb_first[i] == '1')
1✔
1918

1919

1920
async def resolve_dns_srv(host: str):
    """Resolve the SRV records for `host`, returning a list of
    {'host': str, 'port': int} dicts, best candidates first.
    """
    # FIXME this method is not using the network proxy. (although the proxy might not support UDP?)
    srv_records = await dns.asyncresolver.resolve(host, 'SRV')
    # priority: prefer lower
    # weight: tie breaker; prefer higher
    srv_records = sorted(srv_records, key=lambda x: (x.priority, -x.weight))

    def dict_from_srv_record(srv):
        return {
            'host': str(srv.target),
            'port': srv.port,
        }
    return [dict_from_srv_record(srv) for srv in srv_records]
×
1933

1934

1935
def randrange(bound: int) -> int:
    """Return a random integer k such that 1 <= k < bound, uniformly
    distributed across that range.
    This is guaranteed to be cryptographically strong.
    """
    # randbelow(n) yields 0 <= r < n; shift by one to exclude zero
    return 1 + secrets.randbelow(bound - 1)
1✔
1943

1944

1945
class CallbackManager(Logger):
    """Thread-safe registry of event callbacks.

    Callbacks may be (un)registered from any thread; triggered callbacks are
    always scheduled onto the asyncio event loop thread.
    """
    # callbacks set by the GUI or any thread
    # guarantee: the callbacks will always get triggered from the asyncio thread.

    # FIXME: There should be a way to prevent circular callbacks.
    # At the very least, we need a distinction between callbacks that
    # are for the GUI and callbacks between wallet components

    def __init__(self):
        Logger.__init__(self)
        self.callback_lock = threading.Lock()
        self.callbacks = defaultdict(list)  # type: Dict[str, List[Callable]]  # note: needs self.callback_lock

    def register_callback(self, func: Callable, events: Sequence[str]) -> None:
        """Subscribe `func` to each event name in `events`."""
        with self.callback_lock:
            for event in events:
                self.callbacks[event].append(func)

    def unregister_callback(self, callback: Callable) -> None:
        """Remove (one occurrence of) `callback` from every event it was registered for."""
        with self.callback_lock:
            for callbacks in self.callbacks.values():
                if callback in callbacks:
                    callbacks.remove(callback)

    def clear_all_callbacks(self) -> None:
        """Drop all registered callbacks for all events."""
        with self.callback_lock:
            self.callbacks.clear()

    def trigger_callback(self, event: str, *args) -> None:
        """Trigger a callback with given arguments.
        Can be called from any thread. The callback itself will get scheduled
        on the event loop.
        """
        loop = get_asyncio_loop()
        assert loop.is_running(), "event loop not running"
        with self.callback_lock:
            # copy, so callbacks can (un)register while we iterate
            callbacks = self.callbacks[event][:]
        for callback in callbacks:
            if inspect.iscoroutinefunction(callback):  # async cb
                fut = asyncio.run_coroutine_threadsafe(callback(*args), loop)

                def on_done(fut_: concurrent.futures.Future):
                    # log outcome explicitly, otherwise exceptions could get lost
                    assert fut_.done()
                    if fut_.cancelled():
                        self.logger.debug(f"cb cancelled. {event=}.")
                    elif exc := fut_.exception():
                        self.logger.error(f"cb errored. {event=}. {exc=}", exc_info=exc)
                fut.add_done_callback(on_done)
            else:  # non-async cb
                run_sync_function_on_asyncio_thread(partial(callback, *args), block=False)
1✔
1995

1996

1997
# Global singleton; the module-level aliases below are the canonical
# entry points used throughout the codebase.
callback_mgr = CallbackManager()
trigger_callback = callback_mgr.trigger_callback
register_callback = callback_mgr.register_callback
unregister_callback = callback_mgr.unregister_callback
# classpath ("module.Class") -> names of methods decorated with @event_listener
_event_listeners = defaultdict(set)  # type: Dict[str, Set[str]]
1✔
2002

2003

2004
class EventListener:
    """Use as a mixin for a class that has methods to be triggered on events.
    - Methods that receive the callbacks should be named "on_event_*" and decorated with @event_listener.
    - register_callbacks() should be called exactly once per instance of EventListener, e.g. in __init__
    - unregister_callbacks() should be called at least once, e.g. when the instance is destroyed
    """

    def _list_callbacks(self):
        # Walk the full MRO so @event_listener methods declared on base
        # classes are picked up too.
        for c in self.__class__.__mro__:
            classpath = f"{c.__module__}.{c.__name__}"
            for method_name in _event_listeners[classpath]:
                method = getattr(self, method_name)
                assert callable(method)
                assert method_name.startswith('on_event_')
                # yield (event name, bound method)
                yield method_name[len('on_event_'):], method

    def register_callbacks(self):
        for name, method in self._list_callbacks():
            #_logger.debug(f'registering callback {method}')
            register_callback(method, [name])

    def unregister_callbacks(self):
        for name, method in self._list_callbacks():
            #_logger.debug(f'unregistering callback {method}')
            unregister_callback(method)
1✔
2029

2030

2031
def event_listener(func):
    """To be used in subclasses of EventListener only. (how to enforce this programmatically?)"""
    cls_name, method_name = func.__qualname__.split('.')
    assert method_name.startswith('on_event_')
    # record the method under its class's full path, for _list_callbacks()
    _event_listeners[f"{func.__module__}.{cls_name}"].add(method_name)
    return func
1✔
2038

2039

2040
_NetAddrType = TypeVar("_NetAddrType")
1✔
2041
# requirements for _NetAddrType:
2042
# - reasonable __hash__() implementation (e.g. based on host/port of remote endpoint)
2043

2044

2045
class NetworkRetryManager(Generic[_NetAddrType]):
    """Truncated Exponential Backoff for network connections."""

    def __init__(
            self, *,
            max_retry_delay_normal: float,
            init_retry_delay_normal: float,
            max_retry_delay_urgent: Optional[float] = None,
            init_retry_delay_urgent: Optional[float] = None,
    ):
        # addr -> (unix timestamp of last attempt, number of attempts so far)
        self._last_tried_addr = {}  # type: Dict[_NetAddrType, Tuple[float, int]]  # (unix ts, num_attempts)

        # note: these all use "seconds" as unit
        # the "urgent" delays default to the "normal" ones
        if max_retry_delay_urgent is None:
            max_retry_delay_urgent = max_retry_delay_normal
        if init_retry_delay_urgent is None:
            init_retry_delay_urgent = init_retry_delay_normal
        self._max_retry_delay_normal = max_retry_delay_normal
        self._init_retry_delay_normal = init_retry_delay_normal
        self._max_retry_delay_urgent = max_retry_delay_urgent
        self._init_retry_delay_urgent = init_retry_delay_urgent

    def _trying_addr_now(self, addr: _NetAddrType) -> None:
        """Record that a connection attempt to `addr` is being made now."""
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        # we add up to 1 second of noise to the time, so that clients are less likely
        # to get synchronised and bombard the remote in connection waves:
        cur_time = time.time() + random.random()
        self._last_tried_addr[addr] = cur_time, num_attempts + 1

    def _on_connection_successfully_established(self, addr: _NetAddrType) -> None:
        # a success resets the attempt counter, so backoff starts over
        self._last_tried_addr[addr] = time.time(), 0

    def _can_retry_addr(self, addr: _NetAddrType, *,
                        now: Optional[float] = None, urgent: bool = False) -> bool:
        """Return whether enough time has passed to retry connecting to `addr`."""
        if now is None:
            now = time.time()
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        if urgent:
            max_delay = self._max_retry_delay_urgent
            init_delay = self._init_retry_delay_urgent
        else:
            max_delay = self._max_retry_delay_normal
            init_delay = self._init_retry_delay_normal
        delay = self.__calc_delay(multiplier=init_delay, max_delay=max_delay, num_attempts=num_attempts)
        next_time = last_time + delay
        return next_time < now

    @classmethod
    def __calc_delay(cls, *, multiplier: float, max_delay: float,
                     num_attempts: int) -> float:
        """Exponential backoff: multiplier * 2**num_attempts, clamped to [0, max_delay]."""
        # cap the exponent so 2**num_attempts stays computable
        num_attempts = min(num_attempts, 100_000)
        try:
            res = multiplier * 2 ** num_attempts
        except OverflowError:
            return max_delay
        return max(0, min(max_delay, res))

    def _clear_addr_retry_times(self) -> None:
        """Forget all attempt history (all addresses become retryable)."""
        self._last_tried_addr.clear()
1✔
2104

2105

2106
class ESocksProxy(aiorpcx.SOCKSProxy):
    """SOCKS proxy with an asyncio-streams helper and construction from network settings."""
    # note: proxy will not leak DNS as create_connection()
    # sets (local DNS) resolve=False by default

    async def open_connection(self, host=None, port=None, **kwargs):
        """Like asyncio.open_connection(), but tunnelled through this proxy."""
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader(loop=loop)
        protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
        transport, _ = await self.create_connection(
            lambda: protocol, host, port, **kwargs)
        writer = asyncio.StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    @classmethod
    def from_network_settings(cls, network: Optional['Network']) -> Optional['ESocksProxy']:
        """Build an ESocksProxy from the network's proxy settings.

        Returns None if no proxy is configured/enabled.
        Raises NotImplementedError for non-SOCKS proxy modes.
        """
        if not network or not network.proxy or not network.proxy.enabled:
            return None
        proxy = network.proxy
        username, pw = proxy.user, proxy.password
        if not username or not pw:
            # is_proxy_tor is tri-state; None indicates it is still probing the proxy to test for TOR
            if network.is_proxy_tor:
                # random credentials -- presumably for Tor stream isolation
                # (fresh circuit per session); confirm against aiorpcx docs
                auth = aiorpcx.socks.SOCKSRandomAuth()
            else:
                auth = None
        else:
            auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
        addr = aiorpcx.NetAddress(proxy.host, proxy.port)
        if proxy.mode == "socks4":
            ret = cls(addr, aiorpcx.socks.SOCKS4a, auth)
        elif proxy.mode == "socks5":
            ret = cls(addr, aiorpcx.socks.SOCKS5, auth)
        else:
            raise NotImplementedError  # http proxy not available with aiorpcx
        return ret
×
2141

2142

2143
class JsonRPCError(Exception):
    """Error object returned by a JSON-RPC server, carried as an exception."""

    class Codes(enum.IntEnum):
        # application-specific error codes
        USERFACING = 1
        INTERNAL = 2

    def __init__(self, *, code: int, message: str, data: Optional[dict] = None):
        super().__init__()
        self.code = code          # numeric error code (see Codes)
        self.message = message    # human-readable error message
        self.data = data          # optional extra payload from the server
×
2155

2156

2157
class JsonRPCClient:
    """Minimal JSON-RPC 2.0 client over an aiohttp session."""

    def __init__(self, session: aiohttp.ClientSession, url: str):
        self.session = session
        self.url = url
        self._id = 0  # monotonically increasing request id

    async def request(self, endpoint, *args):
        """Send request to server, parse and return result.
        note: parsing code is naive, the server is assumed to be well-behaved.
              Up to the caller to handle exceptions, including those arising from parsing errors.
        """
        self._id += 1
        # Build the payload with json.dumps instead of %-interpolation, so that
        # special characters (quotes, backslashes) in the method name are
        # escaped and always produce valid JSON.
        data = json.dumps({
            "jsonrpc": "2.0",
            "id": str(self._id),
            "method": endpoint,
            "params": args,
        })
        async with self.session.post(self.url, data=data) as resp:
            if resp.status == 200:
                r = await resp.json()
                result = r.get('result')
                error = r.get('error')
                if error:
                    raise JsonRPCError(code=error["code"], message=error["message"], data=error.get("data"))
                else:
                    return result
            else:
                text = await resp.text()
                return 'Error: ' + str(text)

    def add_method(self, endpoint):
        """Expose `endpoint` as an async method on this instance."""
        async def coro(*args):
            return await self.request(endpoint, *args)
        setattr(self, endpoint, coro)
×
2189

2190

2191
T = TypeVar('T')
1✔
2192

2193

2194
def random_shuffled_copy(x: Iterable[T]) -> List[T]:
    """Returns a shuffled copy of the input."""
    shuffled = list(x)  # materialise a copy; original is left untouched
    random.shuffle(shuffled)
    return shuffled
1✔
2199

2200

2201
def test_read_write_permissions(path) -> None:
    """Sanity-check that `path` is readable and its directory is writable.

    Raises IOError on any failure. note: despite the name, this is a runtime
    helper, not a unit test.
    """
    # note: There might already be a file at 'path'.
    #       Make sure we do NOT overwrite/corrupt that!
    temp_path = "%s.tmptest.%s" % (path, os.getpid())
    echo = "fs r/w test"
    try:
        # test READ permissions for actual path
        if os.path.exists(path):
            with open(path, "rb") as f:
                f.read(1)  # read 1 byte
        # test R/W sanity for "similar" path
        with open(temp_path, "w", encoding='utf-8') as f:
            f.write(echo)
        with open(temp_path, "r", encoding='utf-8') as f:
            echo2 = f.read()
        os.remove(temp_path)
    except Exception as e:
        # funnel every failure into IOError, preserving the cause
        raise IOError(e) from e
    if echo != echo2:
        raise IOError('echo sanity-check failed')
×
2221

2222

2223
class classproperty(property):
    """~read-only class-level @property
    from https://stackoverflow.com/a/13624858 by denis-ryzhkov
    """
    def __get__(self, instance, cls):
        # always dispatch on the owning class, never the instance
        return self.fget(cls)
1✔
2229

2230

2231
def sticky_property(val):
    """Creates a 'property' whose value cannot be changed and that cannot be deleted.
    Attempts to change the value are silently ignored.

    >>> class C: pass
    ...
    >>> setattr(C, 'x', sticky_property(3))
    >>> c = C()
    >>> c.x
    3
    >>> c.x = 2
    >>> c.x
    3
    >>> del c.x
    >>> c.x
    3
    """
    def _get(self):
        return val

    def _ignore(*args, **kwargs):
        # setter and deleter both silently do nothing
        pass

    return property(fget=_get, fset=_ignore, fdel=_ignore)
2253

2254

2255
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
    """Returns the asyncio event loop that is *running in this thread*, if any."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # no loop is running in this thread
        return None
    return loop
×
2261

2262

2263
def error_text_str_to_safe_str(err: str, *, max_len: Optional[int] = 500) -> str:
    """Converts an untrusted error string to a sane printable ascii str.
    Never raises.
    """
    # escape non-ascii first, then delegate the rest to the bytes-variant
    ascii_bytes = err.encode("ascii", errors='backslashreplace')
    safe = error_text_bytes_to_safe_str(ascii_bytes, max_len=None)
    return truncate_text(safe, max_len=max_len)
1✔
2271

2272

2273
def error_text_bytes_to_safe_str(err: bytes, *, max_len: Optional[int] = 500) -> str:
    """Converts an untrusted error bytes text to a sane printable ascii str.
    Never raises.

    Note that naive ascii conversion would be insufficient. Fun stuff:
    >>> b = b"my_long_prefix_blabla" + 21 * b"\x08" + b"malicious_stuff"
    >>> s = b.decode("ascii")
    >>> print(s)
    malicious_stuffblabla
    """
    # step 1: to ascii, escaping any non-ascii bytes
    # step 2: repr() to neutralise ascii control chars (backspace etc.)
    printable = repr(err.decode("ascii", errors='backslashreplace'))
    return truncate_text(printable, max_len=max_len)
1✔
2288

2289

2290
def truncate_text(text: str, *, max_len: Optional[int]) -> str:
    """Truncate `text` to `max_len` chars, appending a marker noting the
    original length. `max_len=None` disables truncation."""
    if max_len is not None and len(text) > max_len:
        return text[:max_len] + f"... (truncated. orig_len={len(text)})"
    return text
1✔
2295

2296

2297
def nostr_pow_worker(nonce, nostr_pubk, target_bits, hash_function, hash_len_bits, shutdown):
    """Function to generate PoW for Nostr, to be spawned in a ProcessPoolExecutor.

    Scans nonces starting at `nonce` until hash_function(b'electrum-' + pubk + nonce)
    has at least `target_bits` leading zero bits. Returns (digest, nonce) on
    success, or (None, None) if another worker already found a solution.
    """
    hash_preimage = b'electrum-' + nostr_pubk
    # success threshold is loop-invariant; hoist it out of the hot loop
    threshold = 1 << (hash_len_bits - target_bits)
    while True:
        # we cannot check is_set on each iteration as it has a lot of overhead, this way we can check
        # it with low overhead (just the additional range counter)
        for i in range(1000000):
            digest = hash_function(hash_preimage + nonce.to_bytes(32, 'big')).digest()
            if int.from_bytes(digest, 'big') < threshold:
                shutdown.set()
                # bugfix: previously returned the *builtin* `hash` function
                # instead of the computed digest
                return digest, nonce
            nonce += 1
        if shutdown.is_set():
            return None, None
×
2311

2312

2313
async def gen_nostr_ann_pow(nostr_pubk: bytes, target_bits: int) -> Tuple[int, int]:
    """Generate a PoW for a Nostr announcement. The PoW is hash[b'electrum-'+pubk+nonce]

    Returns (nonce, pow_bits). Spawns one worker process per spare CPU, each
    starting in a different region of the 32-byte nonce space; the first
    worker to find a solution sets a shared event that stops the others.
    """
    import multiprocessing  # not available on Android, so we import it here
    hash_function = hashlib.sha256
    hash_len_bits = 256
    max_nonce: int = (1 << (32 * 8)) - 1  # 32-byte nonce
    start_nonce = 0

    max_workers = max(multiprocessing.cpu_count() - 1, 1)  # use all but one CPU
    manager = multiprocessing.Manager()
    shutdown = manager.Event()
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        tasks = []
        loop = asyncio.get_running_loop()
        for task in range(0, max_workers):
            task = loop.run_in_executor(
                executor,
                nostr_pow_worker,
                start_nonce,
                nostr_pubk,
                target_bits,
                hash_function,
                hash_len_bits,
                shutdown
            )
            tasks.append(task)
            start_nonce += max_nonce // max_workers  # split the nonce range between the processes
            if start_nonce > max_nonce:  # make sure we don't go over the max_nonce
                start_nonce = random.randint(0, int(max_nonce * 0.75))

        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        # take the result of whichever worker finished first
        hash_res, nonce_res = done.pop().result()
        executor.shutdown(wait=False, cancel_futures=True)

    return nonce_res, get_nostr_ann_pow_amount(nostr_pubk, nonce_res)
×
2348

2349

2350
def get_nostr_ann_pow_amount(nostr_pubk: bytes, nonce: Optional[int]) -> int:
    """Return the amount of leading zero bits for a nostr announcement PoW.

    A missing (None) or zero nonce means no proof of work was done: returns 0.
    """
    if not nonce:
        return 0
    # preimage layout must match gen_nostr_ann_pow: b'electrum-' + pubkey + 32-byte nonce
    preimage = b'electrum-' + nostr_pubk + nonce.to_bytes(32, 'big')
    digest_value = int.from_bytes(hashlib.sha256(preimage).digest(), 'big')
    # number of leading zero bits in the 256-bit digest
    return 256 - digest_value.bit_length()
2361

2362

2363
class OnchainHistoryItem(NamedTuple):
    """A single on-chain transaction in the wallet history, with running balance.

    Note: a duplicate `group_id` annotation was removed; in NamedTuple class
    syntax the re-annotation only overwrote the first entry, so the field set
    and field order are unchanged.
    """
    txid: str
    amount_sat: int        # signed: positive for incoming (see 'incoming' below)
    fee_sat: int
    balance_sat: int       # presumably the running wallet balance after this tx — see callers
    tx_mined_status: TxMinedInfo
    group_id: Optional[str]
    label: Optional[str]
    monotonic_timestamp: int

    def to_dict(self):
        """Return the item as a plain dict."""
        return {
            'txid': self.txid,
            'amount_sat': self.amount_sat,
            'fee_sat': self.fee_sat,
            # NOTE(review): height is invoked as a method here — confirm TxMinedInfo
            # exposes a callable height() rather than a plain int field.
            'height': self.tx_mined_status.height(),
            'confirmations': self.tx_mined_status.conf,
            'timestamp': self.tx_mined_status.timestamp,
            'monotonic_timestamp': self.monotonic_timestamp,
            'incoming': True if self.amount_sat > 0 else False,
            'bc_value': Satoshis(self.amount_sat),
            'bc_balance': Satoshis(self.balance_sat),
            'date': timestamp_to_datetime(self.tx_mined_status.timestamp),
            'txpos_in_block': self.tx_mined_status.txpos,
            'wanted_height': self.tx_mined_status.wanted_height,
            'label': self.label,
            'group_id': self.group_id,
        }

2392

2393
class LightningHistoryItem(NamedTuple):
    """A single lightning payment/event in the wallet history."""
    payment_hash: Optional[str]
    preimage: Optional[str]
    amount_msat: int
    fee_msat: Optional[int]
    type: str
    group_id: Optional[str]
    timestamp: int
    label: Optional[str]
    direction: Optional[int]

    def to_dict(self):
        """Return the item as a plain dict."""
        # amount is stored in msat; expose it as a Satoshis value as well
        ln_value = Satoshis(Decimal(self.amount_msat) / 1000)
        result = {
            'type': self.type,
            'label': self.label,
            'timestamp': self.timestamp or 0,
            'date': timestamp_to_datetime(self.timestamp),
            'amount_msat': self.amount_msat,
            'fee_msat': self.fee_msat,
            'payment_hash': self.payment_hash,
            'preimage': self.preimage,
            'group_id': self.group_id,
            'ln_value': ln_value,
            'direction': self.direction,
        }
        return result

2418

2419
@dataclass(kw_only=True, slots=True)
1✔
2420
class ChoiceItem:
1✔
2421
    key: Any
1✔
2422
    label: str  # user facing string
1✔
2423
    extra_data: Any = None
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc