• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6608519807369216

04 Jun 2025 07:50PM UTC coverage: 59.689%. Remained the same
6608519807369216

push

CirrusCI

web-flow
additions to RELEASE-NOTES (#9908)

21846 of 36600 relevant lines covered (59.69%)

2.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.92
/electrum/util.py
1
# Electrum - lightweight Bitcoin client
2
# Copyright (C) 2011 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import binascii
5✔
24
import concurrent.futures
5✔
25
from dataclasses import dataclass
5✔
26
import logging
5✔
27
import os, sys, re
5✔
28
from collections import defaultdict, OrderedDict
5✔
29
from concurrent.futures.process import ProcessPoolExecutor
5✔
30
from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any,
5✔
31
                    Sequence, Dict, Generic, TypeVar, List, Iterable, Set, Awaitable)
32
from datetime import datetime, timezone, timedelta
5✔
33
import decimal
5✔
34
from decimal import Decimal
5✔
35
from urllib.parse import urlparse
5✔
36
import threading
5✔
37
import hmac
5✔
38
import hashlib
5✔
39
import stat
5✔
40
import locale
5✔
41
import asyncio
5✔
42
import builtins
5✔
43
import json
5✔
44
import time
5✔
45
import ssl
5✔
46
import ipaddress
5✔
47
from ipaddress import IPv4Address, IPv6Address
5✔
48
import random
5✔
49
import secrets
5✔
50
import functools
5✔
51
from functools import partial
5✔
52
from abc import abstractmethod, ABC
5✔
53
import socket
5✔
54
import enum
5✔
55
from contextlib import nullcontext
5✔
56
import traceback
5✔
57

58
import attr
5✔
59
import aiohttp
5✔
60
from aiohttp_socks import ProxyConnector, ProxyType
5✔
61
import aiorpcx
5✔
62
import certifi
5✔
63
import dns.asyncresolver
5✔
64

65
from .i18n import _
5✔
66
from .logging import get_logger, Logger
5✔
67

68
if TYPE_CHECKING:
5✔
69
    from .network import Network, ProxySettings
×
70
    from .interface import Interface
×
71
    from .simple_config import SimpleConfig
×
72
    from .paymentrequest import PaymentRequest
×
73

74

75
# Module-level logger for this file.
_logger = get_logger(__name__)
76

77

78
def inv_dict(d):
    """Return a new dict with keys and values of *d* swapped."""
    inverted = {}
    for key, value in d.items():
        inverted[value] = key
    return inverted
80

81

82
def all_subclasses(cls) -> Set:
    """Return all (transitive) subclasses of cls."""
    found = set()
    pending = list(cls.__subclasses__())
    while pending:
        sub = pending.pop()
        if sub not in found:
            found.add(sub)
            pending.extend(sub.__subclasses__())
    return found
88

89

90
# Path to the CA certificate bundle shipped by certifi; used for TLS verification.
ca_path = certifi.where()
91

92

93
# Display units: unit name -> number of decimal places relative to satoshis.
base_units = {'BTC':8, 'mBTC':5, 'bits':2, 'sat':0}
# Reverse mapping: decimal places -> unit name.
base_units_inverse = inv_dict(base_units)
base_units_list = ['BTC', 'mBTC', 'bits', 'sat']  # list(dict) does not guarantee order

DECIMAL_POINT_DEFAULT = 5  # mBTC
98

99

100
class UnknownBaseUnit(Exception):
    """Raised for a base unit name / decimal point not in `base_units`."""
101

102

103
def decimal_point_to_base_unit_name(dp: int) -> str:
    """Map a decimal point to its unit name, e.g. 8 -> "BTC".

    Raises UnknownBaseUnit for unrecognized values.
    """
    if dp not in base_units_inverse:
        raise UnknownBaseUnit(dp)
    return base_units_inverse[dp]
109

110

111
def base_unit_name_to_decimal_point(unit_name: str) -> int:
    """Returns the max number of digits allowed after the decimal point.

    e.g. "BTC" -> 8.  Raises UnknownBaseUnit for unrecognized names.
    """
    if unit_name not in base_units:
        raise UnknownBaseUnit(unit_name)
    return base_units[unit_name]
118

119
def parse_max_spend(amt: Any) -> Optional[int]:
    """Checks if given amount is "spend-max"-like.
    Returns None or the positive integer weight for "max". Never raises.

    When creating invoices and on-chain txs, the user can specify to send "max".
    This is done by setting the amount to '!'. Splitting max between multiple
    tx outputs is also possible, and custom weights (positive ints) can also be used.
    For example, to send 40% of all coins to address1, and 60% to address2:
    ```
    address1, 2!
    address2, 3!
    ```
    """
    if not isinstance(amt, str) or not amt.endswith('!'):
        return None
    prefix = amt[:-1]
    if prefix == '':
        return 1
    try:
        weight = int(prefix)
    except ValueError:
        return None
    return weight if weight > 0 else None
144

145
class NotEnoughFunds(Exception):
    """Raised when available funds cannot cover a requested payment."""
    def __str__(self):
        return _("Insufficient funds")
148

149

150
class UneconomicFee(Exception):
    """Raised when a transaction's fee exceeds the value it would gain."""
    def __str__(self):
        return _("The fee for the transaction is higher than the funds gained from it.")
153

154

155
class NoDynamicFeeEstimates(Exception):
    """Raised when dynamic fee estimates are requested but unavailable."""
    def __str__(self):
        return _('Dynamic fee estimates not available')
158

159

160
class BelowDustLimit(Exception):
    """Raised for amounts below the dust threshold."""
162

163

164
class InvalidPassword(Exception):
    """Raised when a wallet/keystore password check fails.

    An optional custom message overrides the default translated text.
    """
    def __init__(self, message: Optional[str] = None):
        self.message = message

    def __str__(self):
        if self.message is None:
            return _("Incorrect password")
        else:
            return str(self.message)
173

174

175
class AddTransactionException(Exception):
    """Base class for errors while adding a transaction to the wallet."""
177

178

179
class UnrelatedTransactionException(AddTransactionException):
    """Raised when a transaction does not touch any address of this wallet."""
    def __str__(self):
        return _("Transaction is unrelated to this wallet.")
182

183

184
class FileImportFailed(Exception):
    """Raised when importing data from a file fails; carries a detail message."""
    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        return _("Failed to import from file.") + "\n" + self.message
190

191

192
class FileExportFailed(Exception):
    """Raised when exporting data to a file fails; carries a detail message."""
    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        return _("Failed to export to file.") + "\n" + self.message
198

199

200
class WalletFileException(Exception):
    """Problem with reading/parsing a wallet file.

    should_report_crash: whether the error warrants showing a crash report.
    """
    def __init__(self, message='', *, should_report_crash: bool = False):
        super().__init__(message)
        self.should_report_crash = should_report_crash
204

205

206
class BitcoinException(Exception):
    """Generic error related to Bitcoin-level data (keys, scripts, ...)."""
207

208

209
class UserFacingException(Exception):
    """Exception that contains information intended to be shown to the user."""
211

212

213
class InvoiceError(UserFacingException):
    """User-facing error related to invoices."""
214

215

216
class NetworkOfflineException(UserFacingException):
    """Can be raised if we are running in offline mode (--offline flag)
    and the user requests an operation that requires the network.
    """
    def __str__(self):
        return _("You are offline.")
222

223

224
# Raised to unwind the stack like when an error occurs.
# Unlike other exceptions, the user won't be informed.
class UserCancelled(Exception):
    """An exception that is suppressed from the user"""
229

230

231
def to_decimal(x: Union[str, float, int, Decimal]) -> Decimal:
    """Convert to Decimal, going through str to avoid float representation noise.

    e.g.:
      >>> Decimal(41754.681)
      Decimal('41754.680999999996856786310672760009765625')
      >>> Decimal("41754.681")
      Decimal('41754.681')
    """
    return x if isinstance(x, Decimal) else Decimal(str(x))
240

241

242
# note: this is not a NamedTuple as then its json encoding cannot be customized
class Satoshis(object):
    """A bitcoin amount denominated in satoshis."""
    __slots__ = ('value',)

    def __new__(cls, value):
        instance = super().__new__(cls)
        # note: 'value' sometimes has msat precision
        assert isinstance(value, (int, Decimal)), f"unexpected type for {value=!r}"
        instance.value = value
        return instance

    def __repr__(self):
        return f'Satoshis({self.value})'

    def __str__(self):
        # note: precision is truncated to satoshis here
        return format_satoshis(self.value)

    def __eq__(self, other):
        return self.value == other.value

    def __ne__(self, other):
        return not self.__eq__(other)

    def __add__(self, other):
        return Satoshis(self.value + other.value)
268

269

270
# note: this is not a NamedTuple as then its json encoding cannot be customized
class Fiat(object):
    """A fiat amount (possibly unknown/NaN) together with its currency code."""
    __slots__ = ('value', 'ccy')

    def __new__(cls, value: Optional[Decimal], ccy: str):
        instance = super().__new__(cls)
        instance.ccy = ccy
        if not isinstance(value, (Decimal, type(None))):
            raise TypeError(f"value should be Decimal or None, not {type(value)}")
        instance.value = value
        return instance

    def __repr__(self):
        return f'Fiat({self})'

    def _is_unknown(self) -> bool:
        # missing or NaN values render as "No Data"
        return self.value is None or self.value.is_nan()

    def __str__(self):
        if self._is_unknown():
            return _('No Data')
        return "{:.2f}".format(self.value)

    def to_ui_string(self):
        if self._is_unknown():
            return _('No Data')
        return "{:.2f}".format(self.value) + ' ' + self.ccy

    def __eq__(self, other):
        if not isinstance(other, Fiat) or self.ccy != other.ccy:
            return False
        mine, theirs = self.value, other.value
        if isinstance(mine, Decimal) and isinstance(theirs, Decimal) \
                and mine.is_nan() and theirs.is_nan():
            return True
        return mine == theirs

    def __ne__(self, other):
        return not self.__eq__(other)

    def __add__(self, other):
        assert self.ccy == other.ccy
        return Fiat(self.value + other.value, self.ccy)
313

314

315
class MyEncoder(json.JSONEncoder):
    """JSON encoder that knows about Electrum's custom types."""
    def default(self, obj):
        # note: this does not get called for namedtuples :(  https://bugs.python.org/issue30343
        from .transaction import Transaction, TxOutput
        if isinstance(obj, Transaction):
            return obj.serialize()
        if isinstance(obj, TxOutput):
            return obj.to_legacy_tuple()
        # these three all serialize as their string form
        if isinstance(obj, (Satoshis, Fiat, Decimal)):
            return str(obj)
        if isinstance(obj, datetime):
            return obj.isoformat(' ')[:-3]
        if isinstance(obj, set):
            return list(obj)
        if isinstance(obj, bytes): # for nametuples in lnchannel
            return obj.hex()
        to_json = getattr(obj, 'to_json', None)
        if callable(to_json):
            return to_json()
        return super().default(obj)
338

339

340
class ThreadJob(Logger):
    """A job that is run periodically from a thread's main loop.  run() is
    called from that thread's context.
    """

    def __init__(self):
        Logger.__init__(self)

    def run(self):
        """Called periodically from the thread"""
        # Subclasses override this; exceptions raised here are caught and
        # logged by DaemonThread.run_jobs.
        pass

352
class DebugMem(ThreadJob):
    '''A handy class for debugging GC memory leaks'''
    def __init__(self, classes, interval=30):
        # classes: the types to count live instances of
        # interval: seconds between successive scans
        ThreadJob.__init__(self)
        self.next_time = 0
        self.classes = classes
        self.interval = interval

    def mem_stats(self):
        """Force a GC pass and log the live-instance count of each tracked class."""
        import gc
        self.logger.info("Start memscan")
        gc.collect()
        objmap = defaultdict(list)
        for obj in gc.get_objects():
            for class_ in self.classes:
                if isinstance(obj, class_):
                    objmap[class_].append(obj)
        for class_, objs in objmap.items():
            self.logger.info(f"{class_.__name__}: {len(objs)}")
        self.logger.info("Finish memscan")

    def run(self):
        # rate-limit scans to once per self.interval seconds
        if time.time() > self.next_time:
            self.mem_stats()
            self.next_time = time.time() + self.interval
377

378
class DaemonThread(threading.Thread, Logger):
    """ daemon thread that terminates cleanly """

    def __init__(self):
        threading.Thread.__init__(self)
        Logger.__init__(self)
        # thread that created us; is_running() also checks it is still alive
        self.parent_thread = threading.current_thread()
        self.running = False
        self.running_lock = threading.Lock()
        self.job_lock = threading.Lock()
        self.jobs = []  # objects with a run() method, invoked by run_jobs()
        self.stopped_event = threading.Event()        # set when fully stopped
        self.stopped_event_async = asyncio.Event()    # set when fully stopped
        self.wake_up_event = threading.Event()  # for perf optimisation of polling in run()

    def add_jobs(self, jobs):
        """Register additional jobs to be run periodically by this thread."""
        with self.job_lock:
            self.jobs.extend(jobs)

    def run_jobs(self):
        # Don't let a throwing job disrupt the thread, future runs of
        # itself, or other jobs.  This is useful protection against
        # malformed or malicious server responses
        with self.job_lock:
            for job in self.jobs:
                try:
                    job.run()
                except Exception:  # deliberately broad: log and keep running other jobs
                    self.logger.exception('')

    def remove_jobs(self, jobs):
        """Unregister previously-added jobs (raises ValueError if absent)."""
        with self.job_lock:
            for job in jobs:
                self.jobs.remove(job)

    def start(self):
        with self.running_lock:
            self.running = True
        return threading.Thread.start(self)

    def is_running(self):
        # also stop if the thread that created us has died
        with self.running_lock:
            return self.running and self.parent_thread.is_alive()

    def stop(self):
        with self.running_lock:
            self.running = False
            # pulse the event so a run() loop blocked on wait() re-checks is_running()
            self.wake_up_event.set()
            self.wake_up_event.clear()

    def on_stop(self):
        """To be called by the thread itself when terminating; signals waiters."""
        if 'ANDROID_DATA' in os.environ:
            import jnius
            jnius.detach()
            self.logger.info("jnius detach")
        self.logger.info("stopped")
        self.stopped_event.set()
        loop = get_asyncio_loop()
        loop.call_soon_threadsafe(self.stopped_event_async.set)
437

438

439
def print_stderr(*args):
    """Write the stringified args, space-joined, to stderr and flush."""
    sys.stderr.write(" ".join(str(item) for item in args) + "\n")
    sys.stderr.flush()
443

444
def print_msg(*args):
    """Write the stringified args, space-joined, to stdout and flush."""
    sys.stdout.write(" ".join(str(item) for item in args) + "\n")
    sys.stdout.flush()
449

450
def json_encode(obj):
    """JSON-encode obj using MyEncoder; fall back to repr() when not encodable."""
    try:
        return json.dumps(obj, sort_keys=True, indent=4, cls=MyEncoder)
    except TypeError:
        return repr(obj)
456

457
def json_decode(x):
    """Parse x as JSON (floats become Decimal); return x unchanged on failure."""
    try:
        decoded = json.loads(x, parse_float=Decimal)
    except Exception:
        return x
    return decoded
462

463
def json_normalize(x):
    """Round-trip x through MyEncoder so it only contains JSON-safe types."""
    # note: The return value of commands, when going through the JSON-RPC interface,
    #       is json-encoded. The encoder used there cannot handle some types, e.g. electrum.util.Satoshis.
    # note: We should not simply do "json_encode(x)" here, as then later x would get doubly json-encoded.
    # see #5868
    return json_decode(json_encode(x))
469

470

471
# taken from Django Source Code
def constant_time_compare(val1, val2):
    """Return True if the two strings are equal, False otherwise."""
    # timing-safe comparison, to avoid leaking information via comparison time
    return hmac.compare_digest(to_bytes(val1, 'utf8'), to_bytes(val2, 'utf8'))
475

476

477
# Child logger used by the @profiler decorator below.
_profiler_logger = _logger.getChild('profiler')
478
def profiler(func=None, *, min_threshold: Union[int, float, None] = None):
    """Function decorator that logs execution time.

    min_threshold: if set, only log if time taken is higher than threshold
    """
    if func is None:  # to make "@profiler(...)" work. (in addition to bare "@profiler")
        return partial(profiler, min_threshold=min_threshold)
    t0 = None  # type: Optional[float]
    def timer_start():
        nonlocal t0
        t0 = time.time()
    def timer_done():
        t = time.time() - t0
        if min_threshold is None or t > min_threshold:
            _profiler_logger.debug(f"{func.__qualname__} {t:,.4f} sec")
    if asyncio.iscoroutinefunction(func):
        @functools.wraps(func)  # preserve __name__/__qualname__/__doc__ of the wrapped fn
        async def do_profile(*args, **kw_args):
            timer_start()
            o = await func(*args, **kw_args)
            timer_done()
            return o
    else:
        @functools.wraps(func)
        def do_profile(*args, **kw_args):
            timer_start()
            o = func(*args, **kw_args)
            timer_done()
            return o
    return do_profile
506

507

508
class AsyncHangDetector:
    """Context manager that logs every `n` seconds if encapsulated context still has not exited."""

    def __init__(
        self,
        *,
        period_sec: int = 15,       # how often to log while the context is still open
        message: str,               # text logged on each period
        logger: logging.Logger = None,  # defaults to this module's logger
    ):
        self.period_sec = period_sec
        self.message = message
        self.logger = logger or _logger

    async def _monitor(self):
        """Background task: periodically log until cancelled by __aexit__."""
        # note: this assumes that the event loop itself is not blocked
        t0 = time.monotonic()
        while True:
            await asyncio.sleep(self.period_sec)
            t1 = time.monotonic()
            self.logger.info(f"{self.message} (after {t1 - t0:.2f} sec)")

    async def __aenter__(self):
        self.mtask = asyncio.create_task(self._monitor())

    async def __aexit__(self, exc_type, exc, tb):
        self.mtask.cancel()
535

536

537
def android_ext_dir():
    """Return the primary external storage path (Android only)."""
    from android.storage import primary_external_storage_path
    return primary_external_storage_path()
540

541
def android_backup_dir():
    """Return (creating it if needed) the backup directory on external storage.

    Android only.
    """
    pkgname = get_android_package_name()
    d = os.path.join(android_ext_dir(), pkgname)
    # exist_ok avoids the check-then-create race of exists()+mkdir()
    os.makedirs(d, exist_ok=True)
    return d
547

548
def android_data_dir():
    """Return the app-private data directory (Android only)."""
    import jnius
    PythonActivity = jnius.autoclass('org.kivy.android.PythonActivity')
    return PythonActivity.mActivity.getFilesDir().getPath() + '/data'
552

553
def ensure_sparse_file(filename):
    """Mark *filename* as a sparse file where that needs doing explicitly.

    On modern Linux, no need to do anything.
    On Windows, need to explicitly mark file.
    """
    if os.name == "nt":
        try:
            os.system('fsutil sparse setflag "{}" 1'.format(filename))
        except Exception as e:
            # include the filename so failures are actionable from the log
            _logger.info(f'error marking file {filename!r} as sparse: {e}')
561

562

563
def get_headers_dir(config):
    """Return the directory where blockchain headers are stored (the datadir)."""
    return config.path
565

566

567
def assert_datadir_available(config_path):
    """Raise FileNotFoundError if the Electrum datadir has gone missing."""
    if not os.path.exists(config_path):
        raise FileNotFoundError(
            'Electrum datadir does not exist. Was it deleted while running?' + '\n' +
            'Should be at {}'.format(config_path))
575

576

577
def assert_file_in_datadir_available(path, config_path):
    """Raise FileNotFoundError if *path* is missing (checking the datadir first)."""
    if not os.path.exists(path):
        # if the whole datadir is gone, report that instead
        assert_datadir_available(config_path)
        raise FileNotFoundError(
            'Cannot find file but datadir is there.' + '\n' +
            'Should be at {}'.format(path))
585

586

587
def standardize_path(path):
    """Return a normalized absolute form of *path* (user-expanded, case-normalized)."""
    # note: os.path.realpath() is not used, as on Windows it can return non-working paths (see #8495).
    #       This means that we don't resolve symlinks!
    expanded = os.path.expanduser(path)
    return os.path.normcase(os.path.abspath(expanded))
595

596

597
def get_new_wallet_name(wallet_folder: str) -> str:
    """Returns a file basename for a new wallet to be used.
    Can raise OSError.
    """
    # list the directory once, instead of once per candidate name
    existing = set(os.listdir(wallet_folder))
    i = 1
    while ("wallet_%d" % i) in existing:
        i += 1
    return "wallet_%d" % i
609

610

611
def is_android_debug_apk() -> bool:
    """Return whether we are running inside a debug build of the Android APK."""
    is_android = 'ANDROID_DATA' in os.environ
    if not is_android:
        return False
    from jnius import autoclass
    pkgname = get_android_package_name()
    # BuildConfig.DEBUG is generated by the Android build system
    build_config = autoclass(f"{pkgname}.BuildConfig")
    return bool(build_config.DEBUG)
619

620

621
def get_android_package_name() -> str:
    """Return the Android application package name (Android only)."""
    is_android = 'ANDROID_DATA' in os.environ
    assert is_android
    from jnius import autoclass
    from android.config import ACTIVITY_CLASS_NAME
    activity = autoclass(ACTIVITY_CLASS_NAME).mActivity
    pkgname = str(activity.getPackageName())
    return pkgname
629

630

631
def assert_bytes(*args):
    """
    porting helper, assert args type
    """
    try:
        assert all(isinstance(arg, (bytes, bytearray)) for arg in args)
    except Exception:
        print('assert bytes failed', list(map(type, args)))
        raise
641

642

643
def assert_str(*args):
    """
    porting helper, assert args type
    """
    assert all(isinstance(arg, str) for arg in args)
649

650

651
def to_string(x, enc) -> str:
    """Decode bytes-like x using enc; pass str through unchanged."""
    if isinstance(x, str):
        return x
    if isinstance(x, (bytes, bytearray)):
        return x.decode(enc)
    raise TypeError("Not a string or bytes like object")
658

659

660
def to_bytes(something, encoding='utf8') -> bytes:
    """
    cast string to bytes() like object, but for python2 support it's bytearray copy
    """
    if isinstance(something, str):
        return something.encode(encoding)
    if isinstance(something, bytes):
        return something
    if isinstance(something, bytearray):
        return bytes(something)
    raise TypeError("Not a string or bytes like object")
672

673

674
# shorthand: hex str -> bytes ("bytes from hex")
bfh = bytes.fromhex
675

676

677
def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR a and b byte-wise, truncated to the shorter length."""
    # zip stops at the shorter input, so no explicit slicing is needed
    return bytes(x ^ y for x, y in zip(a, b))
681

682

683
def user_dir():
    """Return the per-user Electrum directory, or None if it cannot be determined."""
    env = os.environ
    if "ELECTRUMDIR" in env:
        return env["ELECTRUMDIR"]
    if 'ANDROID_DATA' in env:
        return android_data_dir()
    if os.name == 'posix':
        return os.path.join(env["HOME"], ".electrum")
    for var in ("APPDATA", "LOCALAPPDATA"):
        if var in env:
            return os.path.join(env[var], "Electrum")
    # no home directory found in environment variables
    return None
697

698

699
def resource_path(*parts):
    """Join *parts* onto the electrum package directory."""
    return os.path.join(pkg_dir, *parts)
701

702

703
# absolute path to python package folder of electrum ("lib")
pkg_dir = os.path.split(os.path.realpath(__file__))[0]
705

706

707
def is_valid_email(s):
    """Loose sanity check: something@something.something"""
    return bool(re.match(r"[^@]+@[^@]+\.[^@]+", s))
710

711
def is_valid_websocket_url(url: str) -> bool:
    """
    uses this django url validation regex:
    https://github.com/django/django/blob/2c6906a0c4673a7685817156576724aba13ad893/django/core/validators.py#L45C1-L52C43
    Note: this is not perfect, urls and their parsing can get very complex (see recent django code).
    however its sufficient for catching weird user input in the gui dialog
    """
    # stores the compiled regex in the function object itself to avoid recompiling it every call
    regex = getattr(is_valid_websocket_url, "regex", None)
    if regex is None:
        regex = re.compile(
            r'^(?:ws|wss)://'  # ws:// or wss://
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
            r'localhost|'  # localhost...
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
            r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
            r'(?::\d+)?'  # optional port
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        is_valid_websocket_url.regex = regex
    try:
        return regex.match(url) is not None
    except Exception:
        return False
732

733
def is_hash256_str(text: Any) -> bool:
    """True iff text is a 64-character hex string (e.g. a sha256d hash)."""
    return isinstance(text, str) and len(text) == 64 and is_hex_str(text)
737

738

739
def is_hex_str(text: Any) -> bool:
    """True iff text is a valid even-length hex string without whitespace."""
    if not isinstance(text, str):
        return False
    try:
        raw = bytes.fromhex(text)
    except Exception:
        return False
    # bytes.fromhex tolerates whitespace; reject it by comparing lengths
    return len(text) == 2 * len(raw)
749

750

751
def is_integer(val: Any) -> bool:
    """True iff val is an int (note: bool is a subclass of int)."""
    return isinstance(val, int)
753

754

755
def is_non_negative_integer(val: Any) -> bool:
    """True iff val is an int >= 0."""
    return is_integer(val) and val >= 0
759

760

761
def is_int_or_float(val: Any) -> bool:
    """True iff val is an int or a float."""
    return isinstance(val, (int, float))
763

764

765
def is_non_negative_int_or_float(val: Any) -> bool:
    """True iff val is an int or float that is >= 0."""
    return is_int_or_float(val) and val >= 0
769

770

771
def chunks(items, size: int):
    """Yield successive slices of length size from items (a sliceable sequence)."""
    if size < 1:
        raise ValueError(f"size must be positive, not {repr(size)}")
    for start in range(0, len(items), size):
        yield items[start: start + size]
777

778

779
def format_satoshis_plain(
        x: Union[int, float, Decimal, str],  # amount in satoshis,
        *,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
) -> str:
    """Display a satoshi amount scaled.  Always uses a '.' as a decimal
    point and has no thousands separator"""
    if parse_max_spend(x):
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    scaled = Decimal(x) / pow(10, decimal_point)
    return "{:.8f}".format(scaled).rstrip('0').rstrip('.')
791

792

793
# Check that Decimal precision is sufficient.
794
# We need at the very least ~20, as we deal with msat amounts, and
795
# log10(21_000_000 * 10**8 * 1000) ~= 18.3
796
# decimal.DefaultContext.prec == 28 by default, but it is mutable.
797
# We enforce that we have at least that available.
798
assert decimal.getcontext().prec >= 28, f"PyDecimal precision too low: {decimal.getcontext().prec}"
5✔
799

800
# DECIMAL_POINT = locale.localeconv()['decimal_point']  # type: str
801
DECIMAL_POINT = "."
5✔
802
THOUSANDS_SEP = " "
5✔
803
assert len(DECIMAL_POINT) == 1, f"DECIMAL_POINT has unexpected len. {DECIMAL_POINT!r}"
5✔
804
assert len(THOUSANDS_SEP) == 1, f"THOUSANDS_SEP has unexpected len. {THOUSANDS_SEP!r}"
5✔
805

806

807
def format_satoshis(
        x: Union[int, float, Decimal, str, None],  # amount in satoshis
        *,
        num_zeros: int = 0,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
        precision: int = 0,  # extra digits after satoshi precision
        is_diff: bool = False,  # if True, enforce a leading sign (+/-)
        whitespaces: bool = False,  # if True, add whitespaces, to align numbers in a column
        add_thousands_sep: bool = False,  # if True, add whitespaces, for better readability of the numbers
) -> str:
    """Render a satoshi amount as a human-readable decimal string.

    Returns 'unknown' for None, and 'max(...)' for spend-max amounts.
    Uses DECIMAL_POINT / THOUSANDS_SEP module constants as separators.
    """
    if x is None:
        return 'unknown'
    if parse_max_spend(x):
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    # lose redundant precision
    x = Decimal(x).quantize(Decimal(10) ** (-precision))
    # format string
    overall_precision = decimal_point + precision  # max digits after final decimal point
    decimal_format = "." + str(overall_precision) if overall_precision > 0 else ""
    if is_diff:
        decimal_format = '+' + decimal_format
    # initial result
    scale_factor = pow(10, decimal_point)
    result = ("{:" + decimal_format + "f}").format(x / scale_factor)
    if "." not in result: result += "."
    result = result.rstrip('0')
    # add extra decimal places (zeros)
    integer_part, fract_part = result.split(".")
    if len(fract_part) < num_zeros:
        fract_part += "0" * (num_zeros - len(fract_part))
    # add whitespaces as thousands' separator for better readability of numbers
    if add_thousands_sep:
        sign = integer_part[0] if integer_part[0] in ("+", "-") else ""
        if sign == "-":
            integer_part = integer_part[1:]
        integer_part = "{:,}".format(int(integer_part)).replace(',', THOUSANDS_SEP)
        integer_part = sign + integer_part
        # group the fractional digits from the left, in threes
        fract_part = THOUSANDS_SEP.join(fract_part[i:i+3] for i in range(0, len(fract_part), 3))
    result = integer_part + DECIMAL_POINT + fract_part
    # add leading/trailing whitespaces so that numbers can be aligned in a column
    if whitespaces:
        target_fract_len = overall_precision
        target_integer_len = 14 - decimal_point  # should be enough for up to unsigned 999999 BTC
        if add_thousands_sep:
            # account for the separator characters inserted above
            target_fract_len += max(0, (target_fract_len - 1) // 3)
            target_integer_len += max(0, (target_integer_len - 1) // 3)
        # add trailing whitespaces
        result += " " * (target_fract_len - len(fract_part))
        # add leading whitespaces
        target_total_len = target_integer_len + 1 + target_fract_len
        result = " " * (target_total_len - len(result)) + result
    return result
860

861

862
FEERATE_PRECISION = 1  # num fractional decimal places for sat/byte fee rates
# quantum used by quantize_feerate() below
_feerate_quanta = Decimal(10) ** (-FEERATE_PRECISION)
# unit names used in various GUI labels
UI_UNIT_NAME_FEERATE_SAT_PER_VBYTE = "sat/vbyte"
UI_UNIT_NAME_FEERATE_SAT_PER_VB = "sat/vB"
UI_UNIT_NAME_TXSIZE_VBYTES = "vbytes"
UI_UNIT_NAME_MEMPOOL_MB = "vMB"

869

870
def format_fee_satoshis(fee, *, num_zeros=0, precision=None):
    """Format a sat/byte feerate for display.

    Delegates to format_satoshis with decimal_point=0; precision defaults
    to FEERATE_PRECISION.
    """
    if precision is None:
        precision = FEERATE_PRECISION
    # never pad with more zeros than the available precision
    capped_zeros = min(num_zeros, FEERATE_PRECISION)
    return format_satoshis(fee, num_zeros=capped_zeros, decimal_point=0, precision=precision)
875

876

877
def quantize_feerate(fee) -> Union[None, Decimal, int]:
    """Strip sat/byte fee rate of excess precision.

    None passes through; any other value is quantized to FEERATE_PRECISION
    decimal places using half-down rounding.
    """
    if fee is None:
        return None
    quantized = Decimal(fee).quantize(_feerate_quanta, rounding=decimal.ROUND_HALF_DOWN)
    return quantized
882

883

884
def timestamp_to_datetime(timestamp: Union[int, float, None], *, utc: bool = False) -> Optional[datetime]:
    """Convert a POSIX timestamp to a datetime (None passes through).

    By default the result is naive local time; with utc=True it is
    timezone-aware in UTC.
    """
    if timestamp is None:
        return None
    tz = timezone.utc if utc else None
    return datetime.fromtimestamp(timestamp, tz=tz)
891

892

893
def format_time(timestamp: Union[int, float, None]) -> str:
    """Render a POSIX timestamp as 'YYYY-MM-DD HH:MM' (local time), or translated 'Unknown'."""
    dt = timestamp_to_datetime(timestamp)
    if not dt:
        return _("Unknown")
    return dt.isoformat(' ', timespec="minutes")
896

897

898
def age(
    from_date: Union[int, float, None],  # POSIX timestamp
    *,
    since_date: datetime = None,
    target_tz=None,
    include_seconds: bool = False,
) -> str:
    """Takes a timestamp and returns a string with the approximation of the age."""
    if from_date is None:
        return _("Unknown")
    moment = datetime.fromtimestamp(from_date)
    if since_date is None:
        since_date = datetime.now(target_tz)
    description = delta_time_str(moment - since_date, include_seconds=include_seconds)
    # past timestamps read "... ago", future ones read "in ..."
    if moment < since_date:
        return _("{} ago").format(description)
    return _("in {}").format(description)
915

916

917
def delta_time_str(distance_in_time: timedelta, *, include_seconds: bool = False) -> str:
    """Return a human-readable approximation of a time delta (sign ignored)."""
    total_seconds = int(round(abs(distance_in_time.days * 86400 + distance_in_time.seconds)))
    minutes = int(round(total_seconds / 60))
    if minutes == 0:
        if include_seconds:
            return _("{} seconds").format(total_seconds)
        return _("less than a minute")
    if minutes < 45:
        return _("about {} minutes").format(minutes)
    if minutes < 90:
        return _("about 1 hour")
    if minutes < 1440:
        return _("about {} hours").format(round(minutes / 60.0))
    if minutes < 2880:
        return _("about 1 day")
    # NOTE(review): threshold 43220 vs months-divisor 43200 looks inconsistent
    # (30 days = 43200 min) -- preserved as-is; confirm before changing.
    if minutes < 43220:
        return _("about {} days").format(round(minutes / 1440))
    if minutes < 86400:
        return _("about 1 month")
    if minutes < 525600:
        return _("about {} months").format(round(minutes / 43200))
    if minutes < 1051200:
        return _("about 1 year")
    return _("over {} years").format(round(minutes / 525600))
943

944
# Hardcoded block explorers, keyed by display name. Each value is a
# (base_url, api_paths) pair where api_paths maps an item kind
# ('tx' or 'addr') to the URL path prefix for that kind.
mainnet_block_explorers = {
    '3xpl.com': ('https://3xpl.com/bitcoin/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'Bitflyer.jp': ('https://chainflyer.bitflyer.jp/',
                        {'tx': 'Transaction/', 'addr': 'Address/'}),
    'Blockchain.info': ('https://blockchain.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Bitaps.com': ('https://btc.bitaps.com/',
                        {'tx': '', 'addr': ''}),
    'BTC.com': ('https://btc.com/',
                        {'tx': '', 'addr': ''}),
    'Chain.so': ('https://www.chain.so/',
                        {'tx': 'tx/BTC/', 'addr': 'address/BTC/'}),
    'Insight.is': ('https://insight.bitpay.com/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchair.com': ('https://blockchair.com/bitcoin/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'blockonomics.co': ('https://www.blockonomics.co/',
                        {'tx': 'api/tx?txid=', 'addr': '#/search?q='}),
    'mempool.space': ('https://mempool.space/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.emzy.de': ('https://mempool.emzy.de/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'OXT.me': ('https://oxt.me/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'mynode.local': ('http://mynode.local:3002/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    # 'blockchain:/' hands off to whatever the OS associates with the scheme
    'system default': ('blockchain:/',
                        {'tx': 'tx/', 'addr': 'address/'}),
}

testnet_block_explorers = {
    'Bitaps.com': ('https://tbtc.bitaps.com/',
                       {'tx': '', 'addr': ''}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchain.info': ('https://www.blockchain.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'smartbit.com.au': ('https://testnet.smartbit.com.au/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain://000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

testnet4_block_explorers = {
    'mempool.space': ('https://mempool.space/testnet4/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://testnet4-explorer.wakiyamap.dev/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

signet_block_explorers = {
    'bc-2.jp': ('https://explorer.bc-2.jp/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/signet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'bitcoinexplorer.org': ('https://signet.bitcoinexplorer.org/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://signet-explorer.wakiyamap.dev/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'ex.signet.bublina.eu.org': ('https://ex.signet.bublina.eu.org/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

# API paths assumed when the user configures a custom explorer as a bare URL.
_block_explorer_default_api_loc = {'tx': 'tx/', 'addr': 'address/'}
1019

1020

1021
def block_explorer_info():
    """Return the hardcoded block-explorer dict for the currently active network."""
    from . import constants
    per_network = {
        "testnet": testnet_block_explorers,
        "testnet4": testnet4_block_explorers,
        "signet": signet_block_explorers,
    }
    return per_network.get(constants.net.NET_NAME, mainnet_block_explorers)
1030

1031

1032
def block_explorer(config: 'SimpleConfig') -> Optional[str]:
    """Returns name of selected block explorer,
    or None if a custom one (not among hardcoded ones) is configured.
    """
    if config.BLOCK_EXPLORER_CUSTOM is not None:
        return None
    name = config.BLOCK_EXPLORER
    if block_explorer_info().get(name) is None:
        # configured name is not a known explorer -> fall back to the default
        name = config.cv.BLOCK_EXPLORER.get_default_value()
    assert isinstance(name, str), f"{name!r} should be str"
    return name
1044

1045

1046
def block_explorer_tuple(config: 'SimpleConfig') -> Optional[Tuple[str, dict]]:
    """Return (base_url, api_paths) for the configured explorer, or None if misconfigured."""
    custom_be = config.BLOCK_EXPLORER_CUSTOM
    if not custom_be:
        # using one of the hardcoded block explorers
        return block_explorer_info().get(block_explorer(config))
    # custom explorer: either a bare base URL, or an explicit (url, paths) pair
    if isinstance(custom_be, str):
        return custom_be, _block_explorer_default_api_loc
    if isinstance(custom_be, (tuple, list)) and len(custom_be) == 2:
        return tuple(custom_be)
    _logger.warning(f"not using {config.cv.BLOCK_EXPLORER_CUSTOM.key()!r} from config. "
                    f"expected a str or a pair but got {custom_be!r}")
    return None
1059

1060

1061
def block_explorer_URL(config: 'SimpleConfig', kind: str, item: str) -> Optional[str]:
    """Build a web URL for *item* of the given *kind* ('tx' or 'addr'), or None."""
    be_tuple = block_explorer_tuple(config)
    if not be_tuple:
        return None
    base_url, api_paths = be_tuple
    path_prefix = api_paths.get(kind)
    if path_prefix is None:
        # this explorer does not support linking to that kind of item
        return None
    if base_url[-1] != "/":
        base_url += "/"
    return base_url + path_prefix + item
1073

1074

1075

1076

1077

1078
# Python bug (http://bugs.python.org/issue1927) causes raw_input
1079
# to be redirected improperly between stdin/stderr on Unix systems
1080
#TODO: py3
1081
def raw_input(prompt=None):
    """Replacement for builtins.input that writes the prompt to stdout.

    Works around http://bugs.python.org/issue1927 where the prompt can be
    misdirected between stdin/stderr on Unix systems.
    """
    if prompt:
        sys.stdout.write(prompt)
    return builtin_raw_input()

# Capture the original input() BEFORE rebinding it below -- order matters.
builtin_raw_input = builtins.input
builtins.input = raw_input
1088

1089

1090
def parse_json(message):
    """Split one newline-terminated JSON message off the front of *message* (bytes).

    Returns (obj, remainder): obj is None when no complete line is buffered yet
    or the line is not valid JSON; remainder is the unconsumed tail.
    """
    # TODO: check \r\n pattern
    newline_pos = message.find(b'\n')
    if newline_pos == -1:
        return None, message
    line, remainder = message[:newline_pos], message[newline_pos + 1:]
    try:
        parsed = json.loads(line.decode('utf8'))
    except Exception:
        parsed = None
    return parsed, remainder
1100

1101

1102
def setup_thread_excepthook():
    """
    Workaround for `sys.excepthook` thread bug from:
    http://bugs.python.org/issue1230540

    Call once from the main thread before creating any threads.
    """

    # keep a reference to the unpatched Thread.__init__ for chaining
    init_original = threading.Thread.__init__

    def init(self, *args, **kwargs):

        init_original(self, *args, **kwargs)
        run_original = self.run

        # wrap run() so uncaught exceptions in the thread go through sys.excepthook
        def run_with_except_hook(*args2, **kwargs2):
            try:
                run_original(*args2, **kwargs2)
            except Exception:
                sys.excepthook(*sys.exc_info())

        self.run = run_with_except_hook

    # monkey-patch: every Thread created after this call gets the wrapped run()
    threading.Thread.__init__ = init
1126

1127

1128
def send_exception_to_crash_reporter(e: BaseException):
    """Forward *e* to the crash reporter (imported lazily to avoid import cycles)."""
    from .base_crash_reporter import send_exception_to_crash_reporter as _report
    _report(e)
1131

1132

1133
def versiontuple(v):
    """Convert a dotted version string like '4.3.2' into a tuple of ints (4, 3, 2)."""
    return tuple(int(part) for part in v.split("."))
1135

1136

1137
def read_json_file(path):
    """Read and parse a JSON file; raises FileImportFailed on parse or I/O errors."""
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except json.JSONDecodeError:
        _logger.exception('')
        raise FileImportFailed(_("Invalid JSON code."))
    except BaseException as e:
        _logger.exception('')
        raise FileImportFailed(e)
    return data
1148

1149

1150
def write_json_file(path, data):
    """Serialize *data* as pretty-printed JSON to *path*.

    Uses MyEncoder for project types; raises FileExportFailed on I/O errors.
    """
    try:
        with open(path, 'w+', encoding='utf-8') as f:
            json.dump(data, f, indent=4, sort_keys=True, cls=MyEncoder)
    except (IOError, os.error) as e:
        _logger.exception('')
        raise FileExportFailed(e)
1157

1158

1159
def os_chmod(path, mode):
    """os.chmod aware of tmpfs"""
    try:
        os.chmod(path, mode)
    except OSError as e:
        # chmod can fail on tmpfs (e.g. under XDG_RUNTIME_DIR); tolerate only that case
        xdg_runtime_dir = os.environ.get("XDG_RUNTIME_DIR", None)
        if not (xdg_runtime_dir and is_subpath(path, xdg_runtime_dir)):
            raise
        _logger.info(f"Tried to chmod in tmpfs. Skipping... {e!r}")
1169

1170

1171
def make_dir(path, *, allow_symlink=True):
    """Makes directory if it does not yet exist.
    Also sets sane 0700 permissions on the dir.
    """
    if os.path.exists(path):
        return
    if not allow_symlink and os.path.islink(path):
        raise Exception('Dangling link: ' + path)
    try:
        os.mkdir(path)
    except FileExistsError:
        # multiprocess race, e.g. an electrum daemon and an electrum cli
        # command launched in rapid fire -- the dir already exists, fine
        pass
    os_chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
    assert os.path.exists(path)
1186

1187

1188
def is_subpath(long_path: str, short_path: str) -> bool:
    """Returns whether long_path is a sub-path of short_path."""
    try:
        common = os.path.commonpath([long_path, short_path])
    except ValueError:
        # e.g. different drives on Windows, or mixing absolute and relative paths
        return False
    return standardize_path(short_path) == standardize_path(common)
1197

1198

1199
def log_exceptions(func):
    """Decorator to log AND re-raise exceptions."""
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # if decorating a method, prefer the instance's own logger
        bound_obj = args[0] if args else None
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError:
            raise  # cancellation is normal control flow; do not log it
        except BaseException as e:
            mylogger = getattr(bound_obj, 'logger', _logger)
            try:
                mylogger.exception(f"Exception in {func.__name__}: {repr(e)}")
            except BaseException as e2:
                print(f"logging exception raised: {repr(e2)}... orig exc: {repr(e)} in {func.__name__}")
            raise
    return wrapper
1217

1218

1219
def ignore_exceptions(func):
    """Decorator to silently swallow all exceptions."""
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception:
            # deliberately best-effort: callers do not care about failures here
            pass
    return wrapper
1229

1230

1231
def with_lock(func):
    """Decorator to run *func* while holding ``self.lock``.

    Fix: use functools.wraps so the wrapper keeps the wrapped function's
    __name__/__doc__, consistent with log_exceptions/ignore_exceptions above.
    """
    @functools.wraps(func)
    def func_wrapper(self, *args, **kwargs):
        with self.lock:
            return func(self, *args, **kwargs)
    return func_wrapper
1237

1238

1239
class TxMinedInfo(NamedTuple):
    """Mining status of a transaction as seen by the wallet."""
    height: int                        # height of block that mined tx
    conf: Optional[int] = None         # number of confirmations, SPV verified. >=0, or None (None means unknown)
    timestamp: Optional[int] = None    # timestamp of block that mined tx
    txpos: Optional[int] = None        # position of tx in serialized block
    header_hash: Optional[str] = None  # hash of block that mined tx
    wanted_height: Optional[int] = None  # in case of timelock, min abs block height

    def short_id(self) -> Optional[str]:
        """Return 'heightxtxpos' if the tx position is known, else None."""
        if self.txpos is None or self.txpos < 0:
            return None
        assert self.height > 0
        return f"{self.height}x{self.txpos}"

    def is_local_like(self) -> bool:
        """Returns whether the tx is local-like (LOCAL/FUTURE)."""
        from .address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
        mined = self.height > 0
        in_mempool = self.height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT)
        return not (mined or in_mempool)
1261

1262

1263
class ShortID(bytes):
    """8-byte lightning short-channel-id:
    3 bytes block height || 3 bytes tx position || 2 bytes output index.
    """

    def __repr__(self):
        return f"<ShortID: {format_short_id(self)}>"

    def __str__(self):
        return format_short_id(self)

    @classmethod
    def from_components(cls, block_height: int, tx_pos_in_block: int, output_index: int) -> 'ShortID':
        """Pack the three components into the 8-byte big-endian layout."""
        packed = (
            block_height.to_bytes(3, byteorder='big')
            + tx_pos_in_block.to_bytes(3, byteorder='big')
            + output_index.to_bytes(2, byteorder='big')
        )
        return ShortID(packed)

    @classmethod
    def from_str(cls, scid: str) -> 'ShortID':
        """Parses a formatted scid str, e.g. '643920x356x0'."""
        parts = scid.split("x")
        if len(parts) != 3:
            raise ValueError(f"failed to parse ShortID: {scid!r}")
        try:
            numbers = [int(p) for p in parts]
        except ValueError:
            raise ValueError(f"failed to parse ShortID: {scid!r}") from None
        return ShortID.from_components(*numbers)

    @classmethod
    def normalize(cls, data: Union[None, str, bytes, 'ShortID']) -> Optional['ShortID']:
        """Coerce a hex str (16 chars) or raw bytes (8 bytes) to ShortID; None/ShortID pass through."""
        if data is None or isinstance(data, ShortID):
            return data
        if isinstance(data, str):
            assert len(data) == 16
            return ShortID.fromhex(data)
        if isinstance(data, (bytes, bytearray)):
            assert len(data) == 8
            return ShortID(data)

    @property
    def block_height(self) -> int:
        return int.from_bytes(self[0:3], byteorder='big')

    @property
    def txpos(self) -> int:
        return int.from_bytes(self[3:6], byteorder='big')

    @property
    def output_index(self) -> int:
        return int.from_bytes(self[6:8], byteorder='big')
1312

1313

1314
def format_short_id(short_channel_id: Optional[bytes]):
    """Render an 8-byte short channel id as 'heightxtxposxoutput', or a placeholder."""
    if not short_channel_id:
        return _('Not yet available')
    segments = (short_channel_id[:3], short_channel_id[3:6], short_channel_id[6:])
    return 'x'.join(str(int.from_bytes(seg, 'big')) for seg in segments)
1320

1321

1322
def make_aiohttp_proxy_connector(proxy: 'ProxySettings', ssl_context: Optional[ssl.SSLContext] = None) -> ProxyConnector:
    """Build an aiohttp_socks ProxyConnector from our proxy settings."""
    socks_type = ProxyType.SOCKS5 if proxy.mode == 'socks5' else ProxyType.SOCKS4
    return ProxyConnector(
        proxy_type=socks_type,
        host=proxy.host,
        port=int(proxy.port),
        username=proxy.user,
        password=proxy.password,
        rdns=True,  # needed to prevent DNS leaks over proxy
        ssl=ssl_context,
    )
1332

1333

1334
def make_aiohttp_session(proxy: Optional['ProxySettings'], headers=None, timeout=None):
    """Create an aiohttp ClientSession using our CA bundle and optionally a SOCKS proxy."""
    if headers is None:
        headers = {'User-Agent': 'Electrum'}
    if timeout is None:
        # The default timeout is high intentionally.
        # DNS on some systems can be really slow, see e.g. #5337
        timeout = aiohttp.ClientTimeout(total=45)
    elif isinstance(timeout, (int, float)):
        timeout = aiohttp.ClientTimeout(total=timeout)
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
    if proxy and proxy.enabled:
        connector = make_aiohttp_proxy_connector(proxy, ssl_context)
    else:
        connector = aiohttp.TCPConnector(ssl=ssl_context)
    session = aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector)
    return session
1351

1352

1353
class OldTaskGroup(aiorpcx.TaskGroup):
    """Automatically raises exceptions on join; as in aiorpcx prior to version 0.20.
    That is, when using TaskGroup as a context manager, if any task encounters an exception,
    we would like that exception to be re-raised (propagated out). For the wait=all case,
    the OldTaskGroup class is emulating the following code-snippet:
    ```
    async with TaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())

        async for task in group:
            if not task.cancelled():
                task.result()
    ```
    So instead of the above, one can just write:
    ```
    async with OldTaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())
    ```
    # TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
    """
    async def join(self):
        if self._wait is all:
            exc = False
            try:
                # iterate tasks as they complete; task.result() re-raises the
                # first failure (skipping tasks that were merely cancelled)
                async for task in self:
                    if not task.cancelled():
                        task.result()
            except BaseException:  # including asyncio.CancelledError
                exc = True
                raise
            finally:
                # on failure, tear down the remaining tasks before re-raising
                if exc:
                    await self.cancel_remaining()
                await super().join()
        else:
            # wait=any/object semantics: propagate the completing task's result
            await super().join()
            if self.completed:
                self.completed.result()
1393

1394
# We monkey-patch aiorpcx TimeoutAfter (used by timeout_after and ignore_after API),
# to fix a timing issue present in asyncio as a whole re timing out tasks.
# To see the issue we are trying to fix, consider example:
#     async def outer_task():
#         async with timeout_after(0.1):
#             await inner_task()
# When the 0.1 sec timeout expires, inner_task will get cancelled by timeout_after (=internal cancellation).
# If around the same time (in terms of event loop iterations) another coroutine
# cancels outer_task (=external cancellation), there will be a race.
# Both cancellations work by propagating a CancelledError out to timeout_after, which then
# needs to decide (in TimeoutAfter.__aexit__) whether it's due to an internal or external cancellation.
# AFAICT asyncio provides no reliable way of distinguishing between the two.
# This patch tries to always give priority to external cancellations.
# see https://github.com/kyuupichan/aiorpcX/issues/44
# see https://github.com/aio-libs/async-timeout/issues/229
# see https://bugs.python.org/issue42130 and https://bugs.python.org/issue45098
# TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
def _aiorpcx_monkeypatched_set_new_deadline(task, deadline):
    # Replacement for aiorpcx.curio._set_new_deadline (see comment block above).
    def timeout_task():
        # internal (deadline) cancellation; only mark _timed_out if no external
        # cancellation was observed first -- external cancellations win
        task._orig_cancel()
        task._timed_out = None if getattr(task, "_externally_cancelled", False) else deadline
    def mycancel(*args, **kwargs):
        # external cancellation path: record it and clear any timeout marker
        task._orig_cancel(*args, **kwargs)
        task._externally_cancelled = True
        task._timed_out = None
    if not hasattr(task, "_orig_cancel"):
        # patch task.cancel only once, keeping the original for restoration
        task._orig_cancel = task.cancel
        task.cancel = mycancel
    task._deadline_handle = task._loop.call_at(deadline, timeout_task)


def _aiorpcx_monkeypatched_set_task_deadline(task, deadline):
    # Wraps the original to additionally initialise the external-cancel flag.
    ret = _aiorpcx_orig_set_task_deadline(task, deadline)
    task._externally_cancelled = None
    return ret


def _aiorpcx_monkeypatched_unset_task_deadline(task):
    # Restore the original task.cancel before delegating to the original impl.
    if hasattr(task, "_orig_cancel"):
        task.cancel = task._orig_cancel
        del task._orig_cancel
    return _aiorpcx_orig_unset_task_deadline(task)


# Save the originals BEFORE installing the patches -- order matters here.
_aiorpcx_orig_set_task_deadline    = aiorpcx.curio._set_task_deadline
_aiorpcx_orig_unset_task_deadline  = aiorpcx.curio._unset_task_deadline

aiorpcx.curio._set_new_deadline    = _aiorpcx_monkeypatched_set_new_deadline
aiorpcx.curio._set_task_deadline   = _aiorpcx_monkeypatched_set_task_deadline
aiorpcx.curio._unset_task_deadline = _aiorpcx_monkeypatched_unset_task_deadline
1444

1445

1446
async def wait_for2(fut: Awaitable, timeout: Union[int, float, None]):
    """Replacement for asyncio.wait_for,
     due to bugs: https://bugs.python.org/issue42130 and https://github.com/python/cpython/issues/86296 ,
     which are only fixed in python 3.12+.
     """
    if sys.version_info[:3] >= (3, 12):
        # stdlib is fixed from 3.12 onwards; use it directly
        return await asyncio.wait_for(fut, timeout)
    # older pythons: emulate via our async_timeout shim
    async with async_timeout(timeout):
        return await asyncio.ensure_future(fut, loop=get_running_loop())
1456

1457

1458
if hasattr(asyncio, 'timeout'):  # python 3.11+
    async_timeout = asyncio.timeout
else:
    # Pre-3.11 fallback: adapt aiorpcx's TimeoutAfter so it raises the same
    # exception types asyncio.timeout would.
    class TimeoutAfterAsynciolike(aiorpcx.curio.TimeoutAfter):
        async def __aexit__(self, exc_type, exc_value, tb):
            try:
                await super().__aexit__(exc_type, exc_value, tb)
            except (aiorpcx.TaskTimeout, aiorpcx.UncaughtTimeoutError):
                raise asyncio.TimeoutError from None
            except aiorpcx.TimeoutCancellationError:
                raise asyncio.CancelledError from None

    def async_timeout(delay: Union[int, float, None]):
        # delay=None means "no timeout", matching asyncio.timeout(None)
        if delay is None:
            return nullcontext()
        return TimeoutAfterAsynciolike(delay)
1474

1475

1476
class NetworkJobOnDefaultServer(Logger, ABC):
    """An abstract base class for a job that runs on the main network
    interface. Every time the main interface changes, the job is
    restarted, and some of its internals are reset.
    """
    def __init__(self, network: 'Network'):
        Logger.__init__(self)
        self.network = network
        self.interface = None  # type: Interface
        self._restart_lock = asyncio.Lock()
        # Ensure fairness between NetworkJobs. e.g. if multiple wallets
        # are open, a large wallet's Synchronizer should not starve the small wallets:
        self._network_request_semaphore = asyncio.Semaphore(100)

        self._reset()
        # every time the main interface changes, restart:
        register_callback(self._restart, ['default_server_changed'])
        # also schedule a one-off restart now, as there might already be a main interface:
        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)

    def _reset(self):
        """Initialise fields. Called every time the underlying
        server connection changes.
        """
        self.taskgroup = OldTaskGroup()
        self.reset_request_counters()

    async def _start(self, interface: 'Interface'):
        # Bind to the new interface and launch this job's tasks inside the
        # interface's own taskgroup, so they die with the connection.
        self.logger.debug(f"starting. interface.server={repr(str(interface.server))}")
        self.interface = interface

        taskgroup = self.taskgroup
        async def run_tasks_wrapper():
            self.logger.debug(f"starting taskgroup ({hex(id(taskgroup))}).")
            try:
                await self._run_tasks(taskgroup=taskgroup)
            except Exception as e:
                self.logger.error(f"taskgroup died ({hex(id(taskgroup))}). exc={e!r}")
                raise
            finally:
                self.logger.debug(f"taskgroup stopped ({hex(id(taskgroup))}).")
        await interface.taskgroup.spawn(run_tasks_wrapper)

    @abstractmethod
    async def _run_tasks(self, *, taskgroup: OldTaskGroup) -> None:
        """Start tasks in taskgroup. Called every time the underlying
        server connection changes.
        """
        # If self.taskgroup changed, don't start tasks. This can happen if we have
        # been restarted *just now*, i.e. after the _run_tasks coroutine object was created.
        if taskgroup != self.taskgroup:
            raise asyncio.CancelledError()

    async def stop(self, *, full_shutdown: bool = True):
        # full_shutdown=False is used by _restart, which re-registers nothing
        # and immediately starts again on the new interface.
        self.logger.debug(f"stopping. {full_shutdown=}")
        if full_shutdown:
            unregister_callback(self._restart)
        await self.taskgroup.cancel_remaining()

    @log_exceptions
    async def _restart(self, *args):
        interface = self.network.interface
        if interface is None:
            return  # we should get called again soon

        # serialize restarts so stop/reset/start never interleave
        async with self._restart_lock:
            await self.stop(full_shutdown=False)
            self._reset()
            await self._start(interface)

    def reset_request_counters(self):
        self._requests_sent = 0
        self._requests_answered = 0

    def num_requests_sent_and_answered(self) -> Tuple[int, int]:
        return self._requests_sent, self._requests_answered

    @property
    def session(self):
        # the session of the currently bound interface; must exist by the
        # time any job task runs
        s = self.interface.session
        assert s is not None
        return s
1558

1559

1560
async def detect_tor_socks_proxy() -> Optional[Tuple[str, int]]:
    """Probe well-known local ports and return the first (host, port) that
    speaks the Tor SOCKS protocol, or None if none do."""
    # Probable ports for Tor to listen at
    candidates = [
        ("127.0.0.1", 9050),
        ("127.0.0.1", 9051),
        ("127.0.0.1", 9150),
    ]

    proxy_addr = None
    async def probe(net_addr):
        if await is_tor_socks_port(*net_addr):
            # found one: record it and stop the remaining probes
            nonlocal proxy_addr
            proxy_addr = net_addr
            await group.cancel_remaining()

    async with OldTaskGroup() as group:
        for net_addr in candidates:
            await group.spawn(probe(net_addr))
    return proxy_addr
1581

1582

1583
@log_exceptions
async def is_tor_socks_port(host: str, port: int) -> bool:
    """Return whether (host, port) answers like a Tor SOCKS5 port."""
    # mimic "tor-resolve 0.0.0.0".
    # see https://github.com/spesmilo/electrum/issues/7317#issuecomment-1369281075
    # > this is a socks5 handshake, followed by a socks RESOLVE request as defined in
    # > [tor's socks extension spec](https://github.com/torproject/torspec/blob/7116c9cdaba248aae07a3f1d0e15d9dd102f62c5/socks-extensions.txt#L63),
    # > resolving 0.0.0.0, which being an IP, tor resolves itself without needing to ask a relay.
    writer = None
    try:
        async with async_timeout(10):
            reader, writer = await asyncio.open_connection(host, port)
            writer.write(b'\x05\x01\x00\x05\xf0\x00\x03\x070.0.0.0\x00\x00')
            await writer.drain()
            data = await reader.read(1024)
            return data == b'\x05\x00\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00'
    except (OSError, asyncio.TimeoutError):
        return False
    finally:
        if writer:
            writer.close()
1605

1606

1607
AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP = False  # used by unit tests

# The single global event loop used by electrum; set by
# create_and_start_event_loop() and cleared again when that loop finishes.
_asyncio_event_loop = None  # type: Optional[asyncio.AbstractEventLoop]
def get_asyncio_loop() -> asyncio.AbstractEventLoop:
    """Returns the global asyncio event loop we use."""
    loop = _asyncio_event_loop
    if not loop and AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP:
        # library users manage their own loop; fall back to the one running
        # in the current thread, if any
        loop = get_running_loop()
    if loop:
        return loop
    raise Exception("event loop not created yet")

1620
def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
                                           asyncio.Future,
                                           threading.Thread]:
    """Create the global electrum asyncio loop and run it in a new thread.

    Returns (loop, stopping_fut, loop_thread). The loop runs until
    `stopping_fut` is resolved, after which the global loop reference is
    cleared again. Raises if a global loop already exists.
    """
    global _asyncio_event_loop
    if _asyncio_event_loop is not None:
        raise Exception("there is already a running event loop")

    # asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710)
    # We set a custom event loop policy purely to be compatible with code that
    # relies on asyncio.get_event_loop().
    # - in python 3.8-3.9, asyncio.Event.__init__, asyncio.Lock.__init__,
    #   and similar, calls get_event_loop. see https://github.com/python/cpython/pull/23420
    class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
        def get_event_loop(self):
            # In case electrum is being used as a library, there might be other
            # event loops in use besides ours. To minimise interfering with those,
            # if there is a loop running in the current thread, return that:
            running_loop = get_running_loop()
            if running_loop is not None:
                return running_loop
            # Otherwise, return our global loop:
            return get_asyncio_loop()
    asyncio.set_event_loop_policy(MyEventLoopPolicy())

    loop = asyncio.new_event_loop()
    _asyncio_event_loop = loop

    def on_exception(loop, context):
        """Suppress spurious messages it appears we cannot control."""
        SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
                                            'SSL error in data received')
        message = context.get('message')
        if message and SUPPRESS_MESSAGE_REGEX.match(message):
            return
        loop.default_exception_handler(context)

    def run_event_loop():
        # thread target: blocks until stopping_fut is resolved
        try:
            loop.run_until_complete(stopping_fut)
        finally:
            # clean-up
            global _asyncio_event_loop
            _asyncio_event_loop = None

    loop.set_exception_handler(on_exception)
    _set_custom_task_factory(loop)
    # loop.set_debug(True)
    stopping_fut = loop.create_future()
    loop_thread = threading.Thread(
        target=run_event_loop,
        name='EventLoop',
    )
    loop_thread.start()
    # Wait until the loop actually starts.
    # On a slow PC, or with a debugger attached, this can take a few dozens of ms,
    # and if we returned without a running loop, weird things can happen...
    t0 = time.monotonic()
    while not loop.is_running():
        time.sleep(0.01)
        if time.monotonic() - t0 > 5:
            raise Exception("been waiting for 5 seconds but asyncio loop would not start!")
    return loop, stopping_fut, loop_thread

1684
# Strong references to in-flight tasks created via our custom task factory.
_running_asyncio_tasks = set()  # type: Set[asyncio.Future]
def _set_custom_task_factory(loop: asyncio.AbstractEventLoop):
    """Wrap task creation to track pending and running tasks.
    When tasks are created, asyncio only maintains a weak reference to them.
    Hence, the garbage collector might destroy the task mid-execution.
    To avoid this, we store a strong reference for the task until it completes.

    Without this, a lot of APIs are basically Heisenbug-generators... e.g.:
    - "asyncio.create_task"
    - "loop.create_task"
    - "asyncio.ensure_future"
    - "asyncio.run_coroutine_threadsafe"

    related:
        - https://bugs.python.org/issue44665
        - https://github.com/python/cpython/issues/88831
        - https://github.com/python/cpython/issues/91887
        - https://textual.textualize.io/blog/2023/02/11/the-heisenbug-lurking-in-your-async-code/
        - https://github.com/python/cpython/issues/91887#issuecomment-1434816045
        - "Task was destroyed but it is pending!"
    """

    platform_task_factory = loop.get_task_factory()

    def factory(loop_, coro, **kwargs):
        # delegate to any pre-existing factory, otherwise create a plain Task
        if platform_task_factory is not None:
            task = platform_task_factory(loop_, coro, **kwargs)
        else:
            task = asyncio.Task(coro, loop=loop_, **kwargs)
        # keep a strong reference until the task finishes
        _running_asyncio_tasks.add(task)
        task.add_done_callback(_running_asyncio_tasks.discard)
        return task

    loop.set_task_factory(factory)

1720
def run_sync_function_on_asyncio_thread(func: Callable, *, block: bool) -> None:
    """Run a non-async fn on the asyncio thread. Can be called from any thread.

    If the current thread is already the asyncio thread, func is guaranteed
    to have been completed when this method returns.

    For any other thread, we only wait for completion if `block` is True.
    """
    assert not asyncio.iscoroutinefunction(func), "func must be a non-async function"
    asyncio_loop = get_asyncio_loop()
    if get_running_loop() == asyncio_loop:  # we are running on the asyncio thread
        func()
    else:  # non-asyncio thread
        async def wrapper():
            return func()
        fut = asyncio.run_coroutine_threadsafe(wrapper(), loop=asyncio_loop)
        if block:
            fut.result()
        else:
            # add explicit logging of exceptions, otherwise they might get lost
            # (capture the submitting thread's stack now; it is gone by the
            # time the future completes)
            tb1 = traceback.format_stack()[:-1]
            tb1_str = "".join(tb1)
            def on_done(fut_: concurrent.futures.Future):
                assert fut_.done()
                if fut_.cancelled():
                    _logger.debug(f"func cancelled. {func=}.")
                elif exc := fut_.exception():
                    # note: We explicitly log the first part of the traceback, tb1_str.
                    #       The second part gets logged by setting "exc_info".
                    _logger.error(
                        f"func errored. {func=}. {exc=}"
                        f"\n{tb1_str}", exc_info=exc)
            fut.add_done_callback(on_done)

1755
class OrderedDictWithIndex(OrderedDict):
    """An OrderedDict that keeps track of the positions of keys.

    Note: very inefficient to modify contents, except to add new items.
    """

    def __init__(self):
        super().__init__()
        self._key_to_pos = {}
        self._pos_to_key = {}

    def _recalc_index(self):
        # rebuild both position maps from scratch, from the current key order
        self._pos_to_key = dict(enumerate(self.keys()))
        self._key_to_pos = {k: p for (p, k) in self._pos_to_key.items()}

    def pos_from_key(self, key):
        return self._key_to_pos[key]

    def value_from_pos(self, pos):
        return self[self._pos_to_key[pos]]

    def popitem(self, *args, **kwargs):
        result = super().popitem(*args, **kwargs)
        self._recalc_index()
        return result

    def move_to_end(self, *args, **kwargs):
        result = super().move_to_end(*args, **kwargs)
        self._recalc_index()
        return result

    def clear(self):
        result = super().clear()
        self._recalc_index()
        return result

    def pop(self, *args, **kwargs):
        result = super().pop(*args, **kwargs)
        self._recalc_index()
        return result

    def update(self, *args, **kwargs):
        result = super().update(*args, **kwargs)
        self._recalc_index()
        return result

    def __delitem__(self, *args, **kwargs):
        result = super().__delitem__(*args, **kwargs)
        self._recalc_index()
        return result

    def __setitem__(self, key, *args, **kwargs):
        # fast path: appending a new key only records its position;
        # overwriting an existing key changes no positions at all
        is_new_key = key not in self
        result = super().__setitem__(key, *args, **kwargs)
        if is_new_key:
            new_pos = len(self) - 1
            self._key_to_pos[key] = new_pos
            self._pos_to_key[new_pos] = key
        return result

1817
def multisig_type(wallet_type):
    '''If wallet_type is mofn multi-sig, return [m, n],
    otherwise return None.'''
    if not wallet_type:
        return None
    m = re.match(r'(\d+)of(\d+)', wallet_type)
    if m is None:
        return None
    return [int(m.group(1)), int(m.group(2))]
1827

1828
def is_ip_address(x: Union[str, bytes]) -> bool:
    """Return whether `x` parses as an IPv4 or IPv6 address literal."""
    text = x.decode("utf-8") if isinstance(x, bytes) else x
    try:
        ipaddress.ip_address(text)
    except ValueError:
        return False
    return True

1838
def is_localhost(host: str) -> bool:
    """Return whether `host` refers to the local loopback.

    Accepts "localhost"/"localhost.", IPv4/IPv6 literals, and bracketed
    IPv6 literals ("[::1]"). Non-IP strings and empty input give False.
    """
    if str(host) in ('localhost', 'localhost.',):
        return True
    if not host:
        # fix: indexing below would raise IndexError on an empty string
        return False
    if host[0] == '[' and host[-1] == ']':  # IPv6
        host = host[1:-1]
    try:
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
        return ip_addr.is_loopback
    except ValueError:
        pass  # not an IP
    return False
1850

1851
def is_private_netaddress(host: str) -> bool:
    """Return whether `host` is localhost or an IP in a private range.

    Non-IP strings (e.g. public hostnames) and empty input give False.
    """
    if not host:
        # fix: indexing below (and is_localhost) would raise IndexError on ''
        return False
    if is_localhost(host):
        return True
    if host[0] == '[' and host[-1] == ']':  # IPv6
        host = host[1:-1]
    try:
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
        return ip_addr.is_private
    except ValueError:
        pass  # not an IP
    return False
1863

1864
def list_enabled_bits(x: int) -> Sequence[int]:
    """e.g. 77 (0b1001101) --> (0, 2, 3, 6)"""
    bits_lsb_first = bin(x)[2:][::-1]
    return tuple(pos for pos, bit in enumerate(bits_lsb_first) if bit == '1')
1870

1871
async def resolve_dns_srv(host: str):
    """Resolve the SRV records of `host`, best targets first.

    Returns a list of {'host': ..., 'port': ...} dicts.
    """
    # FIXME this method is not using the network proxy. (although the proxy might not support UDP?)
    srv_records = await dns.asyncresolver.resolve(host, 'SRV')
    # priority: prefer lower
    # weight: tie breaker; prefer higher
    srv_records = sorted(srv_records, key=lambda x: (x.priority, -x.weight))

    def dict_from_srv_record(srv):
        return {
            'host': str(srv.target),
            'port': srv.port,
        }
    return [dict_from_srv_record(srv) for srv in srv_records]
1885

1886
def randrange(bound: int) -> int:
    """Return a random integer k such that 1 <= k < bound, uniformly
    distributed across that range.
    This is guaranteed to be cryptographically strong.
    """
    # secrets.randbelow(n) yields 0 <= r < n; shift by one so that 0 is
    # excluded and bound-1 is still reachable
    k = secrets.randbelow(bound - 1)
    return k + 1
1895

1896
class CallbackManager(Logger):
    # callbacks set by the GUI or any thread
    # guarantee: the callbacks will always get triggered from the asyncio thread.

    def __init__(self):
        Logger.__init__(self)
        self.callback_lock = threading.Lock()
        self.callbacks = defaultdict(list)      # note: needs self.callback_lock

    def register_callback(self, func, events):
        """Subscribe `func` to each event name in `events`."""
        with self.callback_lock:
            for event in events:
                self.callbacks[event].append(func)

    def unregister_callback(self, callback):
        """Remove `callback` from every event it was registered for."""
        with self.callback_lock:
            for callbacks in self.callbacks.values():
                if callback in callbacks:
                    callbacks.remove(callback)

    def trigger_callback(self, event, *args):
        """Trigger a callback with given arguments.
        Can be called from any thread. The callback itself will get scheduled
        on the event loop.
        """
        loop = get_asyncio_loop()
        assert loop.is_running(), "event loop not running"
        with self.callback_lock:
            # copy, so callbacks can (un)register while we iterate
            callbacks = self.callbacks[event][:]
        for callback in callbacks:
            if asyncio.iscoroutinefunction(callback):  # async cb
                fut = asyncio.run_coroutine_threadsafe(callback(*args), loop)
                # log exceptions explicitly, otherwise they might get lost
                def on_done(fut_: concurrent.futures.Future):
                    assert fut_.done()
                    if fut_.cancelled():
                        self.logger.debug(f"cb cancelled. {event=}.")
                    elif exc := fut_.exception():
                        self.logger.error(f"cb errored. {event=}. {exc=}", exc_info=exc)
                fut.add_done_callback(on_done)
            else:  # non-async cb
                run_sync_function_on_asyncio_thread(partial(callback, *args), block=False)
1938

1939
# Singleton callback manager, plus module-level convenience aliases.
callback_mgr = CallbackManager()
trigger_callback = callback_mgr.trigger_callback
register_callback = callback_mgr.register_callback
unregister_callback = callback_mgr.unregister_callback
# Registry used by @event_listener / EventListener:
# maps "module.ClassName" -> set of "on_event_*" method names.
_event_listeners = defaultdict(set)  # type: Dict[str, Set[str]]
1945

1946
class EventListener:
    """Use as a mixin for a class that has methods to be triggered on events.
    - Methods that receive the callbacks should be named "on_event_*" and decorated with @event_listener.
    - register_callbacks() should be called exactly once per instance of EventListener, e.g. in __init__
    - unregister_callbacks() should be called at least once, e.g. when the instance is destroyed
    """

    def _list_callbacks(self):
        # Walk the MRO so listeners declared on base classes are found too.
        for c in self.__class__.__mro__:
            classpath = f"{c.__module__}.{c.__name__}"
            for method_name in _event_listeners[classpath]:
                method = getattr(self, method_name)
                assert callable(method)
                assert method_name.startswith('on_event_')
                # yield (event name, bound method)
                yield method_name[len('on_event_'):], method

    def register_callbacks(self):
        for name, method in self._list_callbacks():
            #_logger.debug(f'registering callback {method}')
            register_callback(method, [name])

    def unregister_callbacks(self):
        for name, method in self._list_callbacks():
            #_logger.debug(f'unregistering callback {method}')
            unregister_callback(method)
1972

1973
def event_listener(func):
    """To be used in subclasses of EventListener only. (how to enforce this programmatically?)"""
    # Record the decorated method in the per-class registry, keyed by the
    # fully qualified class path; EventListener._list_callbacks reads it back.
    owner_name, listener_name = func.__qualname__.split('.')
    assert listener_name.startswith('on_event_')
    _event_listeners[f"{func.__module__}.{owner_name}"].add(listener_name)
    return func
1981

1982
_NetAddrType = TypeVar("_NetAddrType")
# requirements for _NetAddrType:
# - reasonable __hash__() implementation (e.g. based on host/port of remote endpoint)

class NetworkRetryManager(Generic[_NetAddrType]):
    """Truncated Exponential Backoff for network connections."""

    def __init__(
            self, *,
            max_retry_delay_normal: float,
            init_retry_delay_normal: float,
            max_retry_delay_urgent: float = None,
            init_retry_delay_urgent: float = None,
    ):
        # per-address bookkeeping: addr -> (unix ts of last attempt, num_attempts)
        self._last_tried_addr = {}  # type: Dict[_NetAddrType, Tuple[float, int]]

        # note: these all use "seconds" as unit
        # "urgent" parameters default to their "normal" counterparts
        if max_retry_delay_urgent is None:
            max_retry_delay_urgent = max_retry_delay_normal
        if init_retry_delay_urgent is None:
            init_retry_delay_urgent = init_retry_delay_normal
        self._max_retry_delay_normal = max_retry_delay_normal
        self._init_retry_delay_normal = init_retry_delay_normal
        self._max_retry_delay_urgent = max_retry_delay_urgent
        self._init_retry_delay_urgent = init_retry_delay_urgent

    def _trying_addr_now(self, addr: _NetAddrType) -> None:
        """Record that we are attempting a connection to `addr` right now."""
        _prev_time, attempts_so_far = self._last_tried_addr.get(addr, (0, 0))
        # we add up to 1 second of noise to the time, so that clients are less likely
        # to get synchronised and bombard the remote in connection waves:
        noisy_now = time.time() + random.random()
        self._last_tried_addr[addr] = noisy_now, attempts_so_far + 1

    def _on_connection_successfully_established(self, addr: _NetAddrType) -> None:
        # success resets the attempt counter, so future retries start fast again
        self._last_tried_addr[addr] = time.time(), 0

    def _can_retry_addr(self, addr: _NetAddrType, *,
                        now: float = None, urgent: bool = False) -> bool:
        """Return whether enough time has passed to retry connecting to `addr`."""
        if now is None:
            now = time.time()
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        if urgent:
            init_delay, max_delay = self._init_retry_delay_urgent, self._max_retry_delay_urgent
        else:
            init_delay, max_delay = self._init_retry_delay_normal, self._max_retry_delay_normal
        backoff = self.__calc_delay(multiplier=init_delay, max_delay=max_delay, num_attempts=num_attempts)
        return last_time + backoff < now

    @classmethod
    def __calc_delay(cls, *, multiplier: float, max_delay: float,
                     num_attempts: int) -> float:
        # exponential backoff, truncated to the range [0, max_delay]
        num_attempts = min(num_attempts, 100_000)
        try:
            delay = multiplier * 2 ** num_attempts
        except OverflowError:
            return max_delay
        return max(0, min(max_delay, delay))

    def _clear_addr_retry_times(self) -> None:
        self._last_tried_addr.clear()
2046

2047
class ESocksProxy(aiorpcx.SOCKSProxy):
    """Electrum wrapper around aiorpcx's SOCKS proxy."""
    # note: proxy will not leak DNS as create_connection()
    # sets (local DNS) resolve=False by default

    async def open_connection(self, host=None, port=None, **kwargs):
        # asyncio.open_connection() equivalent, tunnelled through the proxy
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader(loop=loop)
        protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
        transport, _ = await self.create_connection(
            lambda: protocol, host, port, **kwargs)
        writer = asyncio.StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    @classmethod
    def from_network_settings(cls, network: Optional['Network']) -> Optional['ESocksProxy']:
        """Build an ESocksProxy from the network's proxy settings, or None if disabled."""
        if not network or not network.proxy or not network.proxy.enabled:
            return None
        proxy = network.proxy
        username, pw = proxy.user, proxy.password
        if not username or not pw:
            # is_proxy_tor is tri-state; None indicates it is still probing the proxy to test for TOR
            if network.is_proxy_tor:
                # NOTE(review): presumably random credentials are used for
                # Tor stream isolation — confirm
                auth = aiorpcx.socks.SOCKSRandomAuth()
            else:
                auth = None
        else:
            auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
        addr = aiorpcx.NetAddress(proxy.host, proxy.port)
        if proxy.mode == "socks4":
            ret = cls(addr, aiorpcx.socks.SOCKS4a, auth)
        elif proxy.mode == "socks5":
            ret = cls(addr, aiorpcx.socks.SOCKS5, auth)
        else:
            raise NotImplementedError  # http proxy not available with aiorpcx
        return ret

2084
class JsonRPCError(Exception):
    """A JSON-RPC error reply, carrying the application-level code/message/data."""

    class Codes(enum.IntEnum):
        # application-specific error codes
        USERFACING = 1
        INTERNAL = 2

    def __init__(self, *, code: int, message: str, data: Optional[dict] = None):
        super().__init__()
        self.code = code
        self.message = message
        self.data = data
2097

2098
class JsonRPCClient:
    """Minimal JSON-RPC 2.0 client over an aiohttp session."""

    def __init__(self, session: aiohttp.ClientSession, url: str):
        self.session = session
        self.url = url
        self._id = 0  # incrementing request id

    async def request(self, endpoint, *args):
        """Send request to server, parse and return result.
        note: parsing code is naive, the server is assumed to be well-behaved.
              Up to the caller to handle exceptions, including those arising from parsing errors.
        """
        self._id += 1
        data = ('{"jsonrpc": "2.0", "id":"%d", "method": "%s", "params": %s }'
                % (self._id, endpoint, json.dumps(args)))
        async with self.session.post(self.url, data=data) as resp:
            if resp.status == 200:
                r = await resp.json()
                result = r.get('result')
                error = r.get('error')
                if error:
                    raise JsonRPCError(code=error["code"], message=error["message"], data=error.get("data"))
                else:
                    return result
            else:
                # non-200: return the body as an error string rather than raising
                text = await resp.text()
                return 'Error: ' + str(text)

    def add_method(self, endpoint):
        # Expose `endpoint` as an attribute: after add_method("getinfo"),
        # callers can do `await client.getinfo(...)`.
        async def coro(*args):
            return await self.request(endpoint, *args)
        setattr(self, endpoint, coro)

2132
T = TypeVar('T')

def random_shuffled_copy(x: Iterable[T]) -> List[T]:
    """Returns a shuffled copy of the input."""
    shuffled = list(x)  # materialise + copy, so the input is left untouched
    random.shuffle(shuffled)
    return shuffled
2140

2141
def test_read_write_permissions(path) -> None:
    """Sanity-check read/write permissions around `path`.

    Raises IOError if `path` exists but cannot be read, or if a sibling
    temp file cannot be written, read back, and removed.

    note: There might already be a file at 'path'.
          Make sure we do NOT overwrite/corrupt that!
    """
    temp_path = "%s.tmptest.%s" % (path, os.getpid())
    echo = "fs r/w test"
    try:
        # test READ permissions for actual path
        if os.path.exists(path):
            with open(path, "rb") as f:
                f.read(1)  # read 1 byte
        # test R/W sanity for "similar" path
        with open(temp_path, "w", encoding='utf-8') as f:
            f.write(echo)
        with open(temp_path, "r", encoding='utf-8') as f:
            echo2 = f.read()
        os.remove(temp_path)
    except Exception as e:
        # fix: best-effort cleanup, so a failed check does not leave
        # ".tmptest" litter behind in the target directory
        try:
            os.remove(temp_path)
        except OSError:
            pass
        raise IOError(e) from e
    if echo != echo2:
        raise IOError('echo sanity-check failed')
2162

2163
class classproperty(property):
    """~read-only class-level @property
    from https://stackoverflow.com/a/13624858 by denis-ryzhkov
    """
    def __get__(self, instance, owner):
        # unlike a plain property, hand the *class* (not the instance) to
        # the getter, so access works on the class itself too
        return self.fget(owner)
2170

2171
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
    """Returns the asyncio event loop that is *running in this thread*, if any."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # no loop is running in the current thread
        return None
    return loop
2178

2179
def error_text_str_to_safe_str(err: str, *, max_len: Optional[int] = 500) -> str:
    """Converts an untrusted error string to a sane printable ascii str.
    Never raises.
    """
    ascii_bytes = err.encode("ascii", errors='backslashreplace')
    # sanitise un-truncated first, then truncate exactly once at the end
    sanitised = error_text_bytes_to_safe_str(ascii_bytes, max_len=None)
    return truncate_text(sanitised, max_len=max_len)
2188

2189
def error_text_bytes_to_safe_str(err: bytes, *, max_len: Optional[int] = 500) -> str:
    """Converts an untrusted error bytes text to a sane printable ascii str.
    Never raises.

    Note that naive ascii conversion would be insufficient. Fun stuff:
    >>> b = b"my_long_prefix_blabla" + 21 * b"\x08" + b"malicious_stuff"
    >>> s = b.decode("ascii")
    >>> print(s)
    malicious_stuffblabla
    """
    # convert to ascii, to get rid of unicode stuff
    ascii_text = err.decode("ascii", errors='backslashreplace')
    # repr() escapes ascii control chars (important when printing/logging)
    escaped = repr(ascii_text)
    return truncate_text(escaped, max_len=max_len)
2205

2206
def truncate_text(text: str, *, max_len: Optional[int]) -> str:
    """Truncate `text` to at most `max_len` chars, appending a marker if cut.

    max_len=None disables truncation.
    """
    if max_len is not None and len(text) > max_len:
        return text[:max_len] + f"... (truncated. orig_len={len(text)})"
    return text
2212

2213
def nostr_pow_worker(nonce, nostr_pubk, target_bits, hash_function, hash_len_bits, shutdown):
    """Function to generate PoW for Nostr, to be spawned in a ProcessPoolExecutor.

    Searches nonces starting at `nonce` for a digest of
    hash_function(b'electrum-' + nostr_pubk + nonce32) having at least
    `target_bits` leading zero bits. Returns (digest, nonce) on success;
    (None, None) if another worker won the race (shutdown event set).
    """
    hash_preimage = b'electrum-' + nostr_pubk
    # hoisted loop invariant: digests below this threshold have >= target_bits
    # leading zero bits
    threshold = 1 << (hash_len_bits - target_bits)
    while True:
        # we cannot check is_set on each iteration as it has a lot of overhead, this way we can check
        # it with low overhead (just the additional range counter)
        for _ in range(1000000):
            digest = hash_function(hash_preimage + nonce.to_bytes(32, 'big')).digest()
            if int.from_bytes(digest, 'big') < threshold:
                shutdown.set()
                # fix: previously returned the *builtin* `hash` function
                # instead of the winning digest
                return digest, nonce
            nonce += 1
        if shutdown.is_set():
            return None, None
2228

2229
async def gen_nostr_ann_pow(nostr_pubk: bytes, target_bits: int) -> Tuple[int, int]:
    """Generate a PoW for a Nostr announcement. The PoW is hash[b'electrum-'+pubk+nonce]

    Returns (nonce, pow_bits). Spawns one worker process per spare CPU, each
    searching a different region of the nonce space; the first winner sets a
    shared event so the others stop.
    """
    import multiprocessing  # not available on Android, so we import it here
    hash_function = hashlib.sha256
    hash_len_bits = 256
    max_nonce: int = (1 << (32 * 8)) - 1  # 32-byte nonce
    start_nonce = 0

    max_workers = max(multiprocessing.cpu_count() - 1, 1)  # use all but one CPU
    manager = multiprocessing.Manager()
    shutdown = manager.Event()  # cross-process "a solution was found" flag
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        tasks = []
        loop = asyncio.get_running_loop()
        for task in range(0, max_workers):
            task = loop.run_in_executor(
                executor,
                nostr_pow_worker,
                start_nonce,
                nostr_pubk,
                target_bits,
                hash_function,
                hash_len_bits,
                shutdown
            )
            tasks.append(task)
            start_nonce += max_nonce // max_workers  # split the nonce range between the processes
            if start_nonce > max_nonce:  # make sure we don't go over the max_nonce
                start_nonce = random.randint(0, int(max_nonce * 0.75))

        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        # NOTE(review): assumes the first completed task is the winner; a
        # losing worker returning (None, None) first would yield a None
        # nonce here — confirm the shutdown-event ordering guarantees this.
        hash_res, nonce_res = done.pop().result()
        executor.shutdown(wait=False, cancel_futures=True)

    return nonce_res, get_nostr_ann_pow_amount(nostr_pubk, nonce_res)
2265

2266
def get_nostr_ann_pow_amount(nostr_pubk: bytes, nonce: Optional[int]) -> int:
    """Return the amount of leading zero bits for a nostr announcement PoW."""
    if not nonce:
        # no nonce (None or 0) means no PoW was done
        return 0
    hash_len_bits = 256
    preimage = b'electrum-' + nostr_pubk + nonce.to_bytes(32, 'big')
    pow_int = int.from_bytes(hashlib.sha256(preimage).digest(), 'big')
    # leading zero bits = total bits minus the position of the highest set bit
    return hash_len_bits - pow_int.bit_length()
2278

2279
class OnchainHistoryItem(NamedTuple):
    """A single on-chain transaction, as shown in the wallet history."""
    txid: str
    amount_sat: int
    fee_sat: int
    balance_sat: int
    tx_mined_status: TxMinedInfo
    group_id: Optional[str]  # fix: this field was (redundantly) declared twice
    label: str
    monotonic_timestamp: int

    def to_dict(self):
        """JSON/GUI-friendly representation of this history item."""
        return {
            'txid': self.txid,
            'amount_sat': self.amount_sat,
            'fee_sat': self.fee_sat,
            'height': self.tx_mined_status.height,
            'confirmations': self.tx_mined_status.conf,
            'timestamp': self.tx_mined_status.timestamp,
            'monotonic_timestamp': self.monotonic_timestamp,
            'incoming': True if self.amount_sat>0 else False,
            'bc_value': Satoshis(self.amount_sat),
            'bc_balance': Satoshis(self.balance_sat),
            'date': timestamp_to_datetime(self.tx_mined_status.timestamp),
            'txpos_in_block': self.tx_mined_status.txpos,
            'wanted_height': self.tx_mined_status.wanted_height,
            'label': self.label,
            'group_id': self.group_id,
        }

2309
class LightningHistoryItem(NamedTuple):
    """A single Lightning payment/event, as shown in the wallet history."""
    payment_hash: Optional[str]
    preimage: Optional[str]
    amount_msat: int
    fee_msat: Optional[int]
    type: str
    group_id: Optional[str]
    timestamp: int
    label: str
    direction: Optional[int]

    def to_dict(self):
        # JSON/GUI-friendly representation; keys parallel OnchainHistoryItem.to_dict
        return {
            'type': self.type,
            'label': self.label,
            'timestamp': self.timestamp or 0,
            'date': timestamp_to_datetime(self.timestamp),
            'amount_msat': self.amount_msat,
            'fee_msat': self.fee_msat,
            'payment_hash': self.payment_hash,
            'preimage': self.preimage,
            'group_id': self.group_id,
            'ln_value': Satoshis(Decimal(self.amount_msat) / 1000),
            'direction': self.direction,
        }

2335
@dataclass(kw_only=True, slots=True)
class ChoiceItem:
    """One selectable option, e.g. for a GUI choice widget."""
    key: Any
    label: str  # user facing string
    extra_data: Any = None
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc