• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5942294764847104

17 Mar 2025 06:27PM UTC coverage: 61.137% (-0.04%) from 61.18%
5942294764847104

push

CirrusCI

web-flow
Merge pull request #9650 from SomberNight/202503_keepkey

plugins: keepkey: vendor our fork of keepkeylib (as git submodule)

21355 of 34930 relevant lines covered (61.14%)

3.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.27
/electrum/util.py
1
# Electrum - lightweight Bitcoin client
2
# Copyright (C) 2011 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import binascii
5✔
24
import concurrent.futures
5✔
25
import logging
5✔
26
import os, sys, re, json
5✔
27
from collections import defaultdict, OrderedDict
5✔
28
from concurrent.futures.process import ProcessPoolExecutor
5✔
29
from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any,
5✔
30
                    Sequence, Dict, Generic, TypeVar, List, Iterable, Set, Awaitable)
31
from datetime import datetime, timezone
5✔
32
import decimal
5✔
33
from decimal import Decimal
5✔
34
import traceback
5✔
35
import urllib
5✔
36
import threading
5✔
37
import hmac
5✔
38
import hashlib
5✔
39
import stat
5✔
40
import locale
5✔
41
import asyncio
5✔
42
import urllib.request, urllib.parse, urllib.error
5✔
43
import builtins
5✔
44
import json
5✔
45
import time
5✔
46
from typing import NamedTuple, Optional
5✔
47
import ssl
5✔
48
import ipaddress
5✔
49
from ipaddress import IPv4Address, IPv6Address
5✔
50
import random
5✔
51
import secrets
5✔
52
import functools
5✔
53
from functools import partial
5✔
54
from abc import abstractmethod, ABC
5✔
55
import socket
5✔
56
import enum
5✔
57
from contextlib import nullcontext
5✔
58

59
import attr
5✔
60
import aiohttp
5✔
61
from aiohttp_socks import ProxyConnector, ProxyType
5✔
62
import aiorpcx
5✔
63
import certifi
5✔
64
import dns.resolver
5✔
65

66
from .i18n import _
5✔
67
from .logging import get_logger, Logger
5✔
68

69
if TYPE_CHECKING:
5✔
70
    from .network import Network, ProxySettings
×
71
    from .interface import Interface
×
72
    from .simple_config import SimpleConfig
×
73
    from .paymentrequest import PaymentRequest
×
74

75

76
_logger = get_logger(__name__)
5✔
77

78

79
def inv_dict(d):
    """Return a new dict that maps every value of ``d`` back to its key.

    When several keys share one value, the key that is iterated last wins.
    """
    return dict((value, key) for key, value in d.items())
5✔
81

82

83
def all_subclasses(cls) -> Set:
    """Collect every direct and indirect subclass of ``cls`` into a set."""
    found = set()
    pending = list(cls.__subclasses__())
    # Walk the subclass tree with an explicit work list.
    while pending:
        sub = pending.pop()
        if sub in found:
            continue  # already visited (e.g. reached via multiple bases)
        found.add(sub)
        pending.extend(sub.__subclasses__())
    return found
5✔
89

90

91
ca_path = certifi.where()
5✔
92

93

94
# Decimal-point shift for each supported display unit.
base_units = {
    'BTC': 8,
    'mBTC': 5,
    'bits': 2,
    'sat': 0,
}
# Reverse lookup: decimal-point shift -> unit name.
base_units_inverse = {shift: unit_name for unit_name, shift in base_units.items()}
base_units_list = ['BTC', 'mBTC', 'bits', 'sat']  # list(dict) does not guarantee order

DECIMAL_POINT_DEFAULT = 5  # mBTC
5✔
99

100

101
class UnknownBaseUnit(Exception):
    """Raised by the base-unit lookup helpers for an unrecognised unit or decimal point."""
5✔
102

103

104
def decimal_point_to_base_unit_name(dp: int) -> str:
    """Translate a decimal-point shift into its unit name, e.g. 8 -> "BTC".

    Raises UnknownBaseUnit if no unit is registered for ``dp``.
    """
    if dp not in base_units_inverse:
        raise UnknownBaseUnit(dp) from None
    return base_units_inverse[dp]
×
110

111

112
def base_unit_name_to_decimal_point(unit_name: str) -> int:
    """Return the max number of digits allowed after the decimal point, e.g. "BTC" -> 8.

    Raises UnknownBaseUnit if ``unit_name`` is not a registered unit.
    """
    if unit_name not in base_units:
        raise UnknownBaseUnit(unit_name) from None
    return base_units[unit_name]
×
119

120
def parse_max_spend(amt: Any) -> Optional[int]:
    """Checks if given amount is "spend-max"-like.
    Returns None or the positive integer weight for "max". Never raises.

    When creating invoices and on-chain txs, the user can specify to send "max".
    This is done by setting the amount to '!'. Splitting max between multiple
    tx outputs is also possible, and custom weights (positive ints) can also be used.
    For example, to send 40% of all coins to address1, and 60% to address2:
    ```
    address1, 2!
    address2, 3!
    ```
    """
    if not isinstance(amt, str) or not amt.endswith('!'):
        return None
    weight_text = amt[:-1]
    if not weight_text:
        # a bare '!' means "max" with the default weight
        return 1
    try:
        weight = int(weight_text)
    except ValueError:
        return None
    # only strictly positive weights are meaningful
    return weight if weight > 0 else None
×
145

146
class NotEnoughFunds(Exception):
    """Exception whose string form is the translated "Insufficient funds" message."""

    def __str__(self):
        text = _("Insufficient funds")
        return text
5✔
149

150

151
class UneconomicFee(Exception):
    """Exception whose string form is a translated "fee exceeds funds gained" message."""

    def __str__(self):
        text = _("The fee for the transaction is higher than the funds gained from it.")
        return text
×
154

155

156
class NoDynamicFeeEstimates(Exception):
    """Exception whose string form is the translated "estimates not available" message."""

    def __str__(self):
        text = _('Dynamic fee estimates not available')
        return text
×
159

160

161
class BelowDustLimit(Exception):
    """Plain Exception subclass; adds no behaviour of its own."""
5✔
163

164

165
class InvalidPassword(Exception):
    """Password error carrying an optional custom message.

    Without a custom message, str() yields the translated "Incorrect password".
    """

    def __init__(self, message: Optional[str] = None):
        self.message = message

    def __str__(self):
        if self.message is not None:
            return str(self.message)
        return _("Incorrect password")
×
174

175

176
class AddTransactionException(Exception):
    """Base class for the add-transaction errors defined in this module."""
5✔
178

179

180
class UnrelatedTransactionException(AddTransactionException):
    """Add-transaction error whose string form says the tx is unrelated to the wallet."""

    def __str__(self):
        text = _("Transaction is unrelated to this wallet.")
        return text
×
183

184

185
class FileImportFailed(Exception):
    """Import error; ``message`` is stored stringified and appended to a translated prefix."""

    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        prefix = _("Failed to import from file.")
        return prefix + "\n" + self.message
×
191

192

193
class FileExportFailed(Exception):
    """Export error; ``message`` is stored stringified and appended to a translated prefix."""

    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        prefix = _("Failed to export to file.")
        return prefix + "\n" + self.message
×
199

200

201
class WalletFileException(Exception):
    """Wallet-file error that also records a ``should_report_crash`` flag."""

    def __init__(self, message='', *, should_report_crash: bool = False):
        super().__init__(message)
        self.should_report_crash = should_report_crash
5✔
205

206

207
class BitcoinException(Exception):
    """Plain Exception subclass; adds no behaviour of its own."""
5✔
208

209

210
class UserFacingException(Exception):
    """Exception carrying information that is meant to be displayed to the user."""
212

213

214
class InvoiceError(UserFacingException):
    """User-facing exception subclass for invoice errors; adds no behaviour of its own."""
5✔
215

216

217
class NetworkOfflineException(UserFacingException):
    """Can be raised if we are running in offline mode (--offline flag)
    and the user requests an operation that requires the network.

    Its string form is the translated "You are offline." message.
    """

    def __str__(self):
        text = _("You are offline.")
        return text
×
223

224

225
# Throw this exception to unwind the stack like when an error occurs.
226
# However unlike other exceptions the user won't be informed.
227
class UserCancelled(Exception):
    """An exception that is suppressed from the user."""
5✔
230

231

232
def to_decimal(x: Union[str, float, int, Decimal]) -> Decimal:
    """Convert ``x`` to Decimal via its string form; a Decimal is returned unchanged.

    Going through str() avoids exposing the binary float expansion, i.e.:
      >>> Decimal(41754.681)
      Decimal('41754.680999999996856786310672760009765625')
      >>> Decimal("41754.681")
      Decimal('41754.681')
    """
    return x if isinstance(x, Decimal) else Decimal(str(x))
5✔
241

242

243
# note: this is not a NamedTuple as then its json encoding cannot be customized
244
class Satoshis(object):
    """Wrapper around a satoshi amount (int or Decimal) with custom str/repr.

    Instances compare by ``value`` and support ``+`` with another Satoshis.
    As the class defines ``__eq__`` without ``__hash__``, instances are unhashable.
    """
    __slots__ = ('value',)

    def __new__(cls, value):
        self = super(Satoshis, cls).__new__(cls)
        # note: 'value' sometimes has msat precision
        assert isinstance(value, (int, Decimal)), f"unexpected type for {value=!r}"
        self.value = value
        return self

    def __repr__(self):
        return f'Satoshis({self.value})'

    def __str__(self):
        # note: precision is truncated to satoshis here
        return format_satoshis(self.value)

    def __eq__(self, other):
        # Comparing against a foreign type must not blow up with AttributeError:
        # returning NotImplemented lets Python fall back to "not equal".
        if not isinstance(other, Satoshis):
            return NotImplemented
        return self.value == other.value

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        return Satoshis(self.value + other.value)
×
269

270

271
# note: this is not a NamedTuple as then its json encoding cannot be customized
272
class Fiat(object):
    """A fiat amount: an optional Decimal ``value`` plus a currency code ``ccy``.

    A value of None or NaN is rendered as the translated 'No Data' string.
    """
    __slots__ = ('value', 'ccy')

    def __new__(cls, value: Optional[Decimal], ccy: str):
        if value is not None and not isinstance(value, Decimal):
            raise TypeError(f"value should be Decimal or None, not {type(value)}")
        self = super(Fiat, cls).__new__(cls)
        self.ccy = ccy
        self.value = value
        return self

    def _has_no_data(self) -> bool:
        # True when there is nothing numeric to display.
        return self.value is None or self.value.is_nan()

    def __repr__(self):
        return 'Fiat(%s)' % str(self)

    def __str__(self):
        if self._has_no_data():
            return _('No Data')
        return "{:.2f}".format(self.value)

    def to_ui_string(self):
        """Like str(), but with the currency code appended when there is data."""
        if self._has_no_data():
            return _('No Data')
        return "{:.2f}".format(self.value) + ' ' + self.ccy

    def __eq__(self, other):
        if not isinstance(other, Fiat) or self.ccy != other.ccy:
            return False
        # Two NaN amounts in the same currency are treated as equal.
        both_nan = (
            isinstance(self.value, Decimal)
            and isinstance(other.value, Decimal)
            and self.value.is_nan()
            and other.value.is_nan()
        )
        if both_nan:
            return True
        return self.value == other.value

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        assert self.ccy == other.ccy
        total = self.value + other.value
        return Fiat(total, self.ccy)
×
314

315

316
class MyEncoder(json.JSONEncoder):
    """JSON encoder with fallbacks for several types the stock encoder rejects."""

    def default(self, obj):
        # note: this does not get called for namedtuples :(  https://bugs.python.org/issue30343
        from .transaction import Transaction, TxOutput
        if isinstance(obj, Transaction):
            return obj.serialize()
        if isinstance(obj, TxOutput):
            return obj.to_legacy_tuple()
        # amounts are encoded via their string form
        if isinstance(obj, (Satoshis, Fiat, Decimal)):
            return str(obj)
        if isinstance(obj, datetime):
            # ISO format with a space separator, minus its last three characters
            return obj.isoformat(' ')[:-3]
        if isinstance(obj, set):
            return list(obj)
        if isinstance(obj, bytes):  # for nametuples in lnchannel
            return obj.hex()
        # any object offering a callable to_json() can serialize itself
        to_json = getattr(obj, 'to_json', None)
        if callable(to_json):
            return to_json()
        return super().default(obj)
×
339

340

341
class ThreadJob(Logger):
    """A periodic job, run from a thread's main loop.

    run() is invoked in the context of that thread.
    """

    def __init__(self):
        Logger.__init__(self)

    def run(self):
        """Called periodically from the thread. The base implementation does nothing."""
        return None
×
352

353
class DebugMem(ThreadJob):
    """A handy class for debugging GC memory leaks.

    Every ``interval`` seconds, logs how many live GC-tracked objects are
    instances of each of the given ``classes``.
    """

    def __init__(self, classes, interval=30):
        ThreadJob.__init__(self)
        self.next_time = 0
        self.classes = classes
        self.interval = interval

    def mem_stats(self):
        """Force a GC pass, then log the instance count per watched class."""
        import gc
        self.logger.info("Start memscan")
        gc.collect()
        # dict keeps first-seen order, which is the order the counts get logged in
        counts = defaultdict(int)
        for candidate in gc.get_objects():
            for class_ in self.classes:
                if isinstance(candidate, class_):
                    counts[class_] += 1
        for class_, count in counts.items():
            self.logger.info(f"{class_.__name__}: {count}")
        self.logger.info("Finish memscan")

    def run(self):
        # only scan once the previously scheduled time has passed
        if time.time() <= self.next_time:
            return
        self.mem_stats()
        self.next_time = time.time() + self.interval
×
378

379
class DaemonThread(threading.Thread, Logger):
    """Thread that holds a list of jobs and offers cooperative start/stop signalling."""

    LOGGING_SHORTCUT = 'd'

    def __init__(self):
        threading.Thread.__init__(self)
        Logger.__init__(self)
        # remember the creating thread: is_running() turns False once it dies
        self.parent_thread = threading.current_thread()
        self.running = False
        self.running_lock = threading.Lock()
        self.job_lock = threading.Lock()
        self.jobs = []
        self.stopped_event = threading.Event()        # set when fully stopped
        self.stopped_event_async = asyncio.Event()    # set when fully stopped
        self.wake_up_event = threading.Event()  # for perf optimisation of polling in run()

    def add_jobs(self, jobs):
        """Append ``jobs`` to the job list (under the job lock)."""
        with self.job_lock:
            self.jobs.extend(jobs)

    def run_jobs(self):
        """Run every registered job once; an exception in one job is logged, not propagated."""
        # Don't let a throwing job disrupt the thread, future runs of
        # itself, or other jobs.  This is useful protection against
        # malformed or malicious server responses
        with self.job_lock:
            for job in self.jobs:
                try:
                    job.run()
                except Exception:
                    self.logger.exception('')

    def remove_jobs(self, jobs):
        """Remove each of ``jobs`` from the job list (under the job lock)."""
        with self.job_lock:
            for job in jobs:
                self.jobs.remove(job)

    def start(self):
        with self.running_lock:
            self.running = True
        return threading.Thread.start(self)

    def is_running(self):
        """True while started, not stopped, and the parent thread is still alive."""
        with self.running_lock:
            return self.running and self.parent_thread.is_alive()

    def stop(self):
        """Clear the running flag and pulse the wake-up event."""
        with self.running_lock:
            self.running = False
            self.wake_up_event.set()
            self.wake_up_event.clear()

    def on_stop(self):
        """Log the stop and set both the threading and the asyncio stopped events."""
        if 'ANDROID_DATA' in os.environ:
            import jnius
            jnius.detach()
            self.logger.info("jnius detach")
        self.logger.info("stopped")
        self.stopped_event.set()
        # the asyncio event is set from within the event loop's own thread
        loop = get_asyncio_loop()
        loop.call_soon_threadsafe(self.stopped_event_async.set)
5✔
440

441

442
def print_stderr(*args):
    """Write the str() of all args, space-separated and newline-terminated, to stderr; then flush."""
    line = " ".join(map(str, args)) + "\n"
    sys.stderr.write(line)
    sys.stderr.flush()
×
446

447
def print_msg(*args):
    """Write the str() of all args, space-separated and newline-terminated, to stdout; then flush."""
    line = " ".join(map(str, args)) + "\n"
    sys.stdout.write(line)
    sys.stdout.flush()
×
452

453
def json_encode(obj):
    """Serialize ``obj`` to pretty-printed JSON (sorted keys, indent 4) using MyEncoder.

    If encoding raises TypeError, repr(obj) is returned instead.
    """
    try:
        return json.dumps(obj, sort_keys=True, indent=4, cls=MyEncoder)
    except TypeError:
        return repr(obj)
×
459

460
def json_decode(x):
    """Parse ``x`` as JSON, reading floats as Decimal; on any failure return ``x`` unchanged."""
    try:
        decoded = json.loads(x, parse_float=Decimal)
    except Exception:
        return x
    return decoded
5✔
465

466
def json_normalize(x):
    """Round-trip ``x`` through json_encode() and json_decode()."""
    # note: The return value of commands, when going through the JSON-RPC interface,
    #       is json-encoded. The encoder used there cannot handle some types, e.g. electrum.util.Satoshis.
    # note: We should not simply do "json_encode(x)" here, as then later x would get doubly json-encoded.
    # see #5868
    encoded = json_encode(x)
    return json_decode(encoded)
×
472

473

474
# taken from Django Source Code
475
def constant_time_compare(val1, val2):
    """Return True if the two strings are equal, False otherwise.

    Both values are converted with to_bytes(..., 'utf8') and compared via hmac.compare_digest.
    """
    left = to_bytes(val1, 'utf8')
    right = to_bytes(val2, 'utf8')
    return hmac.compare_digest(left, right)
×
478

479

480
_profiler_logger = _logger.getChild('profiler')
5✔
481
def profiler(func=None, *, min_threshold: Union[int, float, None] = None):
    """Function decorator that logs execution time.

    Usable both bare (``@profiler``) and with arguments (``@profiler(min_threshold=...)``).
    The wrapper keeps the decorated function's metadata (``__name__``, ``__doc__``, ...).

    min_threshold: if set, only log if time taken is higher than threshold
    NOTE: does not work with async methods.
    """
    if func is None:  # to make "@profiler(...)" work. (in addition to bare "@profiler")
        return partial(profiler, min_threshold=min_threshold)

    @functools.wraps(func)
    def do_profile(*args, **kw_args):
        name = func.__qualname__
        t0 = time.time()
        o = func(*args, **kw_args)
        t = time.time() - t0
        if min_threshold is None or t > min_threshold:
            _profiler_logger.debug(f"{name} {t:,.4f} sec")
        return o
    return do_profile
5✔
498

499

500
class AsyncHangDetector:
    """Async context manager that logs ``message`` every ``period_sec`` seconds
    for as long as the wrapped context has not exited.
    """

    def __init__(
        self,
        *,
        period_sec: int = 15,
        message: str,
        logger: logging.Logger = None,
    ):
        self.period_sec = period_sec
        self.message = message
        # fall back to the module logger when none is supplied
        self.logger = logger or _logger

    async def _monitor(self):
        # note: this assumes that the event loop itself is not blocked
        started = time.monotonic()
        while True:
            await asyncio.sleep(self.period_sec)
            elapsed = time.monotonic() - started
            self.logger.info(f"{self.message} (after {elapsed:.2f} sec)")

    async def __aenter__(self):
        # background task that keeps logging until cancelled in __aexit__
        self.mtask = asyncio.create_task(self._monitor())

    async def __aexit__(self, exc_type, exc, tb):
        self.mtask.cancel()
5✔
527

528

529
def android_ext_dir():
    """Return android.storage.primary_external_storage_path()."""
    from android import storage
    return storage.primary_external_storage_path()
×
532

533
def android_backup_dir():
    """Return the directory <external storage>/<package name>, creating it if missing."""
    backup_dir = os.path.join(android_ext_dir(), get_android_package_name())
    if os.path.exists(backup_dir):
        return backup_dir
    os.mkdir(backup_dir)
    return backup_dir
×
539

540
def android_data_dir():
    """Return the activity's files dir (as reported via jnius) with '/data' appended."""
    import jnius
    activity_cls = jnius.autoclass('org.kivy.android.PythonActivity')
    files_dir = activity_cls.mActivity.getFilesDir().getPath()
    return files_dir + '/data'
×
544

545
def ensure_sparse_file(filename):
    """Best-effort: mark ``filename`` as a sparse file where an explicit step is needed.

    On Windows this runs ``fsutil sparse setflag``; elsewhere it does nothing.
    Failures are only logged, never raised.
    """
    # On modern Linux, no need to do anything.
    # On Windows, need to explicitly mark file.
    if os.name != "nt":
        return
    import subprocess  # local import: only needed on Windows
    try:
        # Pass an argument list (no shell), so quotes or shell metacharacters
        # in the filename cannot alter the command being run.
        completed = subprocess.run(
            ["fsutil", "sparse", "setflag", str(filename), "1"],
            check=False,
        )
    except Exception as e:
        _logger.info(f'error marking file {filename} as sparse: {e}')
    else:
        # a failing command does not raise; surface it in the log as well
        if completed.returncode != 0:
            _logger.info(f'error marking file {filename} as sparse: exit code {completed.returncode}')
×
553

554

555
def get_headers_dir(config):
    """Return the directory for headers, which is simply ``config.path``."""
    headers_dir = config.path
    return headers_dir
5✔
557

558

559
def assert_datadir_available(config_path):
    """Raise FileNotFoundError unless ``config_path`` exists on disk."""
    if not os.path.exists(config_path):
        raise FileNotFoundError(
            'Electrum datadir does not exist. Was it deleted while running?' + '\n' +
            'Should be at {}'.format(config_path))
567

568

569
def assert_file_in_datadir_available(path, config_path):
    """Raise FileNotFoundError unless ``path`` exists.

    If the datadir itself is missing, the error comes from assert_datadir_available().
    """
    if os.path.exists(path):
        return
    # distinguish "whole datadir is gone" from "just this file is gone"
    assert_datadir_available(config_path)
    raise FileNotFoundError(
        'Cannot find file but datadir is there.' + '\n' +
        'Should be at {}'.format(path))
577

578

579
def standardize_path(path):
    """Expand ``~``, make the path absolute, then normalize its case."""
    # note: os.path.realpath() is not used, as on Windows it can return non-working paths (see #8495).
    #       This means that we don't resolve symlinks!
    expanded = os.path.expanduser(path)
    absolute = os.path.abspath(expanded)
    return os.path.normcase(absolute)
587

588

589
def get_new_wallet_name(wallet_folder: str) -> str:
    """Returns a file basename for a new wallet to be used.

    The result is "wallet_<i>" for the smallest i >= 1 that is not already
    the name of an entry in ``wallet_folder``.
    Can raise OSError.
    """
    # List the directory once, instead of once per candidate name.
    existing = set(os.listdir(wallet_folder))
    i = 1
    while f"wallet_{i}" in existing:
        i += 1
    return f"wallet_{i}"
5✔
601

602

603
def is_android_debug_apk() -> bool:
    """Return the package's BuildConfig.DEBUG flag; False when not running on Android."""
    if 'ANDROID_DATA' not in os.environ:
        return False
    from jnius import autoclass
    package = get_android_package_name()
    build_config = autoclass(f"{package}.BuildConfig")
    return bool(build_config.DEBUG)
×
611

612

613
def get_android_package_name() -> str:
    """Return the Android activity's package name. Asserts that we are running on Android."""
    is_android = 'ANDROID_DATA' in os.environ
    assert is_android
    from jnius import autoclass
    from android.config import ACTIVITY_CLASS_NAME
    current_activity = autoclass(ACTIVITY_CLASS_NAME).mActivity
    return str(current_activity.getPackageName())
×
621

622

623
def assert_bytes(*args):
    """Porting helper: assert that every argument is bytes or bytearray.

    On failure the argument types are printed before the error is re-raised.
    """
    try:
        assert all(isinstance(arg, (bytes, bytearray)) for arg in args)
    except Exception:
        print('assert bytes failed', list(map(type, args)))
        raise
×
633

634

635
def assert_str(*args):
    """Porting helper: assert that every argument is a str."""
    assert all(isinstance(arg, str) for arg in args)
×
641

642

643
def to_string(x, enc) -> str:
    """Return ``x`` as str: bytes/bytearray are decoded with ``enc``, str passes through.

    Raises TypeError for any other type.
    """
    if isinstance(x, (bytes, bytearray)):
        return x.decode(enc)
    if not isinstance(x, str):
        raise TypeError("Not a string or bytes like object")
    return x
×
650

651

652
def to_bytes(something, encoding='utf8') -> bytes:
    """Return ``something`` as bytes.

    bytes pass through unchanged, str is encoded with ``encoding``, and a
    bytearray is copied into bytes. Raises TypeError for any other type.
    """
    if isinstance(something, bytes):
        return something
    if isinstance(something, str):
        return something.encode(encoding)
    if isinstance(something, bytearray):
        return bytes(something)
    raise TypeError("Not a string or bytes like object")
5✔
664

665

666
bfh = bytes.fromhex
5✔
667

668

669
def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR ``a`` and ``b`` byte by byte; the result is as long as the shorter input."""
    # zip() stops at the shorter input, which truncates both to the common length
    return bytes(x ^ y for x, y in zip(a, b))
673

674

675
def user_dir():
    """Return the per-user data directory, or None if none can be determined.

    Checked in order: $ELECTRUMDIR, Android, posix ($HOME/.electrum),
    %APPDATA%/Electrum, %LOCALAPPDATA%/Electrum.
    """
    env = os.environ
    if "ELECTRUMDIR" in env:
        return env["ELECTRUMDIR"]
    if 'ANDROID_DATA' in env:
        return android_data_dir()
    if os.name == 'posix':
        return os.path.join(env["HOME"], ".electrum")
    for var_name in ("APPDATA", "LOCALAPPDATA"):
        if var_name in env:
            return os.path.join(env[var_name], "Electrum")
    #raise Exception("No home directory found in environment variables.")
    return None
×
689

690

691
def resource_path(*parts):
    """Join ``parts`` onto the package directory ``pkg_dir``."""
    full_path = os.path.join(pkg_dir, *parts)
    return full_path
5✔
693

694

695
# absolute path to python package folder of electrum ("lib"):
# the directory that contains this very file, with symlinks resolved
pkg_dir = os.path.dirname(os.path.realpath(__file__))
5✔
697

698

699
def is_valid_email(s):
    """Loosely check that ``s`` looks like an e-mail address.

    The whole string must have the shape "<local>@<domain>.<tld>" with exactly
    one '@'. Non-str input is reported as invalid instead of raising.
    """
    if not isinstance(s, str):
        return False
    regexp = r"[^@]+@[^@]+\.[^@]+"
    # fullmatch: the pattern has to cover the entire string, so that trailing
    # garbage such as a second '@' section is rejected
    return re.fullmatch(regexp, s) is not None
×
702

703

704
def is_hash256_str(text: Any) -> bool:
    """True iff ``text`` is a str of exactly 64 characters that is_hex_str() accepts."""
    if not isinstance(text, str) or len(text) != 64:
        return False
    return is_hex_str(text)
5✔
708

709

710
def is_hex_str(text: Any) -> bool:
    """True iff ``text`` is a str that bytes.fromhex() decodes and that contains no whitespace."""
    if not isinstance(text, str):
        return False
    try:
        decoded = bytes.fromhex(text)
    except Exception:
        return False
    # forbid whitespaces in text: there must be exactly two characters per decoded byte
    return len(text) == 2 * len(decoded)
5✔
720

721

722
def is_integer(val: Any) -> bool:
    """True iff ``val`` is an instance of int (note: this includes bool)."""
    result = isinstance(val, int)
    return result
5✔
724

725

726
def is_non_negative_integer(val: Any) -> bool:
    """True iff ``val`` is an int (bool included) and is >= 0."""
    return isinstance(val, int) and val >= 0
5✔
730

731

732
def is_int_or_float(val: Any) -> bool:
    """True iff ``val`` is an instance of int or float (note: this includes bool)."""
    result = isinstance(val, (int, float))
    return result
5✔
734

735

736
def is_non_negative_int_or_float(val: Any) -> bool:
    """True iff ``val`` is an int or float (bool included) and is >= 0."""
    return isinstance(val, (int, float)) and val >= 0
5✔
740

741

742
def chunks(items, size: int):
    """Break up items, an iterable, into chunks of length size.

    This is a generator. For inputs that support len() (lists, tuples, str,
    bytes, ...) each chunk is a slice of ``items``, so it has the same type
    as the input. Inputs without len() (iterators, generators) are consumed
    lazily and each chunk is a list. The last chunk may be shorter.

    Raises ValueError (upon first iteration) if size < 1.
    """
    if size < 1:
        raise ValueError(f"size must be positive, not {repr(size)}")
    try:
        total = len(items)
    except TypeError:
        # No len(): a plain iterable, so accumulate the chunks manually.
        batch = []
        for item in items:
            batch.append(item)
            if len(batch) == size:
                yield batch
                batch = []
        if batch:
            yield batch
        return
    for i in range(0, total, size):
        yield items[i: i + size]
5✔
748

749

750
def format_satoshis_plain(
        x: Union[int, float, Decimal, str],  # amount in satoshis,
        *,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
) -> str:
    """Display a satoshi amount scaled.  Always uses a '.' as a decimal
    point and has no thousands separator.

    A "spend-max"-like amount is rendered as 'max(<amount>)' instead.
    """
    if parse_max_spend(x):
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    scaled = Decimal(x) / pow(10, decimal_point)
    text = "{:.8f}".format(scaled)
    # drop trailing zeros, then a decimal point left dangling at the end
    text = text.rstrip('0')
    return text.rstrip('.')
5✔
762

763

764
# Check that Decimal precision is sufficient.
765
# We need at the very least ~20, as we deal with msat amounts, and
766
# log10(21_000_000 * 10**8 * 1000) ~= 18.3
767
# decimal.DefaultContext.prec == 28 by default, but it is mutable.
768
# We enforce that we have at least that available.
769
assert decimal.getcontext().prec >= 28, f"PyDecimal precision too low: {decimal.getcontext().prec}"
5✔
770

771
# DECIMAL_POINT = locale.localeconv()['decimal_point']  # type: str
772
DECIMAL_POINT = "."
5✔
773
THOUSANDS_SEP = " "
5✔
774
assert len(DECIMAL_POINT) == 1, f"DECIMAL_POINT has unexpected len. {DECIMAL_POINT!r}"
5✔
775
assert len(THOUSANDS_SEP) == 1, f"THOUSANDS_SEP has unexpected len. {THOUSANDS_SEP!r}"
5✔
776

777

778
def format_satoshis(
        x: Union[int, float, Decimal, str, None],  # amount in satoshis
        *,
        num_zeros: int = 0,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
        precision: int = 0,  # extra digits after satoshi precision
        is_diff: bool = False,  # if True, enforce a leading sign (+/-)
        whitespaces: bool = False,  # if True, add whitespaces, to align numbers in a column
        add_thousands_sep: bool = False,  # if True, add whitespaces, for better readability of the numbers
) -> str:
    """Format a satoshi amount as a decimal string, scaled by ``decimal_point``.

    None is rendered as 'unknown' and a "spend-max"-like amount as 'max(<amount>)'.
    Trailing zeros of the fractional part are dropped, except that it is padded
    back with zeros to at least ``num_zeros`` digits.
    """
    if x is None:
        return 'unknown'
    if parse_max_spend(x):
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    # lose redundant precision
    amount = Decimal(x).quantize(Decimal(10) ** (-precision))
    # build the format spec
    overall_precision = decimal_point + precision  # max digits after final decimal point
    format_spec = f".{overall_precision}" if overall_precision > 0 else ""
    if is_diff:
        format_spec = '+' + format_spec
    # initial result
    text = format(amount / pow(10, decimal_point), format_spec + "f")
    if "." not in text:
        text += "."
    integer_part, fract_part = text.rstrip('0').split(".")
    # add extra decimal places (zeros)
    fract_part = fract_part.ljust(num_zeros, "0")
    # add whitespaces as thousands' separator for better readability of numbers
    if add_thousands_sep:
        sign = integer_part[0] if integer_part[0] in ("+", "-") else ""
        # group the bare digits, then put the sign back in front
        digits = integer_part[len(sign):]
        grouped = "{:,}".format(int(digits)).replace(',', THOUSANDS_SEP)
        integer_part = sign + grouped
        fract_groups = [fract_part[i:i+3] for i in range(0, len(fract_part), 3)]
        fract_part = THOUSANDS_SEP.join(fract_groups)
    result = integer_part + DECIMAL_POINT + fract_part
    if not whitespaces:
        return result
    # add leading/trailing whitespaces so that numbers can be aligned in a column
    target_fract_len = overall_precision
    target_integer_len = 14 - decimal_point  # should be enough for up to unsigned 999999 BTC
    if add_thousands_sep:
        # leave room for the separators between the 3-digit groups
        target_fract_len += max(0, (target_fract_len - 1) // 3)
        target_integer_len += max(0, (target_integer_len - 1) // 3)
    # add trailing whitespaces
    result += " " * (target_fract_len - len(fract_part))
    # add leading whitespaces
    target_total_len = target_integer_len + 1 + target_fract_len
    return result.rjust(target_total_len)
5✔
831

832

833
FEERATE_PRECISION = 1  # num fractional decimal places for sat/byte fee rates
5✔
834
_feerate_quanta = Decimal(10) ** (-FEERATE_PRECISION)
5✔
835
UI_UNIT_NAME_FEERATE_SAT_PER_VBYTE = "sat/vbyte"
5✔
836
UI_UNIT_NAME_FEERATE_SAT_PER_VB = "sat/vB"
5✔
837
UI_UNIT_NAME_TXSIZE_VBYTES = "vbytes"
5✔
838
UI_UNIT_NAME_MEMPOOL_MB = "vMB"
5✔
839

840

841
def format_fee_satoshis(fee, *, num_zeros=0, precision=None):
    """Format a fee value via format_satoshis() with no decimal-point shift.

    ``precision`` defaults to FEERATE_PRECISION.
    """
    effective_precision = FEERATE_PRECISION if precision is None else precision
    # no more zeroes than available prec
    # NOTE(review): the cap uses FEERATE_PRECISION even when a custom precision is passed -- confirm this is intended
    capped_zeros = min(num_zeros, FEERATE_PRECISION)
    return format_satoshis(fee, num_zeros=capped_zeros, decimal_point=0, precision=effective_precision)
5✔
846

847

848
def quantize_feerate(fee) -> Union[None, Decimal, int]:
    """Strip sat/byte fee rate of excess precision.

    None passes through; anything else is converted to Decimal and quantized
    to ``_feerate_quanta`` using ROUND_HALF_DOWN.
    """
    if fee is None:
        return None
    as_decimal = Decimal(fee)
    return as_decimal.quantize(_feerate_quanta, rounding=decimal.ROUND_HALF_DOWN)
5✔
853

854

855
def timestamp_to_datetime(timestamp: Union[int, float, None], *, utc: bool = False) -> Optional[datetime]:
    """Convert a POSIX timestamp to a datetime; None is passed through.

    With utc=True the result is timezone-aware (UTC); otherwise it is a
    naive datetime expressed in local time.
    """
    if timestamp is None:
        return None
    return datetime.fromtimestamp(timestamp, tz=timezone.utc if utc else None)
5✔
862

863

864
def format_time(timestamp: Union[int, float, None]) -> str:
    """Render a POSIX timestamp as a local-time ISO string with minute resolution.

    Returns the translated "Unknown" if there is no timestamp.
    """
    date = timestamp_to_datetime(timestamp)
    if not date:
        return _("Unknown")
    return date.isoformat(' ', timespec="minutes")
×
867

868

869
def age(
    from_date: Union[int, float, None],  # POSIX timestamp
    *,
    since_date: datetime = None,
    target_tz=None,
    include_seconds: bool = False,
) -> str:
    """Takes a timestamp and returns a string with the approximation of the age

    from_date: POSIX timestamp of the event; None yields the translated "Unknown".
    since_date: reference point to measure from; defaults to the current time
        (in target_tz, if given).
    target_tz: tzinfo passed to datetime.now() when since_date is not given.
    include_seconds: for distances that round to 0 minutes, report the number
        of seconds instead of "less than a minute".

    Both past ("... ago") and future ("in ...") distances are supported.
    """
    if from_date is None:
        return _("Unknown")

    if since_date is None:
        since_date = datetime.now(target_tz)
    # Build from_date with the same tzinfo as since_date (None -> naive local time).
    # Mixing a naive and an aware datetime in the subtraction/comparison below
    # would raise TypeError.
    from_date = datetime.fromtimestamp(from_date, tz=since_date.tzinfo)

    distance_in_time = from_date - since_date
    is_in_past = from_date < since_date
    distance_in_seconds = int(round(abs(distance_in_time.days * 86400 + distance_in_time.seconds)))
    distance_in_minutes = int(round(distance_in_seconds / 60))

    # For each bucket pick: (past template, future template, number to insert or None)
    if distance_in_minutes == 0:
        if include_seconds:
            past, future, count = _("{} seconds ago"), _("in {} seconds"), distance_in_seconds
        else:
            past, future, count = _("less than a minute ago"), _("in less than a minute"), None
    elif distance_in_minutes < 45:
        past, future, count = _("about {} minutes ago"), _("in about {} minutes"), distance_in_minutes
    elif distance_in_minutes < 90:
        past, future, count = _("about 1 hour ago"), _("in about 1 hour"), None
    elif distance_in_minutes < 1440:
        past, future, count = _("about {} hours ago"), _("in about {} hours"), round(distance_in_minutes / 60.0)
    elif distance_in_minutes < 2880:
        past, future, count = _("about 1 day ago"), _("in about 1 day"), None
    # NOTE(review): 43220 is kept as-is; it looks like a typo for 43200 (= 30 days in minutes) - confirm
    elif distance_in_minutes < 43220:
        past, future, count = _("about {} days ago"), _("in about {} days"), round(distance_in_minutes / 1440)
    elif distance_in_minutes < 86400:
        past, future, count = _("about 1 month ago"), _("in about 1 month"), None
    elif distance_in_minutes < 525600:
        past, future, count = _("about {} months ago"), _("in about {} months"), round(distance_in_minutes / 43200)
    elif distance_in_minutes < 1051200:
        past, future, count = _("about 1 year ago"), _("in about 1 year"), None
    else:
        past, future, count = _("over {} years ago"), _("in over {} years"), round(distance_in_minutes / 525600)

    template = past if is_in_past else future
    # Templates without a placeholder are returned untouched.
    return template if count is None else template.format(count)
5✔
945

946
# name -> (base URL, {kind: path prefix that is appended to the base URL})
mainnet_block_explorers = {
    '3xpl.com': ('https://3xpl.com/bitcoin/', {'tx': 'transaction/', 'addr': 'address/'}),
    'Bitflyer.jp': ('https://chainflyer.bitflyer.jp/', {'tx': 'Transaction/', 'addr': 'Address/'}),
    'Blockchain.info': ('https://blockchain.com/btc/', {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/', {'tx': 'tx/', 'addr': 'address/'}),
    'Bitaps.com': ('https://btc.bitaps.com/', {'tx': '', 'addr': ''}),
    'BTC.com': ('https://btc.com/', {'tx': '', 'addr': ''}),
    'Chain.so': ('https://www.chain.so/', {'tx': 'tx/BTC/', 'addr': 'address/BTC/'}),
    'Insight.is': ('https://insight.bitpay.com/', {'tx': 'tx/', 'addr': 'address/'}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc/', {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchair.com': ('https://blockchair.com/bitcoin/', {'tx': 'transaction/', 'addr': 'address/'}),
    'blockonomics.co': ('https://www.blockonomics.co/', {'tx': 'api/tx?txid=', 'addr': '#/search?q='}),
    'mempool.space': ('https://mempool.space/', {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.emzy.de': ('https://mempool.emzy.de/', {'tx': 'tx/', 'addr': 'address/'}),
    'OXT.me': ('https://oxt.me/', {'tx': 'transaction/', 'addr': 'address/'}),
    'mynode.local': ('http://mynode.local:3002/', {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/', {'tx': 'tx/', 'addr': 'address/'}),
}
980

981
# name -> (base URL, {kind: path prefix that is appended to the base URL})
testnet_block_explorers = {
    'Bitaps.com': ('https://tbtc.bitaps.com/', {'tx': '', 'addr': ''}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc-testnet/', {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchain.info': ('https://www.blockchain.com/btc-testnet/', {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/testnet/', {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/testnet/', {'tx': 'tx/', 'addr': 'address/'}),
    'smartbit.com.au': ('https://testnet.smartbit.com.au/', {'tx': 'tx/', 'addr': 'address/'}),
    'system default': (
        'blockchain://000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
}
997

998
# name -> (base URL, {kind: path prefix that is appended to the base URL})
testnet4_block_explorers = {
    'mempool.space': ('https://mempool.space/testnet4/', {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://testnet4-explorer.wakiyamap.dev/', {'tx': 'tx/', 'addr': 'address/'}),
}
1004

1005
# name -> (base URL, {kind: path prefix that is appended to the base URL})
signet_block_explorers = {
    'bc-2.jp': ('https://explorer.bc-2.jp/', {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/signet/', {'tx': 'tx/', 'addr': 'address/'}),
    'bitcoinexplorer.org': ('https://signet.bitcoinexplorer.org/', {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://signet-explorer.wakiyamap.dev/', {'tx': 'tx/', 'addr': 'address/'}),
    'ex.signet.bublina.eu.org': ('https://ex.signet.bublina.eu.org/', {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/', {'tx': 'tx/', 'addr': 'address/'}),
}
1019

1020
# Path prefixes used for a custom block explorer that is configured as a bare URL string.
_block_explorer_default_api_loc = {'tx': 'tx/', 'addr': 'address/'}
5✔
1021

1022

1023
def block_explorer_info():
    """Return the table of hardcoded block explorers for the active network.

    Any network other than testnet, testnet4 and signet gets the mainnet table.
    """
    from . import constants
    explorers_by_net_name = {
        "testnet": testnet_block_explorers,
        "testnet4": testnet4_block_explorers,
        "signet": signet_block_explorers,
    }
    return explorers_by_net_name.get(constants.net.NET_NAME, mainnet_block_explorers)
×
1032

1033

1034
def block_explorer(config: 'SimpleConfig') -> Optional[str]:
    """Returns name of selected block explorer,
    or None if a custom one (not among hardcoded ones) is configured.

    If the configured name is not in the table for the active network,
    the config default is returned instead.
    """
    if config.BLOCK_EXPLORER_CUSTOM is not None:
        return None
    name = config.BLOCK_EXPLORER
    if block_explorer_info().get(name) is None:
        # not a known explorer for this network: fall back to the default
        name = config.cv.BLOCK_EXPLORER.get_default_value()
    assert isinstance(name, str), f"{name!r} should be str"
    return name
×
1046

1047

1048
def block_explorer_tuple(config: 'SimpleConfig') -> Optional[Tuple[str, dict]]:
    """Return (base URL, {kind: path prefix}) for the configured block explorer.

    A custom explorer may be configured either as a URL string (the default
    path prefixes are used) or as a (url, dict) pair. Any other custom value
    is logged and None is returned.
    """
    custom_be = config.BLOCK_EXPLORER_CUSTOM
    if not custom_be:
        # using one of the hardcoded block explorers
        return block_explorer_info().get(block_explorer(config))
    if isinstance(custom_be, str):
        return custom_be, _block_explorer_default_api_loc
    if isinstance(custom_be, (tuple, list)) and len(custom_be) == 2:
        return tuple(custom_be)
    _logger.warning(f"not using {config.cv.BLOCK_EXPLORER_CUSTOM.key()!r} from config. "
                    f"expected a str or a pair but got {custom_be!r}")
    return None
×
1061

1062

1063
def block_explorer_URL(config: 'SimpleConfig', kind: str, item: str) -> Optional[str]:
    """Build the block explorer URL for `item` (e.g. a txid or an address).

    kind: key into the explorer's path-prefix dict, e.g. 'tx' or 'addr'.
    item: identifier appended after the path prefix.

    Returns None if no usable explorer is configured, or if the explorer
    does not support `kind`.
    """
    be_tuple = block_explorer_tuple(config)
    if not be_tuple:
        return None
    explorer_url, explorer_dict = be_tuple
    kind_str = explorer_dict.get(kind)
    if kind_str is None:
        return None
    if not explorer_url:
        # e.g. a custom (url, dict) pair with an empty url: nothing sensible to build
        return None
    if not explorer_url.endswith("/"):
        explorer_url += "/"
    return ''.join([explorer_url, kind_str, item])
×
1075

1076

1077

1078

1079

1080
# Python bug (http://bugs.python.org/issue1927) causes raw_input
# to be redirected improperly between stdin/stderr on Unix systems
#TODO: py3
def raw_input(prompt=None):
    """Write the prompt (if any) to sys.stdout, then read via the original builtins.input."""
    if prompt:
        sys.stdout.write(prompt)
    return builtin_raw_input()

# Keep a reference to the original input(), then replace it process-wide with the wrapper above.
builtin_raw_input = builtins.input
builtins.input = raw_input
5✔
1090

1091

1092
def parse_json(message):
    """Split the first newline-terminated line off `message` and JSON-decode it.

    Returns (obj, rest): obj is the decoded first line, or None if decoding
    failed; rest is everything after the newline. If `message` contains no
    newline, returns (None, message).
    """
    # TODO: check \r\n pattern
    line, newline, rest = message.partition(b'\n')
    if not newline:
        return None, message
    try:
        decoded = json.loads(line.decode('utf8'))
    except Exception:
        decoded = None
    return decoded, rest
×
1102

1103

1104
def setup_thread_excepthook():
    """
    Workaround for `sys.excepthook` thread bug from:
    http://bugs.python.org/issue1230540

    Call once from the main thread before creating any threads.

    Replaces threading.Thread.__init__ so that every new thread's run()
    is wrapped, and exceptions escaping run() are passed to sys.excepthook.
    """
    thread_init = threading.Thread.__init__

    def init(self, *args, **kwargs):
        thread_init(self, *args, **kwargs)
        wrapped_run = self.run

        def run_with_except_hook(*run_args, **run_kwargs):
            try:
                wrapped_run(*run_args, **run_kwargs)
            except Exception:
                sys.excepthook(*sys.exc_info())

        # shadow the bound method on the instance
        self.run = run_with_except_hook

    threading.Thread.__init__ = init
×
1128

1129

1130
def send_exception_to_crash_reporter(e: BaseException):
    """Forward `e` to base_crash_reporter.send_exception_to_crash_reporter (imported lazily)."""
    from . import base_crash_reporter
    base_crash_reporter.send_exception_to_crash_reporter(e)
×
1133

1134

1135
def versiontuple(v):
    """Parse a dotted version string, e.g. "4.5.8", into a tuple of ints: (4, 5, 8)."""
    return tuple(int(component) for component in v.split("."))
5✔
1137

1138

1139
def read_json_file(path):
    """Load and return the JSON document stored at `path` (read as UTF-8).

    Raises FileImportFailed if the file cannot be read or does not contain
    valid JSON; the underlying exception is logged.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except json.JSONDecodeError:
        _logger.exception('')
        raise FileImportFailed(_("Invalid JSON code."))
    except BaseException as e:
        _logger.exception('')
        raise FileImportFailed(e)
5✔
1150

1151

1152
def write_json_file(path, data):
    """Write `data` to `path` as UTF-8 JSON (4-space indent, sorted keys, MyEncoder).

    Raises FileExportFailed if writing fails with an OS-level error; the error is logged.
    """
    try:
        with open(path, 'w+', encoding='utf-8') as f:
            json.dump(data, f, indent=4, sort_keys=True, cls=MyEncoder)
    except OSError as e:  # IOError and os.error are both aliases of OSError
        _logger.exception('')
        raise FileExportFailed(e)
×
1159

1160

1161
def os_chmod(path, mode):
    """os.chmod aware of tmpfs

    A failing chmod is tolerated (and logged) if `path` lies inside
    $XDG_RUNTIME_DIR; any other OSError is re-raised.
    """
    try:
        os.chmod(path, mode)
    except OSError as e:
        runtime_dir = os.environ.get("XDG_RUNTIME_DIR", None)
        if not (runtime_dir and is_subpath(path, runtime_dir)):
            raise
        _logger.info(f"Tried to chmod in tmpfs. Skipping... {e!r}")
×
1171

1172

1173
def make_dir(path, allow_symlink=True):
    """Make directory if it does not yet exist.

    A newly created directory gets owner-only permissions (rwx------).
    With allow_symlink=False, a dangling symlink at `path` raises an Exception.
    """
    if os.path.exists(path):
        return
    if not allow_symlink and os.path.islink(path):
        raise Exception('Dangling link: ' + path)
    os.mkdir(path)
    os_chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
5✔
1180

1181

1182
def is_subpath(long_path: str, short_path: str) -> bool:
    """Returns whether long_path is a sub-path of short_path.

    Paths for which os.path.commonpath raises ValueError are never sub-paths.
    """
    try:
        shared_prefix = os.path.commonpath([long_path, short_path])
    except ValueError:
        return False
    # long_path is inside short_path iff their common prefix is short_path itself
    return standardize_path(short_path) == standardize_path(shared_prefix)
5✔
1191

1192

1193
def log_exceptions(func):
    """Decorator to log AND re-raise exceptions.

    Only for coroutine functions. asyncio.CancelledError is re-raised without
    logging. If the first positional argument has a `logger` attribute it is
    used for logging, otherwise the module logger.
    """
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError:
            raise
        except BaseException as e:
            owner = args[0] if args else None
            mylogger = owner.logger if hasattr(owner, 'logger') else _logger
            try:
                mylogger.exception(f"Exception in {func.__name__}: {repr(e)}")
            except BaseException as e2:
                # logging itself failed: last resort is stdout
                print(f"logging exception raised: {repr(e2)}... orig exc: {repr(e)} in {func.__name__}")
            raise
    return wrapper
5✔
1211

1212

1213
def ignore_exceptions(func):
    """Decorator to silently swallow all exceptions.

    Only for coroutine functions. When `func` raises an Exception, the
    wrapper returns None instead. BaseExceptions that are not Exceptions
    still propagate.
    """
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            result = await func(*args, **kwargs)
        except Exception:
            return None
        return result
    return wrapper
5✔
1223

1224

1225
def with_lock(func):
    """Decorator to enforce a lock on a function call.

    For methods of objects that expose a `lock` attribute usable as a context
    manager: the wrapped method runs while holding `self.lock`.
    The wrapped function's metadata (name, docstring, ...) is preserved.
    """
    @functools.wraps(func)
    def func_wrapper(self, *args, **kwargs):
        with self.lock:
            return func(self, *args, **kwargs)
    return func_wrapper
5✔
1231

1232

1233
class TxMinedInfo(NamedTuple):
    """Where (and whether) a transaction was mined."""
    height: int                        # height of block that mined tx
    conf: Optional[int] = None         # number of confirmations, SPV verified. >=0, or None (None means unknown)
    timestamp: Optional[int] = None    # timestamp of block that mined tx
    txpos: Optional[int] = None        # position of tx in serialized block
    header_hash: Optional[str] = None  # hash of block that mined tx
    wanted_height: Optional[int] = None  # in case of timelock, min abs block height

    def short_id(self) -> Optional[str]:
        """Return "<height>x<txpos>", or None if the position in the block is not known."""
        if self.txpos is None or self.txpos < 0:
            return None
        assert self.height > 0
        return f"{self.height}x{self.txpos}"

    def is_local_like(self) -> bool:
        """Returns whether the tx is local-like (LOCAL/FUTURE)."""
        from .address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
        is_mined = self.height > 0
        is_in_mempool = self.height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT)
        return not (is_mined or is_in_mempool)
×
1255

1256

1257
class ShortID(bytes):
    """Short id stored as bytes: 3 bytes block height, 3 bytes tx position
    in the block, 2 bytes output index (all big-endian).
    """

    def __repr__(self):
        return f"<ShortID: {format_short_id(self)}>"

    def __str__(self):
        return format_short_id(self)

    @classmethod
    def from_components(cls, block_height: int, tx_pos_in_block: int, output_index: int) -> 'ShortID':
        """Pack the three components into an 8-byte ShortID."""
        packed = b"".join((
            block_height.to_bytes(3, byteorder='big'),
            tx_pos_in_block.to_bytes(3, byteorder='big'),
            output_index.to_bytes(2, byteorder='big'),
        ))
        return ShortID(packed)

    @classmethod
    def from_str(cls, scid: str) -> 'ShortID':
        """Parses a formatted scid str, e.g. '643920x356x0'."""
        parts = scid.split("x")
        if len(parts) != 3:
            raise ValueError(f"failed to parse ShortID: {scid!r}")
        try:
            block_height, tx_pos, output_index = map(int, parts)
        except ValueError:
            raise ValueError(f"failed to parse ShortID: {scid!r}") from None
        return ShortID.from_components(block_height, tx_pos, output_index)

    @classmethod
    def normalize(cls, data: Union[None, str, bytes, 'ShortID']) -> Optional['ShortID']:
        """Coerce a hex str (16 chars) or raw bytes (8 bytes) into a ShortID.

        None and ShortID instances are returned unchanged.
        """
        if data is None or isinstance(data, ShortID):
            return data
        if isinstance(data, str):
            assert len(data) == 16
            return ShortID.fromhex(data)
        if isinstance(data, (bytes, bytearray)):
            assert len(data) == 8
            return ShortID(data)

    @property
    def block_height(self) -> int:
        return int.from_bytes(self[:3], byteorder='big')

    @property
    def txpos(self) -> int:
        return int.from_bytes(self[3:6], byteorder='big')

    @property
    def output_index(self) -> int:
        return int.from_bytes(self[6:8], byteorder='big')
5✔
1306

1307

1308
def format_short_id(short_channel_id: Optional[bytes]):
    """Format a short id as "<block height>x<tx position>x<output index>".

    The three numbers are the big-endian integers in bytes [0:3], [3:6] and [6:].
    A missing (None/empty) id yields the translated 'Not yet available'.
    """
    if not short_channel_id:
        return _('Not yet available')
    fields = (short_channel_id[:3], short_channel_id[3:6], short_channel_id[6:])
    return 'x'.join(str(int.from_bytes(field, 'big')) for field in fields)
1314

1315

1316
def make_aiohttp_session(proxy: Optional['ProxySettings'], headers=None, timeout=None):
    """Create an aiohttp.ClientSession that validates TLS against `ca_path`.

    proxy: if set and enabled, connections go through it (SOCKS5 if
        proxy.mode == 'socks5', otherwise SOCKS4).
    headers: request headers; defaults to {'User-Agent': 'Electrum'}.
    timeout: an aiohttp.ClientTimeout, or a number of seconds used as the
        total timeout; defaults to 45 seconds.
    """
    if headers is None:
        headers = {'User-Agent': 'Electrum'}
    if timeout is None:
        # The default timeout is high intentionally.
        # DNS on some systems can be really slow, see e.g. #5337
        timeout = 45
    if isinstance(timeout, (int, float)):
        timeout = aiohttp.ClientTimeout(total=timeout)
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)

    use_proxy = proxy and proxy.enabled
    if not use_proxy:
        connector = aiohttp.TCPConnector(ssl=ssl_context)
    else:
        proxy_type = ProxyType.SOCKS5 if proxy.mode == 'socks5' else ProxyType.SOCKS4
        connector = ProxyConnector(
            proxy_type=proxy_type,
            host=proxy.host,
            port=int(proxy.port),
            username=proxy.user,
            password=proxy.password,
            rdns=True,  # needed to prevent DNS leaks over proxy
            ssl=ssl_context,
        )

    return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector)
×
1341

1342

1343
class OldTaskGroup(aiorpcx.TaskGroup):
    """Automatically raises exceptions on join; as in aiorpcx prior to version 0.20.
    That is, when using TaskGroup as a context manager, if any task encounters an exception,
    we would like that exception to be re-raised (propagated out). For the wait=all case,
    the OldTaskGroup class is emulating the following code-snippet:
    ```
    async with TaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())

        async for task in group:
            if not task.cancelled():
                task.result()
    ```
    So instead of the above, one can just write:
    ```
    async with OldTaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())
    ```
    # TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
    """
    async def join(self):
        """Wait for the group's tasks, re-raising the exception of a failed task."""
        # The builtin `all` is the value of self._wait that selects the "wait for all tasks" mode.
        if self._wait is all:
            exc = False
            try:
                # Calling result() on each non-cancelled task re-raises that task's exception here.
                async for task in self:
                    if not task.cancelled():
                        task.result()
            except BaseException:  # including asyncio.CancelledError
                exc = True
                raise
            finally:
                # On failure, cancel whatever is still running before the final join.
                if exc:
                    await self.cancel_remaining()
                await super().join()
        else:
            await super().join()
            # presumably `completed` is the task that satisfied the wait condition - TODO confirm against aiorpcx
            if self.completed:
                self.completed.result()
5✔
1383

1384
# We monkey-patch aiorpcx TimeoutAfter (used by timeout_after and ignore_after API),
1385
# to fix a timing issue present in asyncio as a whole re timing out tasks.
1386
# To see the issue we are trying to fix, consider example:
1387
#     async def outer_task():
1388
#         async with timeout_after(0.1):
1389
#             await inner_task()
1390
# When the 0.1 sec timeout expires, inner_task will get cancelled by timeout_after (=internal cancellation).
1391
# If around the same time (in terms of event loop iterations) another coroutine
1392
# cancels outer_task (=external cancellation), there will be a race.
1393
# Both cancellations work by propagating a CancelledError out to timeout_after, which then
1394
# needs to decide (in TimeoutAfter.__aexit__) whether it's due to an internal or external cancellation.
1395
# AFAICT asyncio provides no reliable way of distinguishing between the two.
1396
# This patch tries to always give priority to external cancellations.
1397
# see https://github.com/kyuupichan/aiorpcX/issues/44
1398
# see https://github.com/aio-libs/async-timeout/issues/229
1399
# see https://bugs.python.org/issue42130 and https://bugs.python.org/issue45098
1400
# TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
1401
def _aiorpcx_monkeypatched_set_new_deadline(task, deadline):
    """Replacement for aiorpcx.curio._set_new_deadline that prioritises external cancellations.

    Wraps task.cancel so that an external cancel() is recorded on the task,
    and schedules timeout_task() on the task's loop at `deadline`.
    """
    def timeout_task():
        # Internal cancellation: called by the loop when the deadline is reached.
        task._orig_cancel()
        # Only mark the task as timed out if it was not cancelled externally in the meantime.
        task._timed_out = None if getattr(task, "_externally_cancelled", False) else deadline
    def mycancel(*args, **kwargs):
        # External cancellation: this is what callers reach via task.cancel() (installed below).
        task._orig_cancel(*args, **kwargs)
        task._externally_cancelled = True
        task._timed_out = None
    if not hasattr(task, "_orig_cancel"):
        # Wrap cancel only once per task; _orig_cancel keeps the unwrapped method.
        task._orig_cancel = task.cancel
        task.cancel = mycancel
    task._deadline_handle = task._loop.call_at(deadline, timeout_task)
5✔
1413

1414

1415
def _aiorpcx_monkeypatched_set_task_deadline(task, deadline):
    """Call the original aiorpcx _set_task_deadline, then reset the task's _externally_cancelled marker."""
    ret = _aiorpcx_orig_set_task_deadline(task, deadline)
    task._externally_cancelled = None
    return ret
5✔
1419

1420

1421
def _aiorpcx_monkeypatched_unset_task_deadline(task):
    """Restore task.cancel if it was wrapped, then call the original aiorpcx _unset_task_deadline."""
    if hasattr(task, "_orig_cancel"):
        task.cancel = task._orig_cancel
        del task._orig_cancel
    return _aiorpcx_orig_unset_task_deadline(task)
5✔
1426

1427

1428
# Save the original aiorpcx functions first: the replacements below delegate to them.
_aiorpcx_orig_set_task_deadline    = aiorpcx.curio._set_task_deadline
_aiorpcx_orig_unset_task_deadline  = aiorpcx.curio._unset_task_deadline

# Install the monkeypatched versions (module-level side effect at import time).
aiorpcx.curio._set_new_deadline    = _aiorpcx_monkeypatched_set_new_deadline
aiorpcx.curio._set_task_deadline   = _aiorpcx_monkeypatched_set_task_deadline
aiorpcx.curio._unset_task_deadline = _aiorpcx_monkeypatched_unset_task_deadline
5✔
1434

1435

1436
async def wait_for2(fut: Awaitable, timeout: Union[int, float, None]):
    """Replacement for asyncio.wait_for,
     due to bugs: https://bugs.python.org/issue42130 and https://github.com/python/cpython/issues/86296 ,
     which are only fixed in python 3.12+.

     On python 3.12+ this simply delegates to asyncio.wait_for.
     """
    if sys.version_info >= (3, 12):
        return await asyncio.wait_for(fut, timeout)
    # older python: emulate wait_for with our own timeout context manager
    async with async_timeout(timeout):
        return await asyncio.ensure_future(fut, loop=get_running_loop())
2✔
1446

1447

1448
# async_timeout(delay): async context manager that times out its body after `delay` seconds.
# Uses the stdlib implementation where available, else an aiorpcx-based fallback.
if hasattr(asyncio, 'timeout'):  # python 3.11+
    async_timeout = asyncio.timeout
else:
    class TimeoutAfterAsynciolike(aiorpcx.curio.TimeoutAfter):
        """aiorpcx TimeoutAfter that raises asyncio's exception types instead of aiorpcx's."""
        async def __aexit__(self, exc_type, exc_value, traceback):
            try:
                await super().__aexit__(exc_type, exc_value, traceback)
            except (aiorpcx.TaskTimeout, aiorpcx.UncaughtTimeoutError):
                # translate aiorpcx timeouts into asyncio.TimeoutError
                raise asyncio.TimeoutError from None
            except aiorpcx.TimeoutCancellationError:
                # translate into a plain asyncio.CancelledError
                raise asyncio.CancelledError from None

    def async_timeout(delay: Union[int, float, None]):
        """Return a timeout context manager for `delay`; delay=None means no timeout."""
        if delay is None:
            return nullcontext()
        return TimeoutAfterAsynciolike(delay)
1✔
1464

1465

1466
class NetworkJobOnDefaultServer(Logger, ABC):
    """An abstract base class for a job that runs on the main network
    interface. Every time the main interface changes, the job is
    restarted, and some of its internals are reset.
    """
    def __init__(self, network: 'Network'):
        Logger.__init__(self)
        self.network = network
        self.interface = None  # type: Interface
        # serialises concurrent _restart() calls
        self._restart_lock = asyncio.Lock()
        # Ensure fairness between NetworkJobs. e.g. if multiple wallets
        # are open, a large wallet's Synchronizer should not starve the small wallets:
        self._network_request_semaphore = asyncio.Semaphore(100)

        self._reset()
        # every time the main interface changes, restart:
        register_callback(self._restart, ['default_server_changed'])
        # also schedule a one-off restart now, as there might already be a main interface:
        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)

    def _reset(self):
        """Initialise fields. Called every time the underlying
        server connection changes.
        """
        self.taskgroup = OldTaskGroup()
        self.reset_request_counters()

    async def _start(self, interface: 'Interface'):
        """Bind the job to `interface` and spawn _run_tasks() in the interface's taskgroup."""
        self.logger.debug(f"starting. interface.server={repr(str(interface.server))}")
        self.interface = interface

        # Capture the current taskgroup: self.taskgroup may be replaced by a later restart.
        taskgroup = self.taskgroup
        async def run_tasks_wrapper():
            self.logger.debug(f"starting taskgroup ({hex(id(taskgroup))}).")
            try:
                await self._run_tasks(taskgroup=taskgroup)
            except Exception as e:
                self.logger.error(f"taskgroup died ({hex(id(taskgroup))}). exc={e!r}")
                raise
            finally:
                self.logger.debug(f"taskgroup stopped ({hex(id(taskgroup))}).")
        await interface.taskgroup.spawn(run_tasks_wrapper)

    @abstractmethod
    async def _run_tasks(self, *, taskgroup: OldTaskGroup) -> None:
        """Start tasks in taskgroup. Called every time the underlying
        server connection changes.
        """
        # If self.taskgroup changed, don't start tasks. This can happen if we have
        # been restarted *just now*, i.e. after the _run_tasks coroutine object was created.
        if taskgroup != self.taskgroup:
            raise asyncio.CancelledError()

    async def stop(self, *, full_shutdown: bool = True):
        """Cancel the job's remaining tasks.

        With full_shutdown=True the job also unregisters its restart callback.
        """
        self.logger.debug(f"stopping. {full_shutdown=}")
        if full_shutdown:
            unregister_callback(self._restart)
        await self.taskgroup.cancel_remaining()

    @log_exceptions
    async def _restart(self, *args):
        """Stop, reset and start again on the network's current main interface (if any)."""
        interface = self.network.interface
        if interface is None:
            return  # we should get called again soon

        async with self._restart_lock:
            await self.stop(full_shutdown=False)
            self._reset()
            await self._start(interface)

    def reset_request_counters(self):
        """Zero the sent/answered request counters."""
        self._requests_sent = 0
        self._requests_answered = 0

    def num_requests_sent_and_answered(self) -> Tuple[int, int]:
        """Return (requests sent, requests answered) since the last counter reset."""
        return self._requests_sent, self._requests_answered

    @property
    def session(self):
        """The session of the current interface; asserted to be not None."""
        s = self.interface.session
        assert s is not None
        return s
×
1548

1549

1550
async def detect_tor_socks_proxy() -> Optional[Tuple[str, int]]:
    """Probe a few localhost ports concurrently for a Tor SOCKS proxy.

    Returns the (host, port) of a port that answered like Tor, or None.
    """
    # Probable ports for Tor to listen at
    candidates = [("127.0.0.1", port) for port in (9050, 9051, 9150)]

    found_addr = None

    async def probe(net_addr):
        nonlocal found_addr
        if await is_tor_socks_port(*net_addr):
            # set result, and cancel remaining probes
            found_addr = net_addr
            await group.cancel_remaining()

    async with OldTaskGroup() as group:
        for net_addr in candidates:
            await group.spawn(probe(net_addr))
    return found_addr
×
1571

1572

1573
@log_exceptions
async def is_tor_socks_port(host: str, port: int) -> bool:
    """Return whether a Tor SOCKS proxy appears to be listening at host:port.

    Connection errors and timeouts (10 seconds for the whole exchange) count as "no".
    """
    # mimic "tor-resolve 0.0.0.0".
    # see https://github.com/spesmilo/electrum/issues/7317#issuecomment-1369281075
    # > this is a socks5 handshake, followed by a socks RESOLVE request as defined in
    # > [tor's socks extension spec](https://github.com/torproject/torspec/blob/7116c9cdaba248aae07a3f1d0e15d9dd102f62c5/socks-extensions.txt#L63),
    # > resolving 0.0.0.0, which being an IP, tor resolves itself without needing to ask a relay.
    request = b'\x05\x01\x00\x05\xf0\x00\x03\x070.0.0.0\x00\x00'
    expected_reply = b'\x05\x00\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00'
    writer = None
    try:
        async with async_timeout(10):
            reader, writer = await asyncio.open_connection(host, port)
            writer.write(request)
            await writer.drain()
            reply = await reader.read(1024)
            return reply == expected_reply
    except (OSError, asyncio.TimeoutError):
        return False
    finally:
        if writer:
            writer.close()
×
1595

1596

1597
AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP = False  # used by unit tests

_asyncio_event_loop = None  # type: Optional[asyncio.AbstractEventLoop]
def get_asyncio_loop() -> asyncio.AbstractEventLoop:
    """Returns the global asyncio event loop we use.

    If no global loop is set and AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP
    is enabled, the loop running in the current thread is used instead.
    Raises if neither is available.
    """
    loop = _asyncio_event_loop
    if not loop and AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP:
        loop = get_running_loop()
    if not loop:
        raise Exception("event loop not created yet")
    return loop
×
1608

1609

1610
def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
                                           asyncio.Future,
                                           threading.Thread]:
    """Create the global asyncio event loop and run it in a new thread named 'EventLoop'.

    Returns (loop, stopping_fut, loop_thread). The loop runs until stopping_fut
    completes; the global loop reference is reset to None when it stops.
    Raises if the global loop is already set, or if the new loop is still not
    running after 5 seconds.
    """
    global _asyncio_event_loop
    if _asyncio_event_loop is not None:
        raise Exception("there is already a running event loop")

    # asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710)
    # We set a custom event loop policy purely to be compatible with code that
    # relies on asyncio.get_event_loop().
    # - in python 3.8-3.9, asyncio.Event.__init__, asyncio.Lock.__init__,
    #   and similar, calls get_event_loop. see https://github.com/python/cpython/pull/23420
    class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
        def get_event_loop(self):
            # In case electrum is being used as a library, there might be other
            # event loops in use besides ours. To minimise interfering with those,
            # a loop running in the current thread takes precedence over
            # our global loop:
            thread_loop = get_running_loop()
            return thread_loop if thread_loop is not None else get_asyncio_loop()
    asyncio.set_event_loop_policy(MyEventLoopPolicy())

    loop = _asyncio_event_loop = asyncio.new_event_loop()

    suppressed_messages = re.compile('SSL handshake|Fatal read error on|'
                                     'SSL error in data received')

    def on_exception(loop, context):
        """Suppress spurious messages it appears we cannot control."""
        message = context.get('message')
        if not (message and suppressed_messages.match(message)):
            loop.default_exception_handler(context)

    def run_event_loop():
        global _asyncio_event_loop
        try:
            loop.run_until_complete(stopping_fut)
        finally:
            # clean-up
            _asyncio_event_loop = None

    loop.set_exception_handler(on_exception)
    _set_custom_task_factory(loop)
    # loop.set_debug(True)
    stopping_fut = loop.create_future()
    loop_thread = threading.Thread(target=run_event_loop, name='EventLoop')
    loop_thread.start()
    # Wait until the loop actually starts.
    # On a slow PC, or with a debugger attached, this can take a few dozens of ms,
    # and if we returned without a running loop, weird things can happen...
    wait_started = time.monotonic()
    while not loop.is_running():
        time.sleep(0.01)
        if time.monotonic() - wait_started > 5:
            raise Exception("been waiting for 5 seconds but asyncio loop would not start!")
    return loop, stopping_fut, loop_thread
×
1672

1673

1674
_running_asyncio_tasks = set()  # type: Set[asyncio.Future]
def _set_custom_task_factory(loop: asyncio.AbstractEventLoop):
    """Wrap task creation to track pending and running tasks.
    When tasks are created, asyncio only maintains a weak reference to them.
    Hence, the garbage collector might destroy the task mid-execution.
    To avoid this, we store a strong reference for the task until it completes.

    Without this, a lot of APIs are basically Heisenbug-generators... e.g.:
    - "asyncio.create_task"
    - "loop.create_task"
    - "asyncio.ensure_future"
    - what about "asyncio.run_coroutine_threadsafe"? not sure if that is safe.

    related:
        - https://bugs.python.org/issue44665
        - https://github.com/python/cpython/issues/88831
        - https://github.com/python/cpython/issues/91887
        - https://textual.textualize.io/blog/2023/02/11/the-heisenbug-lurking-in-your-async-code/
        - https://github.com/python/cpython/issues/91887#issuecomment-1434816045
        - "Task was destroyed but it is pending!"
    """
    # whatever factory was installed before us (None if there was none)
    previous_factory = loop.get_task_factory()

    def factory(loop_, coro, **kwargs):
        if previous_factory is None:
            task = asyncio.Task(coro, loop=loop_, **kwargs)
        else:
            task = previous_factory(loop_, coro, **kwargs)
        # hold a strong reference until the task is done
        _running_asyncio_tasks.add(task)
        task.add_done_callback(_running_asyncio_tasks.discard)
        return task

    loop.set_task_factory(factory)
5✔
1708

1709

1710
class OrderedDictWithIndex(OrderedDict):
    """An OrderedDict that keeps track of the positions of keys.

    Note: very inefficient to modify contents, except to add new items.
    (Appending a new key updates the index in place; every other mutation
    rebuilds the whole index.)
    """

    def __init__(self):
        super().__init__()
        self._key_to_pos = {}
        self._pos_to_key = {}

    def _recalc_index(self):
        """Rebuild both lookup tables from the current key order."""
        self._pos_to_key = dict(enumerate(self.keys()))
        self._key_to_pos = {key: pos for pos, key in self._pos_to_key.items()}

    def pos_from_key(self, key):
        """Return the position of key in the current ordering."""
        return self._key_to_pos[key]

    def value_from_pos(self, pos):
        """Return the value stored at position pos."""
        return self[self._pos_to_key[pos]]

    def popitem(self, *args, **kwargs):
        item = super().popitem(*args, **kwargs)
        self._recalc_index()
        return item

    def move_to_end(self, *args, **kwargs):
        result = super().move_to_end(*args, **kwargs)
        self._recalc_index()
        return result

    def clear(self):
        result = super().clear()
        self._recalc_index()
        return result

    def pop(self, *args, **kwargs):
        value = super().pop(*args, **kwargs)
        self._recalc_index()
        return value

    def update(self, *args, **kwargs):
        result = super().update(*args, **kwargs)
        self._recalc_index()
        return result

    def __delitem__(self, *args, **kwargs):
        result = super().__delitem__(*args, **kwargs)
        self._recalc_index()
        return result

    def __setitem__(self, key, *args, **kwargs):
        already_present = key in self
        result = super().__setitem__(key, *args, **kwargs)
        if not already_present:
            # a new key is appended at the end, so only its own entry is needed
            new_pos = len(self) - 1
            self._key_to_pos[key] = new_pos
            self._pos_to_key[new_pos] = key
        return result
×
1770

1771

1772
def multisig_type(wallet_type):
    '''If wallet_type is mofn multi-sig, return [m, n],
    otherwise return None.

    Only the start of wallet_type has to look like "<m>of<n>".
    '''
    if not wallet_type:
        return None
    found = re.match(r'(\d+)of(\d+)', wallet_type)
    if found is None:
        return None
    m, n = found.groups()
    return [int(m), int(n)]
5✔
1781

1782

1783
def is_ip_address(x: Union[str, bytes]) -> bool:
    """Return True if x (bytes are decoded as UTF-8) parses as an IPv4 or IPv6 address."""
    text = x.decode("utf-8") if isinstance(x, bytes) else x
    try:
        ipaddress.ip_address(text)
    except ValueError:
        return False
    else:
        return True
5✔
1791

1792

1793
def _strip_ipv6_brackets(host: str) -> str:
    """Return host without the enclosing "[...]" of a bracketed IPv6 literal.

    Uses startswith/endswith rather than indexing, so an empty string is
    returned unchanged instead of raising IndexError.
    """
    if host.startswith('[') and host.endswith(']'):
        return host[1:-1]
    return host


def is_localhost(host: str) -> bool:
    """Return True if host is 'localhost' / 'localhost.' or a loopback IP address.

    IPv6 addresses may be given in brackets. Anything that is not an IP
    address (including the empty string) yields False.
    """
    if str(host) in ('localhost', 'localhost.',):
        return True
    try:
        ip_addr = ipaddress.ip_address(_strip_ipv6_brackets(host))  # type: Union[IPv4Address, IPv6Address]
    except ValueError:
        return False  # not an IP
    return ip_addr.is_loopback


def is_private_netaddress(host: str) -> bool:
    """Return True if host is localhost or an IP address in a private range.

    IPv6 addresses may be given in brackets. Anything that is not an IP
    address (including the empty string) yields False.
    """
    if is_localhost(host):
        return True
    try:
        ip_addr = ipaddress.ip_address(_strip_ipv6_brackets(host))  # type: Union[IPv4Address, IPv6Address]
    except ValueError:
        return False  # not an IP
    return ip_addr.is_private
5✔
1817

1818

1819
def list_enabled_bits(x: int) -> Sequence[int]:
    """e.g. 77 (0b1001101) --> (0, 2, 3, 6)

    Positions are counted from the least significant bit.
    """
    # least significant digit first
    digits_lsb_first = format(x, 'b')[::-1]
    return tuple(pos for pos, digit in enumerate(digits_lsb_first) if digit == '1')
5✔
1824

1825

1826
def resolve_dns_srv(host: str):
    """Resolve the SRV records of host.

    Returns a list of {'host': ..., 'port': ...} dicts, ordered by ascending
    priority and, within the same priority, by descending weight.
    """
    # FIXME this method is not using the network proxy. (although the proxy might not support UDP?)
    answers = dns.resolver.resolve(host, 'SRV')

    def preference(srv):
        # priority: prefer lower
        # weight: tie breaker; prefer higher
        return srv.priority, -srv.weight

    return [
        {'host': str(srv.target), 'port': srv.port}
        for srv in sorted(answers, key=preference)
    ]
×
1839

1840

1841
def randrange(bound: int) -> int:
    """Return a random integer k such that 1 <= k < bound, uniformly
    distributed across that range.
    This is guaranteed to be cryptographically strong.
    """
    # secrets.randbelow(n) yields 0 <= r < n; drawing from bound - 1 values
    # and shifting by one gives the range [1, bound).
    zero_based = secrets.randbelow(bound - 1)
    return zero_based + 1
5✔
1849

1850

1851
class CallbackManager(Logger):
    """Registry of per-event callbacks."""
    # callbacks set by the GUI or any thread
    # guarantee: the callbacks will always get triggered from the asyncio thread.

    def __init__(self):
        Logger.__init__(self)
        self.callback_lock = threading.Lock()
        self.callbacks = defaultdict(list)      # note: needs self.callback_lock
        self._running_cb_futs = set()

    def register_callback(self, func, events):
        """Append func to the callback list of every event in events."""
        with self.callback_lock:
            for event_name in events:
                self.callbacks[event_name].append(func)

    def unregister_callback(self, callback):
        """Remove callback (one occurrence per event) from all events."""
        with self.callback_lock:
            for registered in self.callbacks.values():
                if callback in registered:
                    registered.remove(callback)

    def _schedule_async_callback(self, loop, event, callback, args):
        """Submit an async callback to the loop and log how it ended."""
        fut = asyncio.run_coroutine_threadsafe(callback(*args), loop)
        # keep strong references around to avoid GC issues:
        self._running_cb_futs.add(fut)

        def on_done(fut_: concurrent.futures.Future):
            assert fut_.done()
            self._running_cb_futs.remove(fut_)
            if fut_.cancelled():
                self.logger.debug(f"cb cancelled. {event=}.")
            elif exc := fut_.exception():
                self.logger.error(f"cb errored. {event=}. {exc=}", exc_info=exc)
        fut.add_done_callback(on_done)

    def trigger_callback(self, event, *args):
        """Trigger a callback with given arguments.
        Can be called from any thread. The callback itself will get scheduled
        on the event loop.
        """
        loop = get_asyncio_loop()
        assert loop.is_running(), "event loop not running"
        # snapshot under the lock; the callbacks themselves run without it
        with self.callback_lock:
            to_call = list(self.callbacks[event])
        for callback in to_call:
            if asyncio.iscoroutinefunction(callback):  # async cb
                self._schedule_async_callback(loop, event, callback, args)
            # non-async cb
            # note: the cb needs to run in the asyncio thread
            elif get_running_loop() == loop:
                # run callback immediately, so that it is guaranteed
                # to have been executed when this method returns
                callback(*args)
            else:
                # note: if cb raises, asyncio will log the exception
                loop.call_soon_threadsafe(callback, *args)
×
1903

1904

1905
# Module-level CallbackManager instance; its bound methods are exposed below
# as plain module-level functions.
callback_mgr = CallbackManager()
trigger_callback = callback_mgr.trigger_callback
register_callback = callback_mgr.register_callback
unregister_callback = callback_mgr.unregister_callback
# Maps "<module>.<class name>" to the names of that class's "on_event_*" methods.
# Filled by the @event_listener decorator, read by EventListener._list_callbacks().
_event_listeners = defaultdict(set)  # type: Dict[str, Set[str]]
5✔
1910

1911

1912
class EventListener:
    """Use as a mixin for a class that has methods to be triggered on events.
    - Methods that receive the callbacks should be named "on_event_*" and decorated with @event_listener.
    - register_callbacks() should be called exactly once per instance of EventListener, e.g. in __init__
    - unregister_callbacks() should be called at least once, e.g. when the instance is destroyed
    """

    def _list_callbacks(self):
        """Yield (event name, bound method) for every listener declared along the MRO."""
        prefix = 'on_event_'
        for klass in self.__class__.__mro__:
            classpath = f"{klass.__module__}.{klass.__name__}"
            for method_name in _event_listeners[classpath]:
                method = getattr(self, method_name)
                assert callable(method)
                assert method_name.startswith('on_event_')
                # the event name is the method name without its prefix
                yield method_name[len(prefix):], method

    def register_callbacks(self):
        """Register each listener method for its own event."""
        for event_name, method in self._list_callbacks():
            register_callback(method, [event_name])

    def unregister_callbacks(self):
        """Unregister every listener method of this instance."""
        for _event_name, method in self._list_callbacks():
            unregister_callback(method)
5✔
1937

1938

1939
def event_listener(func):
    """To be used in subclasses of EventListener only. (how to enforce this programmatically?)

    Records the decorated "on_event_*" method under "<module>.<class name>"
    in _event_listeners, and returns func unchanged.
    """
    owner_name, method_name = func.__qualname__.split('.')
    assert method_name.startswith('on_event_')
    _event_listeners[f"{func.__module__}.{owner_name}"].add(method_name)
    return func
5✔
1946

1947

1948
_NetAddrType = TypeVar("_NetAddrType")
# requirements for _NetAddrType:
# - reasonable __hash__() implementation (e.g. based on host/port of remote endpoint)

class NetworkRetryManager(Generic[_NetAddrType]):
    """Truncated Exponential Backoff for network connections.

    For every address, the time of the last attempt and the number of attempts
    are remembered. The required delay before a retry doubles with each attempt,
    up to a maximum. There are two delay profiles: "normal" and "urgent".
    """

    def __init__(
            self, *,
            max_retry_delay_normal: float,
            init_retry_delay_normal: float,
            max_retry_delay_urgent: Optional[float] = None,
            init_retry_delay_urgent: Optional[float] = None,
    ):
        self._last_tried_addr = {}  # type: Dict[_NetAddrType, Tuple[float, int]]  # (unix ts, num_attempts)

        # note: these all use "seconds" as unit
        self._max_retry_delay_normal = max_retry_delay_normal
        self._init_retry_delay_normal = init_retry_delay_normal
        # the urgent profile falls back to the normal one where not given
        self._max_retry_delay_urgent = (
            max_retry_delay_normal if max_retry_delay_urgent is None else max_retry_delay_urgent)
        self._init_retry_delay_urgent = (
            init_retry_delay_normal if init_retry_delay_urgent is None else init_retry_delay_urgent)

    def _trying_addr_now(self, addr: _NetAddrType) -> None:
        """Record a connection attempt to addr, made now."""
        _, attempts_so_far = self._last_tried_addr.get(addr, (0, 0))
        # we add up to 1 second of noise to the time, so that clients are less likely
        # to get synchronised and bombard the remote in connection waves:
        noisy_now = time.time() + random.random()
        self._last_tried_addr[addr] = noisy_now, attempts_so_far + 1

    def _on_connection_successfully_established(self, addr: _NetAddrType) -> None:
        """Reset the attempt counter of addr."""
        self._last_tried_addr[addr] = time.time(), 0

    def _can_retry_addr(self, addr: _NetAddrType, *,
                        now: Optional[float] = None, urgent: bool = False) -> bool:
        """Return True if the backoff delay for addr has fully elapsed at time now."""
        if now is None:
            now = time.time()
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        if urgent:
            init_delay, max_delay = self._init_retry_delay_urgent, self._max_retry_delay_urgent
        else:
            init_delay, max_delay = self._init_retry_delay_normal, self._max_retry_delay_normal
        delay = self.__calc_delay(multiplier=init_delay, max_delay=max_delay, num_attempts=num_attempts)
        return last_time + delay < now

    @classmethod
    def __calc_delay(cls, *, multiplier: float, max_delay: float,
                     num_attempts: int) -> float:
        """Return multiplier * 2**num_attempts, clamped to the range [0, max_delay]."""
        capped_attempts = min(num_attempts, 100_000)
        try:
            uncapped = multiplier * 2 ** capped_attempts
        except OverflowError:
            return max_delay
        return max(0, min(max_delay, uncapped))

    def _clear_addr_retry_times(self) -> None:
        """Forget all recorded attempts."""
        self._last_tried_addr.clear()
5✔
2011

2012

2013
class ESocksProxy(aiorpcx.SOCKSProxy):
    """SOCKS proxy with a stream-based open_connection() and a constructor from network settings."""
    # note: proxy will not leak DNS as create_connection()
    # sets (local DNS) resolve=False by default

    async def open_connection(self, host=None, port=None, **kwargs):
        """Connect to host:port through the proxy; return a (reader, writer) stream pair."""
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader(loop=loop)
        protocol = asyncio.StreamReaderProtocol(reader, loop=loop)

        def protocol_factory():
            return protocol

        transport, _ = await self.create_connection(
            protocol_factory, host, port, **kwargs)
        return reader, asyncio.StreamWriter(transport, protocol, reader, loop)

    @classmethod
    def from_network_settings(cls, network: Optional['Network']) -> Optional['ESocksProxy']:
        """Build a proxy from network.proxy, or return None if no proxy is enabled.

        Raises NotImplementedError for proxy modes other than "socks4" and "socks5".
        """
        if not network or not network.proxy or not network.proxy.enabled:
            return None
        proxy = network.proxy
        username, pw = proxy.user, proxy.password
        if username and pw:
            auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
        # is_proxy_tor is tri-state; None indicates it is still probing the proxy to test for TOR
        elif network.is_proxy_tor:
            auth = aiorpcx.socks.SOCKSRandomAuth()
        else:
            auth = None
        addr = aiorpcx.NetAddress(proxy.host, proxy.port)
        if proxy.mode == "socks4":
            return cls(addr, aiorpcx.socks.SOCKS4a, auth)
        if proxy.mode == "socks5":
            return cls(addr, aiorpcx.socks.SOCKS5, auth)
        raise NotImplementedError  # http proxy not available with aiorpcx
×
2048

2049

2050
class JsonRPCError(Exception):
    """Error carrying the code, message and optional data of a JSON-RPC error object."""

    class Codes(enum.IntEnum):
        # application-specific error codes
        USERFACING = 1
        INTERNAL = 2

    def __init__(self, *, code: int, message: str, data: Optional[dict] = None):
        # note: nothing is passed on to Exception, so self.args stays empty
        super().__init__()
        self.code, self.message, self.data = code, message, data
×
2062

2063

2064
class JsonRPCClient:
    """Small JSON-RPC 2.0 client that POSTs requests to a fixed URL via a given session."""

    def __init__(self, session: aiohttp.ClientSession, url: str):
        self.session = session
        self.url = url
        self._id = 0  # id of the most recently sent request

    async def request(self, endpoint, *args):
        """Send request to server, parse and return result.
        note: parsing code is naive, the server is assumed to be well-behaved.
              Up to the caller to handle exceptions, including those arising from parsing errors.

        Returns the 'result' member of the response. If the response carries
        an 'error' member, JsonRPCError is raised. For a HTTP status other
        than 200, the string 'Error: <response text>' is returned.
        """
        self._id += 1
        # Serialise the whole envelope with json.dumps, so that the method name
        # gets escaped just like the params (string formatting would emit
        # invalid JSON for names containing quotes or backslashes).
        data = json.dumps({
            "jsonrpc": "2.0",
            "id": str(self._id),  # the id is sent as a string
            "method": endpoint,
            "params": args,
        })
        async with self.session.post(self.url, data=data) as resp:
            if resp.status != 200:
                text = await resp.text()
                return 'Error: ' + str(text)
            r = await resp.json()
            error = r.get('error')
            if error:
                raise JsonRPCError(code=error["code"], message=error["message"], data=error.get("data"))
            return r.get('result')

    def add_method(self, endpoint):
        """Expose endpoint as a coroutine attribute: client.<endpoint>(*args)."""
        async def coro(*args):
            return await self.request(endpoint, *args)
        setattr(self, endpoint, coro)
×
2096

2097

2098
T = TypeVar('T')

def random_shuffled_copy(x: Iterable[T]) -> List[T]:
    """Returns a shuffled copy of the input.

    The input itself is left untouched.
    """
    shuffled = [*x]  # copy
    random.shuffle(shuffled)  # shuffle in-place
    return shuffled
5✔
2105

2106

2107
def test_read_write_permissions(path) -> None:
    """Check that path is readable and that files next to it can be written and read back.

    Raises IOError if any step fails, or if the data read back differs from
    what was written.
    """
    # note: There might already be a file at 'path'.
    #       Make sure we do NOT overwrite/corrupt that!
    temp_path = "%s.tmptest.%s" % (path, os.getpid())
    echo = "fs r/w test"
    try:
        # test READ permissions for actual path
        if os.path.exists(path):
            with open(path, "rb") as f:
                f.read(1)  # read 1 byte
        # test R/W sanity for "similar" path
        try:
            with open(temp_path, "w", encoding='utf-8') as f:
                f.write(echo)
            with open(temp_path, "r", encoding='utf-8') as f:
                echo2 = f.read()
        except BaseException:
            # Do not leave the probe file behind if writing/reading it failed.
            # Best-effort only: the original error is the one to report.
            try:
                os.remove(temp_path)
            except OSError:
                pass
            raise
        os.remove(temp_path)
    except Exception as e:
        raise IOError(e) from e
    if echo != echo2:
        raise IOError('echo sanity-check failed')
×
2127

2128

2129
class classproperty(property):
    """~read-only class-level @property
    from https://stackoverflow.com/a/13624858 by denis-ryzhkov
    """
    def __get__(self, owner_self, owner_cls):
        # the getter always receives the class, also for access via an instance
        getter = self.fget
        return getter(owner_cls)
5✔
2135

2136

2137
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
    """Returns the asyncio event loop that is *running in this thread*, if any.

    Unlike asyncio.get_running_loop(), returns None instead of raising.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return None
    else:
        return loop
×
2143

2144

2145
def error_text_str_to_safe_str(err: str, *, max_len: Optional[int] = 500) -> str:
    """Converts an untrusted error string to a sane printable ascii str.
    Never raises.

    Non-ascii characters are backslash-escaped first; the text is truncated
    to max_len only at the very end.
    """
    ascii_bytes = err.encode("ascii", errors='backslashreplace')
    safe_text = error_text_bytes_to_safe_str(ascii_bytes, max_len=None)
    return truncate_text(safe_text, max_len=max_len)
5✔
2153

2154

2155
def error_text_bytes_to_safe_str(err: bytes, *, max_len: Optional[int] = 500) -> str:
    """Converts an untrusted error bytes text to a sane printable ascii str.
    Never raises.

    Note that naive ascii conversion would be insufficient. Fun stuff:
    >>> b = b"my_long_prefix_blabla" + 21 * b"\x08" + b"malicious_stuff"
    >>> s = b.decode("ascii")
    >>> print(s)
    malicious_stuffblabla
    """
    # convert to ascii, to get rid of unicode stuff
    decoded = err.decode("ascii", errors='backslashreplace')
    # do repr to handle ascii special chars (especially when printing/logging the str)
    return truncate_text(repr(decoded), max_len=max_len)
5✔
2170

2171

2172
def truncate_text(text: str, *, max_len: Optional[int]) -> str:
    """Cut text down to max_len characters, appending a note with the original length.

    Text that is short enough is returned unchanged, as is any text when
    max_len is None.
    """
    needs_cut = max_len is not None and len(text) > max_len
    if not needs_cut:
        return text
    return text[:max_len] + f"... (truncated. orig_len={len(text)})"
5✔
2177

2178

2179
def nostr_pow_worker(nonce, nostr_pubk, target_bits, hash_function, hash_len_bits, shutdown):
    """Function to generate PoW for Nostr, to be spawned in a ProcessPoolExecutor.

    Starting at nonce, hashes b'electrum-' + nostr_pubk + <32-byte big-endian nonce>
    until the digest, read as a big-endian integer, is below
    2**(hash_len_bits - target_bits).

    Returns (digest, nonce) on success, after setting the shutdown event.
    Returns (None, None) if the shutdown event was found set between two
    batches of 1000000 attempts.
    """
    hash_preimage = b'electrum-' + nostr_pubk
    target = 1 << (hash_len_bits - target_bits)  # loop-invariant
    while True:
        # we cannot check is_set on each iteration as it has a lot of overhead, this way we can check
        # it with low overhead (just the additional range counter)
        for _ in range(1000000):
            digest = hash_function(hash_preimage + nonce.to_bytes(32, 'big')).digest()
            if int.from_bytes(digest, 'big') < target:
                shutdown.set()
                # return the digest itself (not the builtin hash() function)
                return digest, nonce
            nonce += 1
        if shutdown.is_set():
            return None, None
×
2193

2194

2195
async def gen_nostr_ann_pow(nostr_pubk: bytes, target_bits: int) -> Tuple[int, int]:
    """Generate a PoW for a Nostr announcement. The PoW is hash[b'electrum-'+pubk+nonce]

    The 32-byte nonce space is split evenly between the worker processes
    (one per CPU, minus one, but at least one), each running nostr_pow_worker.

    Returns (nonce, pow_amount), where pow_amount is what
    get_nostr_ann_pow_amount() reports for that nonce.
    """
    import multiprocessing  # not available on Android, so we import it here
    hash_function = hashlib.sha256
    hash_len_bits = 256
    max_nonce: int = (1 << (32 * 8)) - 1  # 32-byte nonce

    max_workers = max(multiprocessing.cpu_count() - 1, 1)  # use all but one CPU
    # split the nonce range between the processes;
    # the largest start nonce, (max_workers - 1) * nonce_stride, is below max_nonce
    nonce_stride = max_nonce // max_workers
    loop = asyncio.get_running_loop()
    nonce_res = None
    # the manager is used as a context manager, so it gets shut down on exit
    with multiprocessing.Manager() as manager:
        shutdown = manager.Event()
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            tasks = [
                loop.run_in_executor(
                    executor,
                    nostr_pow_worker,
                    worker_idx * nonce_stride,
                    nostr_pubk,
                    target_bits,
                    hash_function,
                    hash_len_bits,
                    shutdown
                )
                for worker_idx in range(max_workers)
            ]
            try:
                done, _pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
                # More than one worker may have finished already; workers that
                # stopped because of the shutdown event report (None, None).
                for fut in done:
                    _digest, found_nonce = fut.result()
                    if found_nonce is not None:
                        nonce_res = found_nonce
                        break
            finally:
                # Also tell the workers to stop if we got cancelled or a worker
                # failed: leaving the executor context waits for them to finish.
                shutdown.set()
                executor.shutdown(wait=False, cancel_futures=True)

    return nonce_res, get_nostr_ann_pow_amount(nostr_pubk, nonce_res)
×
2230

2231

2232
def get_nostr_ann_pow_amount(nostr_pubk: bytes, nonce: Optional[int]) -> int:
    """Return the amount of leading zero bits for a nostr announcement PoW.

    The hash is sha256(b'electrum-' + nostr_pubk + <32-byte big-endian nonce>).
    Returns 0 if nonce is None.
    """
    # note: 0 is a valid nonce, only None means "no nonce"
    if nonce is None:
        return 0
    hash_len_bits = 256
    preimage = b'electrum-' + nostr_pubk + nonce.to_bytes(32, 'big')
    digest_int = int.from_bytes(hashlib.sha256(preimage).digest(), 'big')
    # leading zero bits = total bits minus the bits needed for the value
    return hash_len_bits - digest_int.bit_length()
×
2243

2244

2245
class OnchainHistoryItem(NamedTuple):
    """One on-chain transaction as shown in the history."""
    txid: str
    amount_sat: int
    fee_sat: int
    balance_sat: int
    tx_mined_status: TxMinedInfo
    group_id: Optional[str]
    label: str
    monotonic_timestamp: int
    # note: group_id used to be declared a second time here. The repeated
    # annotation had no effect on the field order, so dropping it leaves
    # the tuple layout unchanged.

    def to_dict(self):
        """Return the item as a dict, with the mined status flattened into it."""
        return {
            'txid': self.txid,
            'amount_sat': self.amount_sat,
            'fee_sat': self.fee_sat,
            'height': self.tx_mined_status.height,
            'confirmations': self.tx_mined_status.conf,
            'timestamp': self.tx_mined_status.timestamp,
            'monotonic_timestamp': self.monotonic_timestamp,
            'incoming': self.amount_sat > 0,
            'bc_value': Satoshis(self.amount_sat),
            'bc_balance': Satoshis(self.balance_sat),
            'date': timestamp_to_datetime(self.tx_mined_status.timestamp),
            'txpos_in_block': self.tx_mined_status.txpos,
            'wanted_height': self.tx_mined_status.wanted_height,
            'label': self.label,
            'group_id': self.group_id,
        }
2273

2274
class LightningHistoryItem(NamedTuple):
    """One lightning payment as shown in the history."""
    payment_hash: str
    preimage: str
    amount_msat: int
    fee_msat: Optional[int]
    type: str
    group_id: Optional[str]
    timestamp: int
    label: str

    def to_dict(self):
        """Return the item as a dict; 'ln_value' is the amount converted from msat to sat."""
        timestamp = self.timestamp
        amount_msat = self.amount_msat
        return {
            'type': self.type,
            'label': self.label,
            'timestamp': timestamp or 0,
            'date': timestamp_to_datetime(timestamp),
            'amount_msat': amount_msat,
            'fee_msat': self.fee_msat,
            'payment_hash': self.payment_hash,
            'preimage': self.preimage,
            'group_id': self.group_id,
            'ln_value': Satoshis(Decimal(amount_msat) / 1000),
        }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc