• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5735552722403328

16 May 2025 10:28AM UTC coverage: 59.722% (+0.002%) from 59.72%
5735552722403328

Pull #9833

CirrusCI

f321x
make lightning dns seed fetching async
Pull Request #9833: dns: use async dnspython interface

22 of 50 new or added lines in 7 files covered. (44.0%)

1107 existing lines in 11 files now uncovered.

21549 of 36082 relevant lines covered (59.72%)

2.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.06
/electrum/util.py
1
# Electrum - lightweight Bitcoin client
2
# Copyright (C) 2011 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import binascii
4✔
24
import concurrent.futures
4✔
25
from dataclasses import dataclass
4✔
26
import logging
4✔
27
import os, sys, re
4✔
28
from collections import defaultdict, OrderedDict
4✔
29
from concurrent.futures.process import ProcessPoolExecutor
4✔
30
from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any,
4✔
31
                    Sequence, Dict, Generic, TypeVar, List, Iterable, Set, Awaitable)
32
from datetime import datetime, timezone, timedelta
4✔
33
import decimal
4✔
34
from decimal import Decimal
4✔
35
from urllib.parse import urlparse
4✔
36
import threading
4✔
37
import hmac
4✔
38
import hashlib
4✔
39
import stat
4✔
40
import locale
4✔
41
import asyncio
4✔
42
import builtins
4✔
43
import json
4✔
44
import time
4✔
45
import ssl
4✔
46
import ipaddress
4✔
47
from ipaddress import IPv4Address, IPv6Address
4✔
48
import random
4✔
49
import secrets
4✔
50
import functools
4✔
51
from functools import partial
4✔
52
from abc import abstractmethod, ABC
4✔
53
import socket
4✔
54
import enum
4✔
55
from contextlib import nullcontext
4✔
56
import traceback
4✔
57

58
import attr
4✔
59
import aiohttp
4✔
60
from aiohttp_socks import ProxyConnector, ProxyType
4✔
61
import aiorpcx
4✔
62
import certifi
4✔
63
import dns.asyncresolver
4✔
64

65
from .i18n import _
4✔
66
from .logging import get_logger, Logger
4✔
67

68
if TYPE_CHECKING:
4✔
69
    from .network import Network, ProxySettings
×
70
    from .interface import Interface
×
71
    from .simple_config import SimpleConfig
×
72
    from .paymentrequest import PaymentRequest
×
73

74

75
_logger = get_logger(__name__)
4✔
76

77

78
def inv_dict(d):
    """Return a new dict mapping each value of *d* back to its key."""
    return dict((value, key) for key, value in d.items())
80

81

82
def all_subclasses(cls) -> Set:
    """Return all (transitive) subclasses of cls."""
    found = set(cls.__subclasses__())
    # recurse into each direct subclass; iterate a snapshot since `found` grows
    for direct_sub in list(found):
        found |= all_subclasses(direct_sub)
    return found
88

89

90
ca_path = certifi.where()  # path to the bundled CA certificates (for SSL verification)


# unit name -> number of decimal places relative to satoshis
base_units = {'BTC':8, 'mBTC':5, 'bits':2, 'sat':0}
base_units_inverse = inv_dict(base_units)  # decimal places -> unit name
base_units_list = ['BTC', 'mBTC', 'bits', 'sat']  # list(dict) does not guarantee order

DECIMAL_POINT_DEFAULT = 5  # mBTC
98

99

100
class UnknownBaseUnit(Exception):
    """Raised for a unit name / decimal point not present in base_units."""
    pass


def decimal_point_to_base_unit_name(dp: int) -> str:
    """Map a decimal point to its unit name, e.g. 8 -> "BTC".

    Raises UnknownBaseUnit for unrecognised values.
    """
    name = base_units_inverse.get(dp)
    if name is None:
        raise UnknownBaseUnit(dp)
    return name


def base_unit_name_to_decimal_point(unit_name: str) -> int:
    """Returns the max number of digits allowed after the decimal point."""
    # e.g. "BTC" -> 8
    if unit_name not in base_units:
        raise UnknownBaseUnit(unit_name)
    return base_units[unit_name]
118

119
def parse_max_spend(amt: Any) -> Optional[int]:
    """Checks if given amount is "spend-max"-like.
    Returns None or the positive integer weight for "max". Never raises.

    When creating invoices and on-chain txs, the user can specify to send "max".
    This is done by setting the amount to '!'. Splitting max between multiple
    tx outputs is also possible, and custom weights (positive ints) can also be used.
    For example, to send 40% of all coins to address1, and 60% to address2:
    ```
    address1, 2!
    address2, 3!
    ```
    """
    # must be a non-empty string ending in '!'
    if not isinstance(amt, str) or not amt.endswith('!'):
        return None
    if amt == '!':
        return 1
    try:
        weight = int(amt[:-1])
    except ValueError:
        return None
    # only positive weights are valid
    return weight if weight > 0 else None
144

145
class NotEnoughFunds(Exception):
    """The wallet balance cannot cover the requested amount."""
    def __str__(self):
        return _("Insufficient funds")


class UneconomicFee(Exception):
    """The fee would exceed the funds gained from the transaction."""
    def __str__(self):
        return _("The fee for the transaction is higher than the funds gained from it.")


class NoDynamicFeeEstimates(Exception):
    """No fee estimates are currently available from the network."""
    def __str__(self):
        return _('Dynamic fee estimates not available')


class BelowDustLimit(Exception):
    """An output value is below the dust threshold."""
    pass


class InvalidPassword(Exception):
    """A wallet/keystore password check failed."""
    def __init__(self, message: Optional[str] = None):
        self.message = message

    def __str__(self):
        if self.message is not None:
            return str(self.message)
        return _("Incorrect password")


class AddTransactionException(Exception):
    """Base class for failures to add a tx to the wallet history."""
    pass


class UnrelatedTransactionException(AddTransactionException):
    def __str__(self):
        return _("Transaction is unrelated to this wallet.")


class FileImportFailed(Exception):
    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        return _("Failed to import from file.") + "\n" + self.message


class FileExportFailed(Exception):
    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        return _("Failed to export to file.") + "\n" + self.message


class WalletFileException(Exception):
    """Problem with the wallet file; can flag that a crash report should be shown."""
    def __init__(self, message='', *, should_report_crash: bool = False):
        super().__init__(message)
        self.should_report_crash = should_report_crash


class BitcoinException(Exception): pass


class UserFacingException(Exception):
    """Exception that contains information intended to be shown to the user."""


class InvoiceError(UserFacingException): pass


class NetworkOfflineException(UserFacingException):
    """Can be raised if we are running in offline mode (--offline flag)
    and the user requests an operation that requires the network.
    """
    def __str__(self):
        return _("You are offline.")


# Throw this exception to unwind the stack like when an error occurs.
# However unlike other exceptions the user won't be informed.
class UserCancelled(Exception):
    '''An exception that is suppressed from the user'''
    pass
229

230

231
def to_decimal(x: Union[str, float, int, Decimal]) -> Decimal:
    """Convert to Decimal, going through str() to avoid binary-float noise.

    e.g. Decimal(41754.681) == Decimal('41754.680999999996856786310672760009765625')
    but  Decimal("41754.681") == Decimal('41754.681')
    """
    return x if isinstance(x, Decimal) else Decimal(str(x))
240

241

242
# note: this is not a NamedTuple as then its json encoding cannot be customized
243
class Satoshis(object):
    """Bitcoin amount in satoshis (may carry msat precision as a Decimal).

    Not a NamedTuple so that its json encoding can be customized.
    """
    __slots__ = ('value',)

    def __new__(cls, value):
        self = super(Satoshis, cls).__new__(cls)
        # note: 'value' sometimes has msat precision
        assert isinstance(value, (int, Decimal)), f"unexpected type for {value=!r}"
        self.value = value
        return self

    def __repr__(self):
        return f'Satoshis({self.value})'

    def __str__(self):
        # note: precision is truncated to satoshis here
        return format_satoshis(self.value)

    def __eq__(self, other):
        # fix: previously accessed other.value unconditionally, raising
        # AttributeError when compared against a non-Satoshis object.
        # Now consistent with Fiat.__eq__, which returns False instead.
        if not isinstance(other, Satoshis):
            return False
        return self.value == other.value

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        # defined together with __eq__ so instances stay hashable
        return hash(self.value)

    def __add__(self, other):
        return Satoshis(self.value + other.value)
268

269

270
# note: this is not a NamedTuple as then its json encoding cannot be customized
271
class Fiat(object):
    """A fiat amount (Decimal or None) tagged with its currency code.

    Not a NamedTuple so that its json encoding can be customized.
    """
    __slots__ = ('value', 'ccy')

    def __new__(cls, value: Optional[Decimal], ccy: str):
        self = super(Fiat, cls).__new__(cls)
        self.ccy = ccy
        if not isinstance(value, (Decimal, type(None))):
            raise TypeError(f"value should be Decimal or None, not {type(value)}")
        self.value = value
        return self

    def __repr__(self):
        return 'Fiat(%s)'% self.__str__()

    def __str__(self):
        # missing or NaN values render as a placeholder
        if self.value is None or self.value.is_nan():
            return _('No Data')
        return "{:.2f}".format(self.value)

    def to_ui_string(self):
        """Like str(), but with the currency code appended."""
        if self.value is None or self.value.is_nan():
            return _('No Data')
        return "{:.2f}".format(self.value) + ' ' + self.ccy

    def __eq__(self, other):
        if not isinstance(other, Fiat):
            return False
        if self.ccy != other.ccy:
            return False
        # NaN != NaN for Decimals, but two "no data" amounts should compare equal
        both_nan = (isinstance(self.value, Decimal) and isinstance(other.value, Decimal)
                    and self.value.is_nan() and other.value.is_nan())
        return both_nan or self.value == other.value

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        assert self.ccy == other.ccy
        return Fiat(self.value + other.value, self.ccy)
313

314

315
class MyEncoder(json.JSONEncoder):
    """JSON encoder aware of Electrum's own types (Transaction, Satoshis, Fiat, ...)."""
    def default(self, obj):
        # note: this does not get called for namedtuples :(  https://bugs.python.org/issue30343
        from .transaction import Transaction, TxOutput  # local import: avoids import cycle
        if isinstance(obj, Transaction):
            return obj.serialize()
        if isinstance(obj, TxOutput):
            return obj.to_legacy_tuple()
        if isinstance(obj, Satoshis):
            return str(obj)
        if isinstance(obj, Fiat):
            return str(obj)
        if isinstance(obj, Decimal):
            return str(obj)  # encoded as string to avoid float rounding
        if isinstance(obj, datetime):
            # NOTE(review): [:-3] trims the last 3 chars of isoformat output;
            # this drops ":SS" only when microsecond == 0 — confirm intended.
            return obj.isoformat(' ')[:-3]
        if isinstance(obj, set):
            return list(obj)
        if isinstance(obj, bytes): # for nametuples in lnchannel
            return obj.hex()
        if hasattr(obj, 'to_json') and callable(obj.to_json):
            return obj.to_json()
        return super(MyEncoder, self).default(obj)
338

339

340
class ThreadJob(Logger):
    """A job that is run periodically from a thread's main loop.  run() is
    called from that thread's context.
    """

    def __init__(self):
        Logger.__init__(self)

    def run(self):
        """Called periodically from the thread"""
        # Default implementation does nothing; subclasses override.
        pass
351

352
class DebugMem(ThreadJob):
    '''A handy class for debugging GC memory leaks'''
    def __init__(self, classes, interval=30):
        ThreadJob.__init__(self)
        self.next_time = 0       # POSIX time of the next allowed scan
        self.classes = classes   # classes whose live instances get counted
        self.interval = interval # seconds between scans

    def mem_stats(self):
        """Run a GC pass and log the live instance count of each watched class."""
        import gc
        self.logger.info("Start memscan")
        gc.collect()
        live = defaultdict(list)
        for candidate in gc.get_objects():
            for watched in self.classes:
                if isinstance(candidate, watched):
                    live[watched].append(candidate)
        for watched, instances in live.items():
            self.logger.info(f"{watched.__name__}: {len(instances)}")
        self.logger.info("Finish memscan")

    def run(self):
        # rate-limited: only scan once per interval
        if time.time() > self.next_time:
            self.mem_stats()
            self.next_time = time.time() + self.interval
377

378
class DaemonThread(threading.Thread, Logger):
    """ daemon thread that terminates cleanly """

    LOGGING_SHORTCUT = 'd'

    def __init__(self):
        threading.Thread.__init__(self)
        Logger.__init__(self)
        self.parent_thread = threading.current_thread()  # to detect parent death in is_running()
        self.running = False                  # guarded by running_lock
        self.running_lock = threading.Lock()
        self.job_lock = threading.Lock()      # guards self.jobs
        self.jobs = []                        # ThreadJob-like objects run via run_jobs()
        self.stopped_event = threading.Event()        # set when fully stopped
        self.stopped_event_async = asyncio.Event()    # set when fully stopped
        self.wake_up_event = threading.Event()  # for perf optimisation of polling in run()

    def add_jobs(self, jobs):
        # Register jobs to be run periodically by run_jobs().
        with self.job_lock:
            self.jobs.extend(jobs)

    def run_jobs(self):
        # Don't let a throwing job disrupt the thread, future runs of
        # itself, or other jobs.  This is useful protection against
        # malformed or malicious server responses
        with self.job_lock:
            for job in self.jobs:
                try:
                    job.run()
                except Exception as e:
                    self.logger.exception('')

    def remove_jobs(self, jobs):
        # Unregister jobs; list.remove raises ValueError for unknown jobs.
        with self.job_lock:
            for job in jobs:
                self.jobs.remove(job)

    def start(self):
        with self.running_lock:
            self.running = True
        return threading.Thread.start(self)

    def is_running(self):
        # Also reports False once the parent thread has died.
        with self.running_lock:
            return self.running and self.parent_thread.is_alive()

    def stop(self):
        with self.running_lock:
            self.running = False
            # pulse the event so a poll loop blocked on wait() wakes promptly
            self.wake_up_event.set()
            self.wake_up_event.clear()

    def on_stop(self):
        # Final cleanup hook: detach the JNI thread on Android, then signal
        # both the threading and asyncio "fully stopped" events.
        if 'ANDROID_DATA' in os.environ:
            import jnius
            jnius.detach()
            self.logger.info("jnius detach")
        self.logger.info("stopped")
        self.stopped_event.set()
        loop = get_asyncio_loop()  # NOTE(review): helper defined elsewhere in this module
        loop.call_soon_threadsafe(self.stopped_event_async.set)
439

440

441
def print_stderr(*args):
    """Write the space-joined, stringified args to stderr, newline-terminated."""
    sys.stderr.write(" ".join(str(item) for item in args) + "\n")
    sys.stderr.flush()

def print_msg(*args):
    """Write the space-joined, stringified args to stdout, newline-terminated."""
    sys.stdout.write(" ".join(str(item) for item in args) + "\n")
    sys.stdout.flush()
451

452
def json_encode(obj):
    """Pretty-print obj as JSON via MyEncoder; falls back to repr() on failure."""
    try:
        return json.dumps(obj, sort_keys=True, indent=4, cls=MyEncoder)
    except TypeError:
        return repr(obj)

def json_decode(x):
    """Parse JSON with floats as Decimal; returns x unchanged if parsing fails."""
    try:
        return json.loads(x, parse_float=Decimal)
    except Exception:
        return x

def json_normalize(x):
    # note: The return value of commands, when going through the JSON-RPC interface,
    #       is json-encoded. The encoder used there cannot handle some types, e.g. electrum.util.Satoshis.
    # note: We should not simply do "json_encode(x)" here, as then later x would get doubly json-encoded.
    # see #5868
    return json_decode(json_encode(x))
471

472

473
# taken from Django Source Code
474
def constant_time_compare(val1, val2):
    """Return True if the two strings are equal, False otherwise."""
    # hmac.compare_digest takes time independent of the matching prefix,
    # preventing timing side-channels when comparing secrets.
    return hmac.compare_digest(to_bytes(val1, 'utf8'), to_bytes(val2, 'utf8'))
477

478

479
_profiler_logger = _logger.getChild('profiler')
def profiler(func=None, *, min_threshold: Union[int, float, None] = None):
    """Function decorator that logs execution time.

    min_threshold: if set, only log if time taken is higher than threshold
    NOTE: does not work with async methods.
    """
    if func is None:  # to make "@profiler(...)" work. (in addition to bare "@profiler")
        return partial(profiler, min_threshold=min_threshold)
    # fix: preserve __name__/__qualname__/__doc__ of the wrapped function,
    # so decorated functions remain introspectable (previously lost).
    @functools.wraps(func)
    def do_profile(*args, **kw_args):
        name = func.__qualname__
        t0 = time.time()
        o = func(*args, **kw_args)
        t = time.time() - t0
        if min_threshold is None or t > min_threshold:
            _profiler_logger.debug(f"{name} {t:,.4f} sec")
        return o
    return do_profile
497

498

499
class AsyncHangDetector:
    """Context manager that logs every `n` seconds if encapsulated context still has not exited."""

    def __init__(
        self,
        *,
        period_sec: int = 15,
        message: str,
        logger: logging.Logger = None,
    ):
        self.period_sec = period_sec
        self.message = message
        self.logger = logger if logger is not None else _logger

    async def _monitor(self):
        # note: this assumes that the event loop itself is not blocked
        started = time.monotonic()
        while True:
            await asyncio.sleep(self.period_sec)
            elapsed = time.monotonic() - started
            self.logger.info(f"{self.message} (after {elapsed:.2f} sec)")

    async def __aenter__(self):
        # start the watchdog; it logs periodically until cancelled on exit
        self.mtask = asyncio.create_task(self._monitor())

    async def __aexit__(self, exc_type, exc, tb):
        self.mtask.cancel()
526

527

528
def android_ext_dir():
    # Root of primary external storage, e.g. /sdcard (Android only).
    from android.storage import primary_external_storage_path
    return primary_external_storage_path()

def android_backup_dir():
    # <external storage>/<package name>; created on first use (Android only).
    pkgname = get_android_package_name()
    d = os.path.join(android_ext_dir(), pkgname)
    if not os.path.exists(d):
        os.mkdir(d)
    return d

def android_data_dir():
    # App-private files directory + '/data' (Android only).
    import jnius
    PythonActivity = jnius.autoclass('org.kivy.android.PythonActivity')
    return PythonActivity.mActivity.getFilesDir().getPath() + '/data'
543

544
def ensure_sparse_file(filename):
    """Best-effort: mark *filename* as a sparse file where that is needed."""
    # On modern Linux, no need to do anything.
    # On Windows, need to explicitly mark file.
    if os.name == "nt":
        try:
            os.system('fsutil sparse setflag "{}" 1'.format(filename))
        except Exception as e:
            # fix: log the actual filename instead of the useless "(unknown)"
            _logger.info(f'error marking file {filename!r} as sparse: {e}')
552

553

554
def get_headers_dir(config):
    # Blockchain headers are stored directly in the datadir.
    return config.path
556

557

558
def assert_datadir_available(config_path):
    """Raise FileNotFoundError if the Electrum datadir vanished at runtime."""
    if not os.path.exists(config_path):
        raise FileNotFoundError(
            'Electrum datadir does not exist. Was it deleted while running?' + '\n' +
            'Should be at {}'.format(config_path))


def assert_file_in_datadir_available(path, config_path):
    """Raise FileNotFoundError if *path* is missing, distinguishing a missing datadir."""
    if not os.path.exists(path):
        # figure out whether the whole datadir is gone, or just this file
        assert_datadir_available(config_path)
        raise FileNotFoundError(
            'Cannot find file but datadir is there.' + '\n' +
            'Should be at {}'.format(path))
576

577

578
def standardize_path(path):
    # note: os.path.realpath() is not used, as on Windows it can return non-working paths (see #8495).
    #       This means that we don't resolve symlinks!
    expanded = os.path.expanduser(path)
    absolute = os.path.abspath(expanded)
    return os.path.normcase(absolute)
586

587

588
def get_new_wallet_name(wallet_folder: str) -> str:
    """Returns a file basename for a new wallet to be used.
    Can raise OSError.
    """
    # fix: snapshot the directory listing once instead of calling
    # os.listdir() on every loop iteration (was accidentally O(n^2)).
    existing = set(os.listdir(wallet_folder))
    i = 1
    while "wallet_%d" % i in existing:
        i += 1
    return "wallet_%d" % i
600

601

602
def is_android_debug_apk() -> bool:
    # True iff running on Android AND the APK was built with BuildConfig.DEBUG.
    is_android = 'ANDROID_DATA' in os.environ
    if not is_android:
        return False
    from jnius import autoclass
    pkgname = get_android_package_name()
    build_config = autoclass(f"{pkgname}.BuildConfig")
    return bool(build_config.DEBUG)


def get_android_package_name() -> str:
    # Package name of the running Android app, read from the main activity.
    is_android = 'ANDROID_DATA' in os.environ
    assert is_android
    from jnius import autoclass
    from android.config import ACTIVITY_CLASS_NAME
    activity = autoclass(ACTIVITY_CLASS_NAME).mActivity
    pkgname = str(activity.getPackageName())
    return pkgname
620

621

622
def assert_bytes(*args):
    """
    porting helper, assert args type
    """
    try:
        assert all(isinstance(item, (bytes, bytearray)) for item in args)
    except Exception:
        print('assert bytes failed', list(map(type, args)))
        raise


def assert_str(*args):
    """
    porting helper, assert args type
    """
    assert all(isinstance(item, str) for item in args)
640

641

642
def to_string(x, enc) -> str:
    """Decode a bytes-like object using *enc*; pass str through unchanged."""
    if isinstance(x, str):
        return x
    if isinstance(x, (bytes, bytearray)):
        return x.decode(enc)
    raise TypeError("Not a string or bytes like object")


def to_bytes(something, encoding='utf8') -> bytes:
    """
    cast string to bytes() like object, but for python2 support it's bytearray copy
    """
    if isinstance(something, bytes):
        return something
    if isinstance(something, str):
        return something.encode(encoding)
    if isinstance(something, bytearray):
        return bytes(something)
    raise TypeError("Not a string or bytes like object")
663

664

665
bfh = bytes.fromhex  # short alias: hex string -> bytes


def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR the two byte strings, truncated to the shorter one's length."""
    # zip() stops at the shorter input, matching the old min-length behaviour
    return bytes(x ^ y for x, y in zip(a, b))
672

673

674
def user_dir():
    """Return the per-user Electrum data directory, or None if undeterminable.

    Priority: $ELECTRUMDIR override, Android data dir, ~/.electrum on posix,
    then the Windows APPDATA/LOCALAPPDATA locations.
    """
    env = os.environ
    if "ELECTRUMDIR" in env:
        return env["ELECTRUMDIR"]
    if 'ANDROID_DATA' in env:
        return android_data_dir()
    if os.name == 'posix':
        return os.path.join(env["HOME"], ".electrum")
    if "APPDATA" in env:
        return os.path.join(env["APPDATA"], "Electrum")
    if "LOCALAPPDATA" in env:
        return os.path.join(env["LOCALAPPDATA"], "Electrum")
    #raise Exception("No home directory found in environment variables.")
    return None
688

689

690
def resource_path(*parts):
    """Absolute path to a resource shipped inside the electrum package dir."""
    return os.path.join(pkg_dir, *parts)


# absolute path to python package folder of electrum ("lib")
pkg_dir = os.path.dirname(os.path.realpath(__file__))
696

697

698
def is_valid_email(s):
    """Loose syntactic check: something@something.something."""
    return re.match(r"[^@]+@[^@]+\.[^@]+", s) is not None
701

702
def is_valid_websocket_url(url: str) -> bool:
    """
    uses this django url validation regex:
    https://github.com/django/django/blob/2c6906a0c4673a7685817156576724aba13ad893/django/core/validators.py#L45C1-L52C43
    Note: this is not perfect, urls and their parsing can get very complex (see recent django code).
    however its sufficient for catching weird user input in the gui dialog
    """
    # compile the regex lazily, once, and memoize it on the function object
    pattern = getattr(is_valid_websocket_url, "regex", None)
    if pattern is None:
        pattern = re.compile(
            r'^(?:ws|wss)://'  # ws:// or wss://
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
            r'localhost|'  # localhost...
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
            r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
            r'(?::\d+)?'  # optional port
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        is_valid_websocket_url.regex = pattern
    try:
        return pattern.match(url) is not None
    except Exception:
        return False
723

724
def is_hash256_str(text: Any) -> bool:
    """True iff text is a 64-char hex string (e.g. a sha256 digest / txid)."""
    return isinstance(text, str) and len(text) == 64 and is_hex_str(text)


def is_hex_str(text: Any) -> bool:
    """True iff text is a valid, even-length hex string without whitespace."""
    if not isinstance(text, str):
        return False
    try:
        raw = bytes.fromhex(text)
    except Exception:
        return False
    # bytes.fromhex() tolerates ASCII whitespace; the length check forbids it
    return len(text) == 2 * len(raw)
740

741

742
def is_integer(val: Any) -> bool:
    """True iff val is an int (note: in Python, bools are also ints)."""
    return isinstance(val, int)


def is_non_negative_integer(val: Any) -> bool:
    """True iff val is an int >= 0."""
    return is_integer(val) and val >= 0


def is_int_or_float(val: Any) -> bool:
    """True iff val is an int or a float."""
    return isinstance(val, (int, float))


def is_non_negative_int_or_float(val: Any) -> bool:
    """True iff val is an int or float >= 0."""
    return is_int_or_float(val) and val >= 0
760

761

762
def chunks(items, size: int):
    """Break up items, an iterable, into chunks of length size.

    Note: items must support len() and slicing (i.e. be a sequence);
    chunks keep the sequence's own slice type (e.g. str -> str).
    """
    if size < 1:
        raise ValueError(f"size must be positive, not {repr(size)}")
    for start in range(0, len(items), size):
        yield items[start:start + size]
768

769

770
def format_satoshis_plain(
        x: Union[int, float, Decimal, str],  # amount in satoshis,
        *,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
) -> str:
    """Display a satoshi amount scaled.  Always uses a '.' as a decimal
    point and has no thousands separator"""
    if parse_max_spend(x):
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    divisor = pow(10, decimal_point)
    # strip trailing zeros, then a dangling decimal point
    return "{:.8f}".format(Decimal(x) / divisor).rstrip('0').rstrip('.')
782

783

784
# Check that Decimal precision is sufficient.
# We need at the very least ~20, as we deal with msat amounts, and
# log10(21_000_000 * 10**8 * 1000) ~= 18.3
# decimal.DefaultContext.prec == 28 by default, but it is mutable.
# We enforce that we have at least that available.
assert decimal.getcontext().prec >= 28, f"PyDecimal precision too low: {decimal.getcontext().prec}"

# DECIMAL_POINT = locale.localeconv()['decimal_point']  # type: str
DECIMAL_POINT = "."   # separator between integer and fractional parts in formatted amounts
THOUSANDS_SEP = " "   # separator between digit groups in formatted amounts
assert len(DECIMAL_POINT) == 1, f"DECIMAL_POINT has unexpected len. {DECIMAL_POINT!r}"
assert len(THOUSANDS_SEP) == 1, f"THOUSANDS_SEP has unexpected len. {THOUSANDS_SEP!r}"
796

797

798
def format_satoshis(
        x: Union[int, float, Decimal, str, None],  # amount in satoshis
        *,
        num_zeros: int = 0,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
        precision: int = 0,  # extra digits after satoshi precision
        is_diff: bool = False,  # if True, enforce a leading sign (+/-)
        whitespaces: bool = False,  # if True, add whitespaces, to align numbers in a column
        add_thousands_sep: bool = False,  # if True, add whitespaces, for better readability of the numbers
) -> str:
    """Render a satoshi amount as a decimal string in the chosen base unit.

    Returns 'unknown' for None and 'max(...)' for spend-max strings.
    """
    if x is None:
        return 'unknown'
    if parse_max_spend(x):
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    # lose redundant precision
    x = Decimal(x).quantize(Decimal(10) ** (-precision))
    # format string
    overall_precision = decimal_point + precision  # max digits after final decimal point
    decimal_format = "." + str(overall_precision) if overall_precision > 0 else ""
    if is_diff:
        decimal_format = '+' + decimal_format
    # initial result
    scale_factor = pow(10, decimal_point)
    result = ("{:" + decimal_format + "f}").format(x / scale_factor)
    if "." not in result: result += "."
    result = result.rstrip('0')
    # add extra decimal places (zeros)
    integer_part, fract_part = result.split(".")
    if len(fract_part) < num_zeros:
        fract_part += "0" * (num_zeros - len(fract_part))
    # add whitespaces as thousands' separator for better readability of numbers
    if add_thousands_sep:
        sign = integer_part[0] if integer_part[0] in ("+", "-") else ""
        if sign == "-":
            integer_part = integer_part[1:]
        integer_part = "{:,}".format(int(integer_part)).replace(',', THOUSANDS_SEP)
        integer_part = sign + integer_part
        # group the fractional digits from the left, 3 at a time
        fract_part = THOUSANDS_SEP.join(fract_part[i:i+3] for i in range(0, len(fract_part), 3))
    result = integer_part + DECIMAL_POINT + fract_part
    # add leading/trailing whitespaces so that numbers can be aligned in a column
    if whitespaces:
        target_fract_len = overall_precision
        target_integer_len = 14 - decimal_point  # should be enough for up to unsigned 999999 BTC
        if add_thousands_sep:
            # account for the separators inserted above
            target_fract_len += max(0, (target_fract_len - 1) // 3)
            target_integer_len += max(0, (target_integer_len - 1) // 3)
        # add trailing whitespaces
        result += " " * (target_fract_len - len(fract_part))
        # add leading whitespaces
        target_total_len = target_integer_len + 1 + target_fract_len
        result = " " * (target_total_len - len(result)) + result
    return result
851

852

853
FEERATE_PRECISION = 1  # num fractional decimal places for sat/byte fee rates
4✔
854
_feerate_quanta = Decimal(10) ** (-FEERATE_PRECISION)
4✔
855
UI_UNIT_NAME_FEERATE_SAT_PER_VBYTE = "sat/vbyte"
4✔
856
UI_UNIT_NAME_FEERATE_SAT_PER_VB = "sat/vB"
4✔
857
UI_UNIT_NAME_TXSIZE_VBYTES = "vbytes"
4✔
858
UI_UNIT_NAME_MEMPOOL_MB = "vMB"
4✔
859

860

861
def format_fee_satoshis(fee, *, num_zeros=0, precision=None):
    """Format a sat/byte fee rate as a string, delegating to format_satoshis."""
    if precision is None:
        precision = FEERATE_PRECISION
    # no more zeroes than available precision
    capped_zeros = min(num_zeros, FEERATE_PRECISION)
    return format_satoshis(fee, num_zeros=capped_zeros, decimal_point=0, precision=precision)
866

867

868
def quantize_feerate(fee) -> Union[None, Decimal, int]:
    """Strip sat/byte fee rate of excess precision."""
    if fee is None:
        return None
    # round half-down to the feerate quantum (0.1 sat/byte)
    quantized = Decimal(fee).quantize(_feerate_quanta, rounding=decimal.ROUND_HALF_DOWN)
    return quantized
873

874

875
def timestamp_to_datetime(timestamp: Union[int, float, None], *, utc: bool = False) -> Optional[datetime]:
    """Convert a POSIX timestamp to a datetime.

    Returns a naive local-time datetime by default, or a UTC-aware one if utc=True.
    None maps to None.
    """
    if timestamp is None:
        return None
    tz = timezone.utc if utc else None
    return datetime.fromtimestamp(timestamp, tz=tz)
882

883

884
def format_time(timestamp: Union[int, float, None]) -> str:
    """Render a POSIX timestamp as 'YYYY-MM-DD HH:MM', or translated 'Unknown'."""
    dt = timestamp_to_datetime(timestamp)
    if not dt:
        return _("Unknown")
    return dt.isoformat(' ', timespec="minutes")
887

888

889
def age(
    from_date: Union[int, float, None],  # POSIX timestamp
    *,
    since_date: datetime = None,
    target_tz=None,
    include_seconds: bool = False,
) -> str:
    """Takes a timestamp and returns a string with the approximation of the age"""
    if from_date is None:
        return _("Unknown")
    then = datetime.fromtimestamp(from_date)
    # reference point defaults to "now" in the target timezone
    now = since_date if since_date is not None else datetime.now(target_tz)
    delta_str = delta_time_str(then - now, include_seconds=include_seconds)
    if then < now:
        return _("{} ago").format(delta_str)
    return _("in {}").format(delta_str)
906

907

908
def delta_time_str(distance_in_time: timedelta, *, include_seconds: bool = False) -> str:
    """Return a human-readable, translated approximation of a time delta.

    The sign of the delta is ignored; callers (e.g. ``age``) decide between
    "… ago" and "in …" themselves.
    """
    distance_in_seconds = int(round(abs(distance_in_time.days * 86400 + distance_in_time.seconds)))
    distance_in_minutes = int(round(distance_in_seconds / 60))
    if distance_in_minutes == 0:
        if include_seconds:
            return _("{} seconds").format(distance_in_seconds)
        else:
            return _("less than a minute")
    elif distance_in_minutes < 45:
        return _("about {} minutes").format(distance_in_minutes)
    elif distance_in_minutes < 90:
        return _("about 1 hour")
    elif distance_in_minutes < 1440:  # < 1 day
        return _("about {} hours").format(round(distance_in_minutes / 60.0))
    elif distance_in_minutes < 2880:  # < 2 days
        return _("about 1 day")
    elif distance_in_minutes < 43200:  # < 30 days (was 43220: typo; 30 days * 1440 min == 43200)
        return _("about {} days").format(round(distance_in_minutes / 1440))
    elif distance_in_minutes < 86400:  # < 60 days
        return _("about 1 month")
    elif distance_in_minutes < 525600:  # < 365 days
        return _("about {} months").format(round(distance_in_minutes / 43200))
    elif distance_in_minutes < 1051200:  # < 2 years
        return _("about 1 year")
    else:
        return _("over {} years").format(round(distance_in_minutes / 525600))
934

935
# Hardcoded block explorers, per network.  Each entry maps a display name to
# (base_url, {kind: path_fragment}) where kind is 'tx' or 'addr'; see
# block_explorer_URL() for how URLs are assembled from these.
mainnet_block_explorers = {
    '3xpl.com': ('https://3xpl.com/bitcoin/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'Bitflyer.jp': ('https://chainflyer.bitflyer.jp/',
                        {'tx': 'Transaction/', 'addr': 'Address/'}),
    'Blockchain.info': ('https://blockchain.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Bitaps.com': ('https://btc.bitaps.com/',
                        {'tx': '', 'addr': ''}),
    'BTC.com': ('https://btc.com/',
                        {'tx': '', 'addr': ''}),
    'Chain.so': ('https://www.chain.so/',
                        {'tx': 'tx/BTC/', 'addr': 'address/BTC/'}),
    'Insight.is': ('https://insight.bitpay.com/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchair.com': ('https://blockchair.com/bitcoin/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'blockonomics.co': ('https://www.blockonomics.co/',
                        {'tx': 'api/tx?txid=', 'addr': '#/search?q='}),
    'mempool.space': ('https://mempool.space/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.emzy.de': ('https://mempool.emzy.de/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'OXT.me': ('https://oxt.me/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'mynode.local': ('http://mynode.local:3002/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    # 'system default' uses a blockchain: URI, handed to the OS URI handler
    'system default': ('blockchain:/',
                        {'tx': 'tx/', 'addr': 'address/'}),
}

testnet_block_explorers = {
    'Bitaps.com': ('https://tbtc.bitaps.com/',
                       {'tx': '', 'addr': ''}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchain.info': ('https://www.blockchain.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'smartbit.com.au': ('https://testnet.smartbit.com.au/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain://000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

testnet4_block_explorers = {
    'mempool.space': ('https://mempool.space/testnet4/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://testnet4-explorer.wakiyamap.dev/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

signet_block_explorers = {
    'bc-2.jp': ('https://explorer.bc-2.jp/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/signet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'bitcoinexplorer.org': ('https://signet.bitcoinexplorer.org/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://signet-explorer.wakiyamap.dev/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'ex.signet.bublina.eu.org': ('https://ex.signet.bublina.eu.org/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

# paths used when the user configured a custom explorer as a bare base URL
_block_explorer_default_api_loc = {'tx': 'tx/', 'addr': 'address/'}
1010

1011

1012
def block_explorer_info():
    """Return the dict of hardcoded block explorers for the active network."""
    from . import constants
    explorers_by_net = {
        "testnet": testnet_block_explorers,
        "testnet4": testnet4_block_explorers,
        "signet": signet_block_explorers,
    }
    return explorers_by_net.get(constants.net.NET_NAME, mainnet_block_explorers)
1021

1022

1023
def block_explorer(config: 'SimpleConfig') -> Optional[str]:
    """Returns name of selected block explorer,
    or None if a custom one (not among hardcoded ones) is configured.
    """
    if config.BLOCK_EXPLORER_CUSTOM is not None:
        return None
    be_key = config.BLOCK_EXPLORER
    if block_explorer_info().get(be_key) is None:
        # configured key is unknown -> fall back to the default explorer
        be_key = config.cv.BLOCK_EXPLORER.get_default_value()
    assert isinstance(be_key, str), f"{be_key!r} should be str"
    return be_key
1035

1036

1037
def block_explorer_tuple(config: 'SimpleConfig') -> Optional[Tuple[str, dict]]:
    """Return (base_url, {kind: path}) for the configured explorer, or None."""
    custom_be = config.BLOCK_EXPLORER_CUSTOM
    if not custom_be:
        # using one of the hardcoded block explorers
        return block_explorer_info().get(block_explorer(config))
    if isinstance(custom_be, str):
        # bare base URL: use the default tx/address paths
        return custom_be, _block_explorer_default_api_loc
    if isinstance(custom_be, (tuple, list)) and len(custom_be) == 2:
        return tuple(custom_be)
    _logger.warning(f"not using {config.cv.BLOCK_EXPLORER_CUSTOM.key()!r} from config. "
                    f"expected a str or a pair but got {custom_be!r}")
    return None
1050

1051

1052
def block_explorer_URL(config: 'SimpleConfig', kind: str, item: str) -> Optional[str]:
    """Build an explorer URL for `item` (a txid or an address).

    `kind` is 'tx' or 'addr'; returns None if no explorer is configured
    or it does not support `kind`.
    """
    be_tuple = block_explorer_tuple(config)
    if not be_tuple:
        return
    explorer_url, explorer_dict = be_tuple
    kind_str = explorer_dict.get(kind)
    if kind_str is None:
        return
    if explorer_url[-1] != "/":
        explorer_url += "/"
    return explorer_url + kind_str + item
1064

1065

1066

1067

1068

1069
# Python bug (http://bugs.python.org/issue1927) causes raw_input
1070
# to be redirected improperly between stdin/stderr on Unix systems
1071
#TODO: py3
1072
def raw_input(prompt=None):
    # Write the prompt explicitly to stdout before reading, instead of
    # letting input() emit it (which may go to stderr, see bug above).
    if prompt:
        sys.stdout.write(prompt)
    return builtin_raw_input()

# Monkey-patch the builtin input() with the workaround above, keeping a
# reference to the original so raw_input() can delegate the actual read.
builtin_raw_input = builtins.input
builtins.input = raw_input
1079

1080

1081
def parse_json(message):
    """Split the first newline-terminated line off `message` (bytes) and parse it
    as JSON.  Returns (parsed_object_or_None, remaining_bytes)."""
    # TODO: check \r\n pattern
    newline_pos = message.find(b'\n')
    if newline_pos == -1:
        return None, message
    line, remainder = message[:newline_pos], message[newline_pos + 1:]
    try:
        parsed = json.loads(line.decode('utf8'))
    except Exception:
        parsed = None
    return parsed, remainder
1091

1092

1093
def setup_thread_excepthook():
    """
    Workaround for `sys.excepthook` thread bug from:
    http://bugs.python.org/issue1230540

    Call once from the main thread before creating any threads.
    """

    init_original = threading.Thread.__init__

    def init(self, *args, **kwargs):

        init_original(self, *args, **kwargs)
        run_original = self.run

        def run_with_except_hook(*args2, **kwargs2):
            try:
                run_original(*args2, **kwargs2)
            except Exception:
                # route uncaught thread exceptions through sys.excepthook,
                # which Thread.run would otherwise bypass (see bug above)
                sys.excepthook(*sys.exc_info())

        self.run = run_with_except_hook

    # monkey-patch Thread.__init__ so every thread created afterwards
    # wraps its run() method with the excepthook forwarding above
    threading.Thread.__init__ = init
1117

1118

1119
def send_exception_to_crash_reporter(e: BaseException):
    """Forward `e` to the crash reporter (imported lazily to avoid a cycle)."""
    from . import base_crash_reporter
    base_crash_reporter.send_exception_to_crash_reporter(e)
1122

1123

1124
def versiontuple(v):
    """Convert a dotted version string like '4.3.2' into a tuple of ints."""
    return tuple(int(part) for part in v.split("."))
1126

1127

1128
def read_json_file(path):
    """Read and JSON-decode the file at `path`; raise FileImportFailed on error."""
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except json.JSONDecodeError:
        _logger.exception('')
        raise FileImportFailed(_("Invalid JSON code."))
    except BaseException as e:
        # e.g. IOError / PermissionError / UnicodeDecodeError
        _logger.exception('')
        raise FileImportFailed(e)
1139

1140

1141
def write_json_file(path, data):
    """JSON-encode `data` and write it to `path`; raise FileExportFailed on I/O error."""
    try:
        with open(path, 'w+', encoding='utf-8') as fp:
            json.dump(data, fp, cls=MyEncoder, indent=4, sort_keys=True)
    except (IOError, os.error) as exc:
        _logger.exception('')
        raise FileExportFailed(exc)
1148

1149

1150
def os_chmod(path, mode):
    """os.chmod aware of tmpfs"""
    try:
        os.chmod(path, mode)
    except OSError as e:
        # chmod can fail on tmpfs (e.g. under XDG_RUNTIME_DIR); tolerate that case
        xdg_runtime_dir = os.environ.get("XDG_RUNTIME_DIR", None)
        if not (xdg_runtime_dir and is_subpath(path, xdg_runtime_dir)):
            raise
        _logger.info(f"Tried to chmod in tmpfs. Skipping... {e!r}")
1160

1161

1162
def make_dir(path, allow_symlink=True):
    """Make directory if it does not yet exist."""
    if os.path.exists(path):
        return
    if not allow_symlink and os.path.islink(path):
        raise Exception('Dangling link: ' + path)
    os.mkdir(path)
    # restrict to owner-only access
    os_chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1169

1170

1171
def is_subpath(long_path: str, short_path: str) -> bool:
    """Returns whether long_path is a sub-path of short_path."""
    try:
        common = os.path.commonpath([long_path, short_path])
    except ValueError:
        # e.g. different drives on Windows, or mixing absolute and relative paths
        return False
    return standardize_path(short_path) == standardize_path(common)
1180

1181

1182
def log_exceptions(func):
    """Decorator to log AND re-raise exceptions.

    CancelledError is re-raised without logging, so task cancellation stays
    quiet.  If the first positional arg (usually `self`) has a `logger`
    attribute, that logger is used; otherwise the module logger.
    """
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        self = args[0] if len(args) > 0 else None
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError:  # was `as e` with e unused
            raise  # not an error: propagate cancellation silently
        except BaseException as e:
            mylogger = self.logger if hasattr(self, 'logger') else _logger
            try:
                mylogger.exception(f"Exception in {func.__name__}: {repr(e)}")
            except BaseException as e2:
                # never let a logging failure mask the original exception
                print(f"logging exception raised: {repr(e2)}... orig exc: {repr(e)} in {func.__name__}")
            raise
    return wrapper
1200

1201

1202
def ignore_exceptions(func):
    """Decorator to silently swallow all exceptions.

    Note: asyncio.CancelledError derives from BaseException (not Exception),
    so cancellation still propagates.
    """
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception:  # was `as e` with e unused
            pass  # deliberate best-effort: errors are intentionally dropped
    return wrapper
1212

1213

1214
def with_lock(func):
    """Decorator to enforce a lock on a function call.

    The decorated method's instance must expose a context-manager `lock`
    attribute (e.g. a threading.Lock).
    """
    def func_wrapper(self, *args, **kwargs):
        with self.lock:
            result = func(self, *args, **kwargs)
        return result
    return func_wrapper
1220

1221

1222
class TxMinedInfo(NamedTuple):
    """Mining status of a transaction, as far as the wallet knows it."""
    height: int                        # height of block that mined tx
    conf: Optional[int] = None         # number of confirmations, SPV verified. >=0, or None (None means unknown)
    timestamp: Optional[int] = None    # timestamp of block that mined tx
    txpos: Optional[int] = None        # position of tx in serialized block
    header_hash: Optional[str] = None  # hash of block that mined tx
    wanted_height: Optional[int] = None  # in case of timelock, min abs block height

    def short_id(self) -> Optional[str]:
        """Return a 'heightxtxpos' short id, or None if the tx is not mined."""
        if self.txpos is None or self.txpos < 0:
            return None
        assert self.height > 0
        return f"{self.height}x{self.txpos}"

    def is_local_like(self) -> bool:
        """Returns whether the tx is local-like (LOCAL/FUTURE)."""
        from .address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
        mempool_like = self.height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT)
        return not (self.height > 0 or mempool_like)
1244

1245

1246
class ShortID(bytes):
    """8-byte lightning short channel id: 3 bytes block height, 3 bytes tx
    position in block, 2 bytes output index (all big-endian)."""

    def __repr__(self):
        return f"<ShortID: {format_short_id(self)}>"

    def __str__(self):
        return format_short_id(self)

    @classmethod
    def from_components(cls, block_height: int, tx_pos_in_block: int, output_index: int) -> 'ShortID':
        packed = (block_height.to_bytes(3, byteorder='big')
                  + tx_pos_in_block.to_bytes(3, byteorder='big')
                  + output_index.to_bytes(2, byteorder='big'))
        return ShortID(packed)

    @classmethod
    def from_str(cls, scid: str) -> 'ShortID':
        """Parses a formatted scid str, e.g. '643920x356x0'."""
        parts = scid.split("x")
        if len(parts) != 3:
            raise ValueError(f"failed to parse ShortID: {scid!r}")
        try:
            numbers = [int(p) for p in parts]
        except ValueError:
            raise ValueError(f"failed to parse ShortID: {scid!r}") from None
        return ShortID.from_components(*numbers)

    @classmethod
    def normalize(cls, data: Union[None, str, bytes, 'ShortID']) -> Optional['ShortID']:
        """Coerce a 16-char hex str or 8 raw bytes to ShortID; None/ShortID pass through."""
        if data is None or isinstance(data, ShortID):
            return data
        if isinstance(data, str):
            assert len(data) == 16
            return ShortID.fromhex(data)
        if isinstance(data, (bytes, bytearray)):
            assert len(data) == 8
            return ShortID(data)

    @property
    def block_height(self) -> int:
        return int.from_bytes(self[0:3], byteorder='big')

    @property
    def txpos(self) -> int:
        return int.from_bytes(self[3:6], byteorder='big')

    @property
    def output_index(self) -> int:
        return int.from_bytes(self[6:8], byteorder='big')

1296

1297
def format_short_id(short_channel_id: Optional[bytes]):
    """Render an 8-byte short channel id as 'heightxtxposxoutput_index'."""
    if not short_channel_id:
        return _('Not yet available')
    fields = (short_channel_id[:3], short_channel_id[3:6], short_channel_id[6:])
    return 'x'.join(str(int.from_bytes(b, 'big')) for b in fields)
1303

1304

1305
def make_aiohttp_proxy_connector(proxy: 'ProxySettings', ssl_context: Optional[ssl.SSLContext] = None) -> ProxyConnector:
    """Build an aiohttp_socks ProxyConnector from our proxy settings."""
    if proxy.mode == 'socks5':
        proxy_type = ProxyType.SOCKS5
    else:
        proxy_type = ProxyType.SOCKS4
    return ProxyConnector(
        proxy_type=proxy_type,
        host=proxy.host,
        port=int(proxy.port),
        username=proxy.user,
        password=proxy.password,
        rdns=True,  # needed to prevent DNS leaks over proxy
        ssl=ssl_context,
    )
1315

1316

1317
def make_aiohttp_session(proxy: Optional['ProxySettings'], headers=None, timeout=None):
    """Create an aiohttp ClientSession using our CA bundle, optionally via a SOCKS proxy.

    `timeout` may be None (default 45s), a number of seconds, or an
    aiohttp.ClientTimeout instance.
    """
    if headers is None:
        headers = {'User-Agent': 'Electrum'}
    if timeout is None:
        # The default timeout is high intentionally.
        # DNS on some systems can be really slow, see e.g. #5337
        timeout = aiohttp.ClientTimeout(total=45)
    elif isinstance(timeout, (int, float)):
        timeout = aiohttp.ClientTimeout(total=timeout)
    # verify TLS against the bundled CA file
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)

    if proxy and proxy.enabled:
        connector = make_aiohttp_proxy_connector(proxy, ssl_context)
    else:
        connector = aiohttp.TCPConnector(ssl=ssl_context)

    return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector)
1334

1335

1336
class OldTaskGroup(aiorpcx.TaskGroup):
    """Automatically raises exceptions on join; as in aiorpcx prior to version 0.20.
    That is, when using TaskGroup as a context manager, if any task encounters an exception,
    we would like that exception to be re-raised (propagated out). For the wait=all case,
    the OldTaskGroup class is emulating the following code-snippet:
    ```
    async with TaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())

        async for task in group:
            if not task.cancelled():
                task.result()
    ```
    So instead of the above, one can just write:
    ```
    async with OldTaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())
    ```
    # TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
    """
    async def join(self):
        if self._wait is all:
            exc = False
            try:
                async for task in self:
                    if not task.cancelled():
                        task.result()  # re-raises the task's exception, if any
            except BaseException:  # including asyncio.CancelledError
                exc = True
                raise
            finally:
                if exc:
                    # a task failed (or join got cancelled): tear down the rest
                    await self.cancel_remaining()
                await super().join()
        else:
            # wait=any/object semantics: propagate the completed task's exception
            await super().join()
            if self.completed:
                self.completed.result()
1376

1377
# We monkey-patch aiorpcx TimeoutAfter (used by timeout_after and ignore_after API),
1378
# to fix a timing issue present in asyncio as a whole re timing out tasks.
1379
# To see the issue we are trying to fix, consider example:
1380
#     async def outer_task():
1381
#         async with timeout_after(0.1):
1382
#             await inner_task()
1383
# When the 0.1 sec timeout expires, inner_task will get cancelled by timeout_after (=internal cancellation).
1384
# If around the same time (in terms of event loop iterations) another coroutine
1385
# cancels outer_task (=external cancellation), there will be a race.
1386
# Both cancellations work by propagating a CancelledError out to timeout_after, which then
1387
# needs to decide (in TimeoutAfter.__aexit__) whether it's due to an internal or external cancellation.
1388
# AFAICT asyncio provides no reliable way of distinguishing between the two.
1389
# This patch tries to always give priority to external cancellations.
1390
# see https://github.com/kyuupichan/aiorpcX/issues/44
1391
# see https://github.com/aio-libs/async-timeout/issues/229
1392
# see https://bugs.python.org/issue42130 and https://bugs.python.org/issue45098
1393
# TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
1394
def _aiorpcx_monkeypatched_set_new_deadline(task, deadline):
    # Replacement for aiorpcx.curio._set_new_deadline: wraps task.cancel so we
    # can tell apart internal (timeout) vs external cancellation (see comment above).
    def timeout_task():
        task._orig_cancel()
        # only mark as timed-out if nobody cancelled us externally first
        task._timed_out = None if getattr(task, "_externally_cancelled", False) else deadline
    def mycancel(*args, **kwargs):
        # external cancellation wins over the timeout
        task._orig_cancel(*args, **kwargs)
        task._externally_cancelled = True
        task._timed_out = None
    if not hasattr(task, "_orig_cancel"):
        task._orig_cancel = task.cancel
        task.cancel = mycancel
    task._deadline_handle = task._loop.call_at(deadline, timeout_task)


def _aiorpcx_monkeypatched_set_task_deadline(task, deadline):
    # Delegate to the original, then reset the external-cancellation marker.
    ret = _aiorpcx_orig_set_task_deadline(task, deadline)
    task._externally_cancelled = None
    return ret


def _aiorpcx_monkeypatched_unset_task_deadline(task):
    # Restore the un-wrapped cancel before delegating to the original.
    if hasattr(task, "_orig_cancel"):
        task.cancel = task._orig_cancel
        del task._orig_cancel
    return _aiorpcx_orig_unset_task_deadline(task)


# keep references to the originals, then install our patched versions
_aiorpcx_orig_set_task_deadline    = aiorpcx.curio._set_task_deadline
_aiorpcx_orig_unset_task_deadline  = aiorpcx.curio._unset_task_deadline

aiorpcx.curio._set_new_deadline    = _aiorpcx_monkeypatched_set_new_deadline
aiorpcx.curio._set_task_deadline   = _aiorpcx_monkeypatched_set_task_deadline
aiorpcx.curio._unset_task_deadline = _aiorpcx_monkeypatched_unset_task_deadline
1427

1428

1429
async def wait_for2(fut: Awaitable, timeout: Union[int, float, None]):
    """Replacement for asyncio.wait_for,
     due to bugs: https://bugs.python.org/issue42130 and https://github.com/python/cpython/issues/86296 ,
     which are only fixed in python 3.12+.
     """
    if sys.version_info[:3] < (3, 12):
        # emulate wait_for using a timeout context, avoiding the cancellation race
        async with async_timeout(timeout):
            return await asyncio.ensure_future(fut, loop=get_running_loop())
    return await asyncio.wait_for(fut, timeout)
1439

1440

1441
if hasattr(asyncio, 'timeout'):  # python 3.11+
    async_timeout = asyncio.timeout
else:
    # Pre-3.11 fallback: emulate asyncio.timeout on top of aiorpcx's
    # TimeoutAfter, translating aiorpcx exceptions to asyncio equivalents.
    class TimeoutAfterAsynciolike(aiorpcx.curio.TimeoutAfter):
        async def __aexit__(self, exc_type, exc_value, tb):
            try:
                await super().__aexit__(exc_type, exc_value, tb)
            except (aiorpcx.TaskTimeout, aiorpcx.UncaughtTimeoutError):
                raise asyncio.TimeoutError from None
            except aiorpcx.TimeoutCancellationError:
                raise asyncio.CancelledError from None

    def async_timeout(delay: Union[int, float, None]):
        # delay=None means "no timeout", matching asyncio.timeout(None)
        if delay is None:
            return nullcontext()
        return TimeoutAfterAsynciolike(delay)
1457

1458

1459
class NetworkJobOnDefaultServer(Logger, ABC):
    """An abstract base class for a job that runs on the main network
    interface. Every time the main interface changes, the job is
    restarted, and some of its internals are reset.
    """
    def __init__(self, network: 'Network'):
        Logger.__init__(self)
        self.network = network
        self.interface = None  # type: Interface
        self._restart_lock = asyncio.Lock()
        # Ensure fairness between NetworkJobs. e.g. if multiple wallets
        # are open, a large wallet's Synchronizer should not starve the small wallets:
        self._network_request_semaphore = asyncio.Semaphore(100)

        self._reset()
        # every time the main interface changes, restart:
        register_callback(self._restart, ['default_server_changed'])
        # also schedule a one-off restart now, as there might already be a main interface:
        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)

    def _reset(self):
        """Initialise fields. Called every time the underlying
        server connection changes.
        """
        self.taskgroup = OldTaskGroup()
        self.reset_request_counters()

    async def _start(self, interface: 'Interface'):
        """Bind this job to `interface` and spawn its tasks on the interface's taskgroup."""
        self.logger.debug(f"starting. interface.server={repr(str(interface.server))}")
        self.interface = interface

        # capture the current taskgroup, so _run_tasks can detect a concurrent restart
        taskgroup = self.taskgroup
        async def run_tasks_wrapper():
            self.logger.debug(f"starting taskgroup ({hex(id(taskgroup))}).")
            try:
                await self._run_tasks(taskgroup=taskgroup)
            except Exception as e:
                self.logger.error(f"taskgroup died ({hex(id(taskgroup))}). exc={e!r}")
                raise
            finally:
                self.logger.debug(f"taskgroup stopped ({hex(id(taskgroup))}).")
        await interface.taskgroup.spawn(run_tasks_wrapper)

    @abstractmethod
    async def _run_tasks(self, *, taskgroup: OldTaskGroup) -> None:
        """Start tasks in taskgroup. Called every time the underlying
        server connection changes.
        """
        # If self.taskgroup changed, don't start tasks. This can happen if we have
        # been restarted *just now*, i.e. after the _run_tasks coroutine object was created.
        if taskgroup != self.taskgroup:
            raise asyncio.CancelledError()

    async def stop(self, *, full_shutdown: bool = True):
        """Cancel running tasks; on full shutdown, also stop listening for restarts."""
        self.logger.debug(f"stopping. {full_shutdown=}")
        if full_shutdown:
            unregister_callback(self._restart)
        await self.taskgroup.cancel_remaining()

    @log_exceptions
    async def _restart(self, *args):
        # 'default_server_changed' callback: stop, reset, and start on the new interface
        interface = self.network.interface
        if interface is None:
            return  # we should get called again soon

        async with self._restart_lock:
            await self.stop(full_shutdown=False)
            self._reset()
            await self._start(interface)

    def reset_request_counters(self):
        # counters for network requests made via this job
        self._requests_sent = 0
        self._requests_answered = 0

    def num_requests_sent_and_answered(self) -> Tuple[int, int]:
        return self._requests_sent, self._requests_answered

    @property
    def session(self):
        # RPC session of the currently bound interface; must be connected
        s = self.interface.session
        assert s is not None
        return s
1541

1542

1543
async def detect_tor_socks_proxy() -> Optional[Tuple[str, int]]:
    """Probe localhost ports commonly used by Tor, concurrently.

    Returns (host, port) of the first port that answers like a Tor SOCKS
    proxy, or None if none do.
    """
    # Probable ports for Tor to listen at
    candidates = [
        ("127.0.0.1", 9050),
        ("127.0.0.1", 9051),
        ("127.0.0.1", 9150),
    ]

    proxy_addr = None
    async def test_net_addr(net_addr):
        is_tor = await is_tor_socks_port(*net_addr)
        # set result, and cancel remaining probes
        if is_tor:
            nonlocal proxy_addr
            proxy_addr = net_addr
            await group.cancel_remaining()

    # probe all candidates in parallel; first positive wins
    async with OldTaskGroup() as group:
        for net_addr in candidates:
            await group.spawn(test_net_addr(net_addr))
    return proxy_addr
1564

1565

1566
@log_exceptions
async def is_tor_socks_port(host: str, port: int) -> bool:
    """Return whether (host, port) answers like a Tor SOCKS port (10s timeout)."""
    # mimic "tor-resolve 0.0.0.0".
    # see https://github.com/spesmilo/electrum/issues/7317#issuecomment-1369281075
    # > this is a socks5 handshake, followed by a socks RESOLVE request as defined in
    # > [tor's socks extension spec](https://github.com/torproject/torspec/blob/7116c9cdaba248aae07a3f1d0e15d9dd102f62c5/socks-extensions.txt#L63),
    # > resolving 0.0.0.0, which being an IP, tor resolves itself without needing to ask a relay.
    writer = None
    try:
        async with async_timeout(10):
            reader, writer = await asyncio.open_connection(host, port)
            writer.write(b'\x05\x01\x00\x05\xf0\x00\x03\x070.0.0.0\x00\x00')
            await writer.drain()
            data = await reader.read(1024)
            if data == b'\x05\x00\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00':
                # the reply Tor sends to the RESOLVE request above
                return True
            return False
    except (OSError, asyncio.TimeoutError):
        return False
    finally:
        if writer:
            writer.close()
1588

1589

1590
AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP = False  # used by unit tests

_asyncio_event_loop = None  # type: Optional[asyncio.AbstractEventLoop]
def get_asyncio_loop() -> asyncio.AbstractEventLoop:
    """Return the global asyncio event loop we use.

    If electrum is embedded as a library and the flag above is set, fall back
    to the loop running in the current thread (if any).
    """
    global_loop = _asyncio_event_loop
    if global_loop:
        return global_loop
    if AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP:
        running = get_running_loop()
        if running:
            return running
    raise Exception("event loop not created yet")
×
1601

1602

1603
def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
                                           asyncio.Future,
                                           threading.Thread]:
    """Create the global asyncio loop and run it on a dedicated 'EventLoop' thread.

    Returns (loop, stopping_fut, loop_thread).  Setting a result/exception on
    stopping_fut makes the loop finish and the thread exit.
    Raises if the global loop already exists.
    """
    global _asyncio_event_loop
    if _asyncio_event_loop is not None:
        raise Exception("there is already a running event loop")

    # asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710)
    # We set a custom event loop policy purely to be compatible with code that
    # relies on asyncio.get_event_loop().
    # - in python 3.8-3.9, asyncio.Event.__init__, asyncio.Lock.__init__,
    #   and similar, calls get_event_loop. see https://github.com/python/cpython/pull/23420
    class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
        def get_event_loop(self):
            # In case electrum is being used as a library, there might be other
            # event loops in use besides ours. To minimise interfering with those,
            # if there is a loop running in the current thread, return that:
            running_loop = get_running_loop()
            if running_loop is not None:
                return running_loop
            # Otherwise, return our global loop:
            return get_asyncio_loop()
    asyncio.set_event_loop_policy(MyEventLoopPolicy())

    loop = asyncio.new_event_loop()
    _asyncio_event_loop = loop

    def on_exception(loop, context):
        """Suppress spurious messages it appears we cannot control."""
        SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
                                            'SSL error in data received')
        message = context.get('message')
        if message and SUPPRESS_MESSAGE_REGEX.match(message):
            return
        loop.default_exception_handler(context)

    def run_event_loop():
        # runs on loop_thread; blocks until stopping_fut is resolved
        try:
            loop.run_until_complete(stopping_fut)
        finally:
            # clean-up
            global _asyncio_event_loop
            _asyncio_event_loop = None

    loop.set_exception_handler(on_exception)
    _set_custom_task_factory(loop)
    # loop.set_debug(True)
    stopping_fut = loop.create_future()
    loop_thread = threading.Thread(
        target=run_event_loop,
        name='EventLoop',
    )
    loop_thread.start()
    # Wait until the loop actually starts.
    # On a slow PC, or with a debugger attached, this can take a few dozens of ms,
    # and if we returned without a running loop, weird things can happen...
    t0 = time.monotonic()
    while not loop.is_running():
        time.sleep(0.01)
        if time.monotonic() - t0 > 5:
            raise Exception("been waiting for 5 seconds but asyncio loop would not start!")
    return loop, stopping_fut, loop_thread
×
1665

1666

1667
_running_asyncio_tasks = set()  # type: Set[asyncio.Future]
def _set_custom_task_factory(loop: asyncio.AbstractEventLoop):
    """Install a task factory on `loop` that keeps strong refs to pending tasks.

    asyncio itself only holds weak references to tasks, so the garbage
    collector can destroy a task mid-execution.  We therefore store every
    created task in a module-level set until it completes.

    Without this, a lot of APIs are basically Heisenbug-generators... e.g.:
    - "asyncio.create_task"
    - "loop.create_task"
    - "asyncio.ensure_future"
    - "asyncio.run_coroutine_threadsafe"

    related:
        - https://bugs.python.org/issue44665
        - https://github.com/python/cpython/issues/88831
        - https://github.com/python/cpython/issues/91887
        - https://textual.textualize.io/blog/2023/02/11/the-heisenbug-lurking-in-your-async-code/
        - https://github.com/python/cpython/issues/91887#issuecomment-1434816045
        - "Task was destroyed but it is pending!"
    """
    inner_factory = loop.get_task_factory()

    def tracking_factory(target_loop, coro, **kwargs):
        # Delegate to any previously installed factory; otherwise build a
        # plain asyncio.Task ourselves.
        if inner_factory is None:
            new_task = asyncio.Task(coro, loop=target_loop, **kwargs)
        else:
            new_task = inner_factory(target_loop, coro, **kwargs)
        # Hold a strong reference until the task is done, then drop it.
        _running_asyncio_tasks.add(new_task)
        new_task.add_done_callback(_running_asyncio_tasks.discard)
        return new_task

    loop.set_task_factory(tracking_factory)
4✔
1701

1702

1703
def run_sync_function_on_asyncio_thread(func: Callable, *, block: bool) -> None:
    """Run a non-async fn on the asyncio thread. Can be called from any thread.

    If the current thread is already the asyncio thread, func is guaranteed
    to have been completed when this method returns.

    For any other thread, we only wait for completion if `block` is True.
    """
    assert not asyncio.iscoroutinefunction(func), "func must be a non-async function"
    asyncio_loop = get_asyncio_loop()
    if get_running_loop() == asyncio_loop:  # we are running on the asyncio thread
        func()
    else:  # non-asyncio thread
        # wrap func in a coroutine so it can be scheduled onto the loop
        async def wrapper():
            return func()
        fut = asyncio.run_coroutine_threadsafe(wrapper(), loop=asyncio_loop)
        if block:
            # blocks until func has run (re-raises any exception it raised)
            fut.result()
        else:
            # add explicit logging of exceptions, otherwise they might get lost
            # tb1 captures the caller-side stack now, as it is gone by the time on_done runs
            tb1 = traceback.format_stack()[:-1]
            tb1_str = "".join(tb1)
            def on_done(fut_: concurrent.futures.Future):
                assert fut_.done()
                if fut_.cancelled():
                    _logger.debug(f"func cancelled. {func=}.")
                elif exc := fut_.exception():
                    # note: We explicitly log the first part of the traceback, tb1_str.
                    #       The second part gets logged by setting "exc_info".
                    _logger.error(
                        f"func errored. {func=}. {exc=}"
                        f"\n{tb1_str}", exc_info=exc)
            fut.add_done_callback(on_done)
×
1736

1737

1738
class OrderedDictWithIndex(OrderedDict):
    """An OrderedDict that keeps track of the positions of keys.

    Note: very inefficient to modify contents, except to add new items.
    """

    def __init__(self):
        super().__init__()
        self._key_to_pos = {}  # key -> 0-based insertion position
        self._pos_to_key = {}  # 0-based insertion position -> key

    def _recalc_index(self):
        # Rebuild both index maps in a single pass over the keys.
        key_to_pos = {}
        pos_to_key = {}
        for pos, key in enumerate(self.keys()):
            key_to_pos[key] = pos
            pos_to_key[pos] = key
        self._key_to_pos = key_to_pos
        self._pos_to_key = pos_to_key

    def pos_from_key(self, key):
        """Return the position (0-based index) of `key`."""
        return self._key_to_pos[key]

    def value_from_pos(self, pos):
        """Return the value stored at position `pos`."""
        key = self._pos_to_key[pos]
        return self[key]

    # Every mutating operation below simply defers to OrderedDict and then
    # rebuilds the index maps (O(n)).

    def popitem(self, *args, **kwargs):
        ret = super().popitem(*args, **kwargs)
        self._recalc_index()
        return ret

    def move_to_end(self, *args, **kwargs):
        ret = super().move_to_end(*args, **kwargs)
        self._recalc_index()
        return ret

    def clear(self):
        ret = super().clear()
        self._recalc_index()
        return ret

    def pop(self, *args, **kwargs):
        ret = super().pop(*args, **kwargs)
        self._recalc_index()
        return ret

    def update(self, *args, **kwargs):
        ret = super().update(*args, **kwargs)
        self._recalc_index()
        return ret

    def __delitem__(self, *args, **kwargs):
        ret = super().__delitem__(*args, **kwargs)
        self._recalc_index()
        return ret

    def __setitem__(self, key, *args, **kwargs):
        # Appending a new key is O(1); overwriting an existing key keeps its position.
        is_new_key = key not in self
        ret = super().__setitem__(key, *args, **kwargs)
        if is_new_key:
            pos = len(self) - 1
            self._key_to_pos[key] = pos
            self._pos_to_key[pos] = key
        return ret
×
1798

1799

1800
def multisig_type(wallet_type):
    """If wallet_type is mofn multi-sig, return [m, n],
    otherwise return None."""
    if not wallet_type:
        return None
    m = re.match(r'(\d+)of(\d+)', wallet_type)
    if m is None:
        return None
    return [int(m.group(1)), int(m.group(2))]
4✔
1809

1810

1811
def is_ip_address(x: Union[str, bytes]) -> bool:
    """Return True iff `x` parses as an IPv4 or IPv6 address literal."""
    if isinstance(x, bytes):
        x = x.decode("utf-8")
    try:
        ipaddress.ip_address(x)
    except ValueError:
        return False
    return True
4✔
1819

1820

1821
def is_localhost(host: str) -> bool:
    """Return True iff `host` is "localhost" or a loopback IP literal.

    Hostnames other than "localhost"/"localhost." (even if they resolve to
    loopback) return False.
    """
    if str(host) in ('localhost', 'localhost.',):
        return True
    if not host:
        return False  # guard: empty string would crash on host[0] below
    if host[0] == '[' and host[-1] == ']':  # IPv6
        host = host[1:-1]
    try:
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
        return ip_addr.is_loopback
    except ValueError:
        pass  # not an IP
    return False
4✔
1832

1833

1834
def is_private_netaddress(host: str) -> bool:
    """Return True iff `host` is localhost or a private-range IP literal.

    Hostnames that are not IP literals (other than "localhost") return False.
    """
    if not host:
        return False  # guard: empty string would crash on host[0] below
    if is_localhost(host):
        return True
    if host[0] == '[' and host[-1] == ']':  # IPv6
        host = host[1:-1]
    try:
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
        return ip_addr.is_private
    except ValueError:
        pass  # not an IP
    return False
4✔
1845

1846

1847
def list_enabled_bits(x: int) -> Sequence[int]:
    """e.g. 77 (0b1001101) --> (0, 2, 3, 6)"""
    # walk the binary string from the least-significant bit upwards,
    # collecting the positions of the set bits
    bits = bin(x)[2:]
    positions = []
    for pos, ch in enumerate(reversed(bits)):
        if ch == '1':
            positions.append(pos)
    return tuple(positions)
4✔
1852

1853

1854
async def resolve_dns_srv(host: str):
    """Resolve SRV records for `host`; return a list of {'host', 'port'} dicts,
    best candidates first (lowest priority; higher weight breaks ties).
    """
    # FIXME this method is not using the network proxy. (although the proxy might not support UDP?)
    answers = await dns.asyncresolver.resolve(host, 'SRV')
    # priority: prefer lower
    # weight: tie breaker; prefer higher
    ordered = sorted(answers, key=lambda rec: (rec.priority, -rec.weight))
    return [{'host': str(rec.target), 'port': rec.port} for rec in ordered]
×
1867

1868

1869
def randrange(bound: int) -> int:
    """Return a random integer k such that 1 <= k < bound, uniformly
    distributed across that range.
    This is guaranteed to be cryptographically strong.
    """
    # secrets.randbelow(bound - 1) yields 0 <= r < bound - 1;
    # shifting by one maps that onto 1 <= k < bound.
    return 1 + secrets.randbelow(bound - 1)
4✔
1877

1878

1879
class CallbackManager(Logger):
    """Dispatches named events to registered callbacks.

    Callbacks may be registered/unregistered from any thread; they are always
    executed on the asyncio event-loop thread.
    """
    # callbacks set by the GUI or any thread
    # guarantee: the callbacks will always get triggered from the asyncio thread.

    def __init__(self):
        Logger.__init__(self)
        self.callback_lock = threading.Lock()
        self.callbacks = defaultdict(list)      # note: needs self.callback_lock

    def register_callback(self, func, events):
        """Subscribe `func` to each event name in `events`."""
        with self.callback_lock:
            for event in events:
                self.callbacks[event].append(func)

    def unregister_callback(self, callback):
        """Remove `callback` from every event it was registered for."""
        with self.callback_lock:
            for callbacks in self.callbacks.values():
                if callback in callbacks:
                    callbacks.remove(callback)

    def trigger_callback(self, event, *args):
        """Trigger a callback with given arguments.
        Can be called from any thread. The callback itself will get scheduled
        on the event loop.
        """
        loop = get_asyncio_loop()
        assert loop.is_running(), "event loop not running"
        with self.callback_lock:
            callbacks = self.callbacks[event][:]  # copy, so we iterate outside the lock
        for callback in callbacks:
            if asyncio.iscoroutinefunction(callback):  # async cb
                fut = asyncio.run_coroutine_threadsafe(callback(*args), loop)
                # log exceptions/cancellations explicitly, otherwise they might get lost
                def on_done(fut_: concurrent.futures.Future):
                    assert fut_.done()
                    if fut_.cancelled():
                        self.logger.debug(f"cb cancelled. {event=}.")
                    elif exc := fut_.exception():
                        self.logger.error(f"cb errored. {event=}. {exc=}", exc_info=exc)
                fut.add_done_callback(on_done)
            else:  # non-async cb
                run_sync_function_on_asyncio_thread(partial(callback, *args), block=False)
4✔
1920

1921

1922
# Singleton callback manager; the module-level aliases below are the
# convenience API used throughout the codebase.
callback_mgr = CallbackManager()
trigger_callback = callback_mgr.trigger_callback
register_callback = callback_mgr.register_callback
unregister_callback = callback_mgr.unregister_callback
# Maps "module.ClassName" -> set of "on_event_*" method names collected by the
# @event_listener decorator (consumed by EventListener._list_callbacks).
_event_listeners = defaultdict(set)  # type: Dict[str, Set[str]]
4✔
1927

1928

1929
class EventListener:
    """Use as a mixin for a class that has methods to be triggered on events.
    - Methods that receive the callbacks should be named "on_event_*" and decorated with @event_listener.
    - register_callbacks() should be called exactly once per instance of EventListener, e.g. in __init__
    - unregister_callbacks() should be called at least once, e.g. when the instance is destroyed
    """

    def _list_callbacks(self):
        # Walk the full MRO so decorated methods inherited from base classes
        # are found too; yields (event_name, bound_method) pairs.
        for c in self.__class__.__mro__:
            classpath = f"{c.__module__}.{c.__name__}"
            for method_name in _event_listeners[classpath]:
                method = getattr(self, method_name)
                assert callable(method)
                assert method_name.startswith('on_event_')
                # strip the "on_event_" prefix to get the event name
                yield method_name[len('on_event_'):], method

    def register_callbacks(self):
        """Subscribe all @event_listener methods of this instance."""
        for name, method in self._list_callbacks():
            #_logger.debug(f'registering callback {method}')
            register_callback(method, [name])

    def unregister_callbacks(self):
        """Unsubscribe all @event_listener methods of this instance."""
        for name, method in self._list_callbacks():
            #_logger.debug(f'unregistering callback {method}')
            unregister_callback(method)
4✔
1954

1955

1956
def event_listener(func):
    """To be used in subclasses of EventListener only. (how to enforce this programmatically?)"""
    # rsplit instead of a plain split('.') so that methods of nested or
    # function-local classes (qualnames like "Outer.Inner.on_event_x" or
    # "f.<locals>.C.on_event_x") don't raise ValueError; only the innermost
    # class name is kept, matching EventListener._list_callbacks which keys
    # the registry on c.__name__.
    *_, classname, method_name = func.__qualname__.rsplit('.', maxsplit=2)
    assert method_name.startswith('on_event_')
    classpath = f"{func.__module__}.{classname}"
    _event_listeners[classpath].add(method_name)
    return func
4✔
1963

1964

1965
_NetAddrType = TypeVar("_NetAddrType")
# requirements for _NetAddrType:
# - reasonable __hash__() implementation (e.g. based on host/port of remote endpoint)

class NetworkRetryManager(Generic[_NetAddrType]):
    """Truncated Exponential Backoff for network connections."""

    def __init__(
            self, *,
            max_retry_delay_normal: float,
            init_retry_delay_normal: float,
            max_retry_delay_urgent: Optional[float] = None,
            init_retry_delay_urgent: Optional[float] = None,
    ):
        # per-address state: (timestamp of last attempt, attempt counter)
        self._last_tried_addr = {}  # type: Dict[_NetAddrType, Tuple[float, int]]  # (unix ts, num_attempts)

        # note: these all use "seconds" as unit
        # the "urgent" delays default to the "normal" ones if not given:
        if max_retry_delay_urgent is None:
            max_retry_delay_urgent = max_retry_delay_normal
        if init_retry_delay_urgent is None:
            init_retry_delay_urgent = init_retry_delay_normal
        self._max_retry_delay_normal = max_retry_delay_normal
        self._init_retry_delay_normal = init_retry_delay_normal
        self._max_retry_delay_urgent = max_retry_delay_urgent
        self._init_retry_delay_urgent = init_retry_delay_urgent

    def _trying_addr_now(self, addr: _NetAddrType) -> None:
        """Record that a connection attempt to `addr` is starting now."""
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        # we add up to 1 second of noise to the time, so that clients are less likely
        # to get synchronised and bombard the remote in connection waves:
        cur_time = time.time() + random.random()
        self._last_tried_addr[addr] = cur_time, num_attempts + 1

    def _on_connection_successfully_established(self, addr: _NetAddrType) -> None:
        # success resets the attempt counter, so future backoff starts small again
        self._last_tried_addr[addr] = time.time(), 0

    def _can_retry_addr(self, addr: _NetAddrType, *,
                        now: Optional[float] = None, urgent: bool = False) -> bool:
        """Return whether enough time has passed to retry connecting to `addr`."""
        if now is None:
            now = time.time()
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        if urgent:
            max_delay = self._max_retry_delay_urgent
            init_delay = self._init_retry_delay_urgent
        else:
            max_delay = self._max_retry_delay_normal
            init_delay = self._init_retry_delay_normal
        delay = self.__calc_delay(multiplier=init_delay, max_delay=max_delay, num_attempts=num_attempts)
        next_time = last_time + delay
        return next_time < now

    @classmethod
    def __calc_delay(cls, *, multiplier: float, max_delay: float,
                     num_attempts: int) -> float:
        """Exponential backoff delay, truncated at `max_delay`."""
        num_attempts = min(num_attempts, 100_000)  # cap exponent; huge 2**n still maps to max_delay
        try:
            res = multiplier * 2 ** num_attempts
        except OverflowError:
            return max_delay
        return max(0, min(max_delay, res))

    def _clear_addr_retry_times(self) -> None:
        # forget all backoff state
        self._last_tried_addr.clear()
4✔
2028

2029

2030
class ESocksProxy(aiorpcx.SOCKSProxy):
    """aiorpcx SOCKSProxy extended with an asyncio open_connection()-style API."""
    # note: proxy will not leak DNS as create_connection()
    # sets (local DNS) resolve=False by default

    async def open_connection(self, host=None, port=None, **kwargs):
        """Like asyncio.open_connection(), but tunnelled through this proxy.
        Returns a (StreamReader, StreamWriter) pair.
        """
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader(loop=loop)
        protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
        transport, _ = await self.create_connection(
            lambda: protocol, host, port, **kwargs)
        writer = asyncio.StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    @classmethod
    def from_network_settings(cls, network: Optional['Network']) -> Optional['ESocksProxy']:
        """Build an ESocksProxy from the Network's proxy settings.

        Returns None if no network/proxy is configured or the proxy is disabled.
        Raises NotImplementedError for proxy modes other than socks4/socks5.
        """
        if not network or not network.proxy or not network.proxy.enabled:
            return None
        proxy = network.proxy
        username, pw = proxy.user, proxy.password
        if not username or not pw:
            # is_proxy_tor is tri-state; None indicates it is still probing the proxy to test for TOR
            if network.is_proxy_tor:
                # NOTE(review): random credentials presumably make Tor isolate this
                # client's circuits from other users of the SOCKS port — confirm
                auth = aiorpcx.socks.SOCKSRandomAuth()
            else:
                auth = None
        else:
            auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
        addr = aiorpcx.NetAddress(proxy.host, proxy.port)
        if proxy.mode == "socks4":
            ret = cls(addr, aiorpcx.socks.SOCKS4a, auth)
        elif proxy.mode == "socks5":
            ret = cls(addr, aiorpcx.socks.SOCKS5, auth)
        else:
            raise NotImplementedError  # http proxy not available with aiorpcx
        return ret
×
2065

2066

2067
class JsonRPCError(Exception):
    """Raised when a JSON-RPC server reply contains an "error" object."""

    class Codes(enum.IntEnum):
        # application-specific error codes
        USERFACING = 1
        INTERNAL = 2

    def __init__(self, *, code: int, message: str, data: Optional[dict] = None):
        super().__init__()
        self.code = code        # numeric error code from the server
        self.message = message  # human-readable error message
        self.data = data        # optional extra payload, if the server sent one
×
2079

2080

2081
class JsonRPCClient:
    """Minimal JSON-RPC 2.0 client on top of a caller-owned aiohttp session."""

    def __init__(self, session: aiohttp.ClientSession, url: str):
        self.session = session
        self.url = url
        self._id = 0  # monotonically increasing request id

    async def request(self, endpoint, *args):
        """Send request to server, parse and return result.
        note: parsing code is naive, the server is assumed to be well-behaved.
              Up to the caller to handle exceptions, including those arising from parsing errors.
        """
        self._id += 1
        # Serialize the whole envelope with json.dumps so that special
        # characters in `endpoint` or `args` are escaped properly (the old
        # "%s"-template produced malformed JSON for e.g. quotes in the
        # method name).
        data = json.dumps({
            "jsonrpc": "2.0",
            "id": "%d" % self._id,
            "method": endpoint,
            "params": list(args),
        })
        async with self.session.post(self.url, data=data) as resp:
            if resp.status == 200:
                r = await resp.json()
                result = r.get('result')
                error = r.get('error')
                if error:
                    raise JsonRPCError(code=error["code"], message=error["message"], data=error.get("data"))
                else:
                    return result
            else:
                text = await resp.text()
                return 'Error: ' + str(text)

    def add_method(self, endpoint):
        """Expose `endpoint` as an attribute: client.foo(...) == request('foo', ...)."""
        async def coro(*args):
            return await self.request(endpoint, *args)
        setattr(self, endpoint, coro)
×
2113

2114

2115
T = TypeVar('T')

def random_shuffled_copy(x: Iterable[T]) -> List[T]:
    """Returns a shuffled copy of the input."""
    # materialize first, so the argument (possibly a one-shot iterator)
    # is never mutated and a fresh list is always returned
    shuffled = list(x)
    random.shuffle(shuffled)
    return shuffled
2122

2123

2124
def test_read_write_permissions(path) -> None:
    """Check that `path` is readable (if it exists) and that its directory
    allows creating/reading/writing a sibling file.  Raises IOError on any
    failure; returns None on success.
    """
    # note: There might already be a file at 'path'.
    #       Make sure we do NOT overwrite/corrupt that!
    temp_path = "%s.tmptest.%s" % (path, os.getpid())
    echo = "fs r/w test"
    try:
        # test READ permissions for actual path
        if os.path.exists(path):
            with open(path, "rb") as f:
                f.read(1)  # read 1 byte
        # test R/W sanity for "similar" path
        with open(temp_path, "w", encoding='utf-8') as f:
            f.write(echo)
        with open(temp_path, "r", encoding='utf-8') as f:
            echo2 = f.read()
        os.remove(temp_path)
    except Exception as e:
        # don't leave the probe file behind on failure (best-effort cleanup;
        # a failing remove here must not mask the original error)
        try:
            os.remove(temp_path)
        except OSError:
            pass
        raise IOError(e) from e
    if echo != echo2:
        raise IOError('echo sanity-check failed')
×
2144

2145

2146
class classproperty(property):
    """~read-only class-level @property
    from https://stackoverflow.com/a/13624858 by denis-ryzhkov
    """
    def __get__(self, owner_self, owner_cls):
        # always invoke the getter with the *class*, regardless of whether the
        # attribute was accessed via an instance or via the class itself
        return self.fget(owner_cls)
4✔
2152

2153

2154
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
    """Returns the asyncio event loop that is *running in this thread*, if any."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # no loop is running in the current thread
        return None
    return loop
×
2160

2161

2162
def error_text_str_to_safe_str(err: str, *, max_len: Optional[int] = 500) -> str:
    """Converts an untrusted error string to a sane printable ascii str.
    Never raises.
    """
    # force into ascii bytes first (escaping anything that doesn't fit),
    # then reuse the bytes-based sanitizer; truncation happens once, at the end
    ascii_bytes = err.encode("ascii", errors='backslashreplace')
    safe = error_text_bytes_to_safe_str(ascii_bytes, max_len=None)
    return truncate_text(safe, max_len=max_len)
4✔
2170

2171

2172
def error_text_bytes_to_safe_str(err: bytes, *, max_len: Optional[int] = 500) -> str:
    """Converts an untrusted error bytes text to a sane printable ascii str.
    Never raises.

    Note that naive ascii conversion would be insufficient. Fun stuff:
    >>> b = b"my_long_prefix_blabla" + 21 * b"\x08" + b"malicious_stuff"
    >>> s = b.decode("ascii")
    >>> print(s)
    malicious_stuffblabla
    """
    # step 1: force into plain ascii, escaping anything that does not fit
    ascii_text = err.decode("ascii", errors='backslashreplace')
    # step 2: repr() so control characters (e.g. backspace) are rendered
    # visibly instead of being interpreted by the terminal/log viewer
    return truncate_text(repr(ascii_text), max_len=max_len)
4✔
2187

2188

2189
def truncate_text(text: str, *, max_len: Optional[int]) -> str:
    """Cap `text` at `max_len` chars, appending a marker with the original length.

    A `max_len` of None disables truncation.
    """
    if max_len is not None and len(text) > max_len:
        return text[:max_len] + f"... (truncated. orig_len={len(text)})"
    return text
4✔
2194

2195

2196
def nostr_pow_worker(nonce, nostr_pubk, target_bits, hash_function, hash_len_bits, shutdown):
    """Function to generate PoW for Nostr, to be spawned in a ProcessPoolExecutor.

    Scans nonces starting at `nonce` until hash(b'electrum-' + pubk + nonce)
    has at least `target_bits` leading zero bits, or until `shutdown` is set
    by a競racing worker.  Returns (digest, nonce) on success, (None, None) if
    another worker won the race.
    """
    hash_preimage = b'electrum-' + nostr_pubk
    threshold = 1 << (hash_len_bits - target_bits)  # hoisted: loop-invariant
    while True:
        # we cannot check is_set on each iteration as it has a lot of overhead, this way we can check
        # it with low overhead (just the additional range counter)
        for _ in range(1000000):
            digest = hash_function(hash_preimage + nonce.to_bytes(32, 'big')).digest()
            if int.from_bytes(digest, 'big') < threshold:
                shutdown.set()
                # bugfix: the original returned the builtin `hash` function
                # here instead of the winning digest
                return digest, nonce
            nonce += 1
        if shutdown.is_set():
            return None, None
×
2210

2211

2212
async def gen_nostr_ann_pow(nostr_pubk: bytes, target_bits: int) -> Tuple[int, int]:
    """Generate a PoW for a Nostr announcement. The PoW is hash[b'electrum-'+pubk+nonce]

    Returns (nonce, pow_bits).  One worker process per CPU (minus one) is
    spawned; the first worker to find a solution sets the shared `shutdown`
    event, making the other workers exit.
    """
    import multiprocessing  # not available on Android, so we import it here
    hash_function = hashlib.sha256
    hash_len_bits = 256
    max_nonce: int = (1 << (32 * 8)) - 1  # 32-byte nonce
    start_nonce = 0

    max_workers = max(multiprocessing.cpu_count() - 1, 1)  # use all but one CPU
    manager = multiprocessing.Manager()
    shutdown = manager.Event()
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        tasks = []
        loop = asyncio.get_running_loop()
        for task in range(0, max_workers):
            task = loop.run_in_executor(
                executor,
                nostr_pow_worker,
                start_nonce,
                nostr_pubk,
                target_bits,
                hash_function,
                hash_len_bits,
                shutdown
            )
            tasks.append(task)
            start_nonce += max_nonce // max_workers  # split the nonce range between the processes
            if start_nonce > max_nonce:  # make sure we don't go over the max_nonce
                start_nonce = random.randint(0, int(max_nonce * 0.75))

        # NOTE(review): a worker that loses the race returns (None, None); if such
        # a worker completes first, nonce_res can be None here — confirm callers
        # tolerate that (get_nostr_ann_pow_amount maps a falsy nonce to 0).
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        hash_res, nonce_res = done.pop().result()
        executor.shutdown(wait=False, cancel_futures=True)

    return nonce_res, get_nostr_ann_pow_amount(nostr_pubk, nonce_res)
×
2247

2248

2249
def get_nostr_ann_pow_amount(nostr_pubk: bytes, nonce: Optional[int]) -> int:
    """Return the amount of leading zero bits for a nostr announcement PoW."""
    if not nonce:
        # no (or zero) nonce: no proof of work at all
        return 0
    preimage = b'electrum-' + nostr_pubk + nonce.to_bytes(32, 'big')
    digest_int = int.from_bytes(hashlib.sha256(preimage).digest(), 'big')
    # a 256-bit hash with a bit_length of k has (256 - k) leading zero bits
    return 256 - digest_int.bit_length()
×
2260

2261

2262
class OnchainHistoryItem(NamedTuple):
    """A single on-chain transaction entry in the wallet history."""
    txid: str
    amount_sat: int
    fee_sat: int
    balance_sat: int
    tx_mined_status: TxMinedInfo
    group_id: Optional[str]  # note: this field was accidentally declared twice; deduplicated
    label: str
    monotonic_timestamp: int

    def to_dict(self):
        """Flatten into the dict shape consumed by the GUIs/commands."""
        return {
            'txid': self.txid,
            'amount_sat': self.amount_sat,
            'fee_sat': self.fee_sat,
            'height': self.tx_mined_status.height,
            'confirmations': self.tx_mined_status.conf,
            'timestamp': self.tx_mined_status.timestamp,
            'monotonic_timestamp': self.monotonic_timestamp,
            'incoming': self.amount_sat > 0,
            'bc_value': Satoshis(self.amount_sat),
            'bc_balance': Satoshis(self.balance_sat),
            'date': timestamp_to_datetime(self.tx_mined_status.timestamp),
            'txpos_in_block': self.tx_mined_status.txpos,
            'wanted_height': self.tx_mined_status.wanted_height,
            'label': self.label,
            'group_id': self.group_id,
        }
2290

2291
class LightningHistoryItem(NamedTuple):
    """A single lightning payment entry in the wallet history."""
    payment_hash: str
    preimage: str
    amount_msat: int
    fee_msat: Optional[int]
    type: str
    group_id: Optional[str]
    timestamp: int
    label: str
    direction: Optional[int]
    def to_dict(self):
        """Flatten into the dict shape consumed by the GUIs/commands."""
        return {
            'type': self.type,
            'label': self.label,
            'timestamp': self.timestamp or 0,  # normalize falsy timestamp to 0
            'date': timestamp_to_datetime(self.timestamp),
            'amount_msat': self.amount_msat,
            'fee_msat': self.fee_msat,
            'payment_hash': self.payment_hash,
            'preimage': self.preimage,
            'group_id': self.group_id,
            'ln_value': Satoshis(Decimal(self.amount_msat) / 1000),
            'direction': self.direction,
        }
2315

2316

2317
@dataclass(kw_only=True, slots=True)
class ChoiceItem:
    """A single selectable option, e.g. for GUI choice widgets."""
    key: Any  # identifier for this option
    label: str  # user facing string
    extra_data: Any = None  # optional payload carried along with the option
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc