• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6687568068083712

21 Feb 2025 08:41AM UTC coverage: 60.73%. Remained the same
6687568068083712

push

CirrusCI

ecdsa
lnsweep: do not return SweepInfo with txin equal to None

0 of 15 new or added lines in 1 file covered. (0.0%)

3 existing lines in 2 files now uncovered.

20675 of 34044 relevant lines covered (60.73%)

3.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.54
/electrum/util.py
1
# Electrum - lightweight Bitcoin client
2
# Copyright (C) 2011 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import binascii
5✔
24
import concurrent.futures
5✔
25
import logging
5✔
26
import os, sys, re, json
5✔
27
from collections import defaultdict, OrderedDict
5✔
28
from concurrent.futures.process import ProcessPoolExecutor
5✔
29
from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any,
5✔
30
                    Sequence, Dict, Generic, TypeVar, List, Iterable, Set, Awaitable)
31
from datetime import datetime, timezone
5✔
32
import decimal
5✔
33
from decimal import Decimal
5✔
34
import traceback
5✔
35
import urllib
5✔
36
import threading
5✔
37
import hmac
5✔
38
import hashlib
5✔
39
import stat
5✔
40
import locale
5✔
41
import asyncio
5✔
42
import urllib.request, urllib.parse, urllib.error
5✔
43
import builtins
5✔
44
import json
5✔
45
import time
5✔
46
from typing import NamedTuple, Optional
5✔
47
import ssl
5✔
48
import ipaddress
5✔
49
from ipaddress import IPv4Address, IPv6Address
5✔
50
import random
5✔
51
import secrets
5✔
52
import functools
5✔
53
from functools import partial
5✔
54
from abc import abstractmethod, ABC
5✔
55
import socket
5✔
56
import enum
5✔
57
from contextlib import nullcontext
5✔
58

59
import attr
5✔
60
import aiohttp
5✔
61
from aiohttp_socks import ProxyConnector, ProxyType
5✔
62
import aiorpcx
5✔
63
import certifi
5✔
64
import dns.resolver
5✔
65

66
from .i18n import _
5✔
67
from .logging import get_logger, Logger
5✔
68

69
if TYPE_CHECKING:
5✔
70
    from .network import Network
×
71
    from .interface import Interface
×
72
    from .simple_config import SimpleConfig
×
73
    from .paymentrequest import PaymentRequest
×
74

75

76
_logger = get_logger(__name__)
5✔
77

78

79
def inv_dict(d):
    """Return a new dict that maps every value of `d` back to its key.

    When several keys share one value, the key that comes last in the
    iteration order of `d` wins.
    """
    return dict((value, key) for key, value in d.items())
5✔
81

82

83
def all_subclasses(cls) -> Set:
    """Collect every direct and indirect subclass of `cls` into a set."""
    direct_children = cls.__subclasses__()
    found = set(direct_children)
    for child in direct_children:
        # recurse to pick up grandchildren and deeper descendants
        found.update(all_subclasses(child))
    return found
5✔
89

90

91
# Location of the CA certificate bundle, as reported by certifi.where().
ca_path = certifi.where()
5✔
92

93

94
# Maps each supported base unit name to its decimal point, i.e. the number of
# digits allowed after the decimal point when amounts are shown in that unit.
base_units = {
    'BTC': 8,
    'mBTC': 5,
    'bits': 2,
    'sat': 0,
}
base_units_inverse = inv_dict(base_units)  # decimal point -> unit name
base_units_list = ['BTC', 'mBTC', 'bits', 'sat']  # list(dict) does not guarantee order

DECIMAL_POINT_DEFAULT = 5  # mBTC
5✔
99

100

101
class UnknownBaseUnit(Exception):
    """Raised when a unit name or decimal point has no entry in the base unit tables."""
5✔
102

103

104
def decimal_point_to_base_unit_name(dp: int) -> str:
    """Return the base unit name that uses decimal point `dp`, e.g. 8 -> "BTC".

    Raises UnknownBaseUnit if no base unit has that decimal point.
    """
    if dp not in base_units_inverse:
        raise UnknownBaseUnit(dp)
    return base_units_inverse[dp]
×
110

111

112
def base_unit_name_to_decimal_point(unit_name: str) -> int:
    """Return the max number of digits allowed after the decimal point, e.g. "BTC" -> 8.

    Raises UnknownBaseUnit if `unit_name` is not a known base unit.
    """
    if unit_name not in base_units:
        raise UnknownBaseUnit(unit_name)
    return base_units[unit_name]
×
119

120
def parse_max_spend(amt: Any) -> Optional[int]:
    """Recognise "spend-max"-like amounts.

    Returns the positive integer weight if `amt` is a max-spend marker,
    otherwise None. Never raises.

    A user can ask to send "max" when creating invoices and on-chain txs by
    giving '!' as the amount. Max can also be split between several outputs
    by prefixing the '!' with a positive integer weight. For example, this
    sends 40% of all coins to address1 and 60% to address2:
    ```
    address1, 2!
    address2, 3!
    ```
    """
    if not isinstance(amt, str) or not amt.endswith('!'):
        return None
    prefix = amt[:-1]
    if not prefix:
        # a bare '!' carries the implicit weight 1
        return 1
    try:
        weight = int(prefix)
    except ValueError:
        return None
    return weight if weight > 0 else None
×
145

146
class NotEnoughFunds(Exception):
    """Exception whose string form is the localised "Insufficient funds" text."""

    def __str__(self):
        return _("Insufficient funds")
×
149

150

151
class UneconomicFee(Exception):
    """Exception whose string form explains that the fee exceeds the funds gained."""

    def __str__(self):
        return _("The fee for the transaction is higher than the funds gained from it.")
×
154

155

156
class NoDynamicFeeEstimates(Exception):
    """Exception whose string form says that dynamic fee estimates are unavailable."""

    def __str__(self):
        return _('Dynamic fee estimates not available')
×
159

160

161
class BelowDustLimit(Exception):
    """Plain exception type; adds no behaviour of its own."""
5✔
163

164

165
class InvalidPassword(Exception):
    """Password error that can carry an optional custom message."""

    def __init__(self, message: Optional[str] = None):
        self.message = message

    def __str__(self):
        # Only fall back to the generic wording when no message was supplied.
        if self.message is not None:
            return str(self.message)
        return _("Incorrect password")
×
174

175

176
class AddTransactionException(Exception):
    """Base class for the add-transaction errors defined in this module."""
5✔
178

179

180
class UnrelatedTransactionException(AddTransactionException):
    """Add-transaction error whose string form says the tx is unrelated to the wallet."""

    def __str__(self):
        return _("Transaction is unrelated to this wallet.")
×
183

184

185
class FileImportFailed(Exception):
    """Import error; `message` (stringified) is appended to a localised headline."""

    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        headline = _("Failed to import from file.")
        return headline + "\n" + self.message
×
191

192

193
class FileExportFailed(Exception):
    """Export error; `message` (stringified) is appended to a localised headline."""

    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        headline = _("Failed to export to file.")
        return headline + "\n" + self.message
×
199

200

201
class WalletFileException(Exception):
    """Wallet file error that also records whether a crash report is wanted."""

    def __init__(self, message='', *, should_report_crash: bool = False):
        super().__init__(message)
        # flag stored for callers to inspect; not used by this class itself
        self.should_report_crash = should_report_crash
5✔
205

206

207
class BitcoinException(Exception):
    """Plain exception type; adds no behaviour of its own."""
5✔
208

209

210
class UserFacingException(Exception):
    """An exception carrying information that is meant to be displayed to the user."""
212

213

214
class InvoiceError(UserFacingException):
    """User-facing exception subtype; adds no behaviour of its own."""
5✔
215

216

217
class NetworkOfflineException(UserFacingException):
    """May be raised when running in offline mode (--offline flag) and the
    user asks for an operation that needs the network.
    """

    def __str__(self):
        return _("You are offline.")
×
223

224

225
# Throw this exception to unwind the stack like when an error occurs.
226
# However unlike other exceptions the user won't be informed.
227
class UserCancelled(Exception):
    """An exception that is suppressed from the user."""
5✔
230

231

232
def to_decimal(x: Union[str, float, int, Decimal]) -> Decimal:
    """Convert `x` to Decimal by way of its string form.

    Mainly useful for floats, where the direct conversion exposes binary noise:
        >>> Decimal(41754.681)
        Decimal('41754.680999999996856786310672760009765625')
        >>> Decimal("41754.681")
        Decimal('41754.681')
    A Decimal argument is returned as-is.
    """
    return x if isinstance(x, Decimal) else Decimal(str(x))
5✔
241

242

243
# note: this is not a NamedTuple as then its json encoding cannot be customized
244
class Satoshis(object):
    """Thin wrapper around an amount, stored in the single slot `value`.

    `value` must be an int or a Decimal. Two instances compare equal when
    their values are equal; comparing with any other type is "not equal".
    """
    __slots__ = ('value',)

    def __new__(cls, value):
        self = super(Satoshis, cls).__new__(cls)
        # note: 'value' sometimes has msat precision
        assert isinstance(value, (int, Decimal)), f"unexpected type for {value=!r}"
        self.value = value
        return self

    def __repr__(self):
        return f'Satoshis({self.value})'

    def __str__(self):
        # note: precision is truncated to satoshis here
        return format_satoshis(self.value)

    def __eq__(self, other):
        # Foreign types (None, plain ints, ...) have no `.value`; defer to
        # Python's default handling instead of raising AttributeError.
        if not isinstance(other, Satoshis):
            return NotImplemented
        return self.value == other.value

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        return Satoshis(self.value + other.value)
×
269

270

271
# note: this is not a NamedTuple as then its json encoding cannot be customized
272
class Fiat(object):
    """A fiat amount: an optional Decimal `value` together with its currency code `ccy`."""
    __slots__ = ('value', 'ccy')

    def __new__(cls, value: Optional[Decimal], ccy: str):
        if value is not None and not isinstance(value, Decimal):
            raise TypeError(f"value should be Decimal or None, not {type(value)}")
        self = super(Fiat, cls).__new__(cls)
        self.ccy = ccy
        self.value = value
        return self

    def __repr__(self):
        return 'Fiat(%s)' % self.__str__()

    def __str__(self):
        amount = self.value
        if amount is not None and not amount.is_nan():
            return "{:.2f}".format(amount)
        return _('No Data')

    def to_ui_string(self):
        """Like str(self), but with the currency code appended when there is data."""
        amount = self.value
        if amount is not None and not amount.is_nan():
            return "{:.2f}".format(amount) + ' ' + self.ccy
        return _('No Data')

    def __eq__(self, other):
        if not isinstance(other, Fiat) or self.ccy != other.ccy:
            return False
        mine, theirs = self.value, other.value
        # NaN never equals NaN numerically, so two NaN amounts are special-cased as equal.
        both_nan = (isinstance(mine, Decimal) and isinstance(theirs, Decimal)
                    and mine.is_nan() and theirs.is_nan())
        if both_nan:
            return True
        return mine == theirs

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        assert self.ccy == other.ccy
        total = self.value + other.value
        return Fiat(total, self.ccy)
×
314

315

316
class MyEncoder(json.JSONEncoder):
    """JSON encoder with fallbacks for the extra object types handled in `default`."""

    def default(self, obj):
        # note: this does not get called for namedtuples :(  https://bugs.python.org/issue30343
        from .transaction import Transaction, TxOutput
        if isinstance(obj, Transaction):
            return obj.serialize()
        if isinstance(obj, TxOutput):
            return obj.to_legacy_tuple()
        if isinstance(obj, (Satoshis, Fiat, Decimal)):
            return str(obj)
        if isinstance(obj, datetime):
            # NOTE(review): the slice drops the last three characters of the ISO
            # string, whatever they are - confirm this is the intended precision.
            return obj.isoformat(' ')[:-3]
        if isinstance(obj, set):
            return list(obj)
        if isinstance(obj, bytes):  # for nametuples in lnchannel
            return obj.hex()
        to_json = getattr(obj, 'to_json', None)
        if callable(to_json):
            return to_json()
        # nothing matched: let the base class raise its TypeError
        return super().default(obj)
×
339

340

341
class ThreadJob(Logger):
    """A job that a thread runs periodically from its main loop; run() is
    invoked in the context of that thread.
    """

    def __init__(self):
        Logger.__init__(self)

    def run(self):
        """Called periodically from the thread. The base implementation does nothing."""
×
352

353
class DebugMem(ThreadJob):
    """A handy class for debugging GC memory leaks.

    Every `interval` seconds (checked in run()), logs how many GC-tracked
    objects are instances of each class in `classes`.
    """

    def __init__(self, classes, interval=30):
        ThreadJob.__init__(self)
        self.next_time = 0
        self.classes = classes
        self.interval = interval

    def mem_stats(self):
        """Force a GC pass and log the instance count of each watched class."""
        import gc
        self.logger.info("Start memscan")
        gc.collect()
        instances_by_class = defaultdict(list)
        for candidate in gc.get_objects():
            for watched in self.classes:
                if isinstance(candidate, watched):
                    instances_by_class[watched].append(candidate)
        for watched, instances in instances_by_class.items():
            self.logger.info(f"{watched.__name__}: {len(instances)}")
        self.logger.info("Finish memscan")

    def run(self):
        if time.time() <= self.next_time:
            return
        self.mem_stats()
        self.next_time = time.time() + self.interval
×
378

379
class DaemonThread(threading.Thread, Logger):
    """ daemon thread that terminates cleanly """

    LOGGING_SHORTCUT = 'd'

    def __init__(self):
        threading.Thread.__init__(self)
        Logger.__init__(self)
        # the thread that constructed us; is_running() turns False once it dies
        self.parent_thread = threading.current_thread()
        self.running = False
        self.running_lock = threading.Lock()
        self.job_lock = threading.Lock()
        self.jobs = []
        self.stopped_event = threading.Event()        # set when fully stopped
        self.stopped_event_async = asyncio.Event()    # set when fully stopped
        self.wake_up_event = threading.Event()  # for perf optimisation of polling in run()

    def add_jobs(self, jobs):
        """Append `jobs` to the list of jobs executed by run_jobs()."""
        with self.job_lock:
            self.jobs.extend(jobs)

    def run_jobs(self):
        """Run every registered job once, logging (not propagating) exceptions."""
        # Don't let a throwing job disrupt the thread, future runs of
        # itself, or other jobs.  This is useful protection against
        # malformed or malicious server responses
        with self.job_lock:
            for job in self.jobs:
                try:
                    job.run()
                except Exception:
                    self.logger.exception('')

    def remove_jobs(self, jobs):
        """Remove each of `jobs` from the registered jobs."""
        with self.job_lock:
            for job in jobs:
                self.jobs.remove(job)

    def start(self):
        with self.running_lock:
            self.running = True
        return threading.Thread.start(self)

    def is_running(self):
        """True while the thread has not been stopped and its parent thread is alive."""
        with self.running_lock:
            still_wanted = self.running
            return still_wanted and self.parent_thread.is_alive()

    def stop(self):
        with self.running_lock:
            self.running = False
            # pulse the event so that a run() loop waiting on it wakes up
            self.wake_up_event.set()
            self.wake_up_event.clear()

    def on_stop(self):
        """Signal (to both threads and the asyncio loop) that the thread has fully stopped."""
        if 'ANDROID_DATA' in os.environ:
            import jnius
            jnius.detach()
            self.logger.info("jnius detach")
        self.logger.info("stopped")
        self.stopped_event.set()
        # the asyncio event is set from within the event loop
        loop = get_asyncio_loop()
        loop.call_soon_threadsafe(self.stopped_event_async.set)
5✔
440

441

442
def print_stderr(*args):
    """Write the str() of all args, space-separated and newline-terminated, to stderr and flush."""
    line = " ".join(map(str, args))
    sys.stderr.write(line + "\n")
    sys.stderr.flush()
×
446

447
def print_msg(*args):
    """Write the str() of all args, space-separated and newline-terminated, to stdout and flush."""
    line = " ".join(map(str, args))
    sys.stdout.write(line + "\n")
    sys.stdout.flush()
×
452

453
def json_encode(obj):
    """Dump `obj` as indented, key-sorted JSON using MyEncoder.

    If json.dumps raises TypeError, repr(obj) is returned instead.
    """
    try:
        return json.dumps(obj, sort_keys=True, indent=4, cls=MyEncoder)
    except TypeError:
        return repr(obj)
×
459

460
def json_decode(x):
    """Parse `x` as JSON (floats become Decimal); on any failure return `x` unchanged."""
    try:
        decoded = json.loads(x, parse_float=Decimal)
    except Exception:
        return x
    return decoded
5✔
465

466
def json_normalize(x):
    """Round-trip `x` through json_encode and json_decode."""
    # note: The return value of commands, when going through the JSON-RPC interface,
    #       is json-encoded. The encoder used there cannot handle some types, e.g. electrum.util.Satoshis.
    # note: We should not simply do "json_encode(x)" here, as then later x would get doubly json-encoded.
    # see #5868
    encoded = json_encode(x)
    return json_decode(encoded)
×
472

473

474
# taken from Django Source Code
475
def constant_time_compare(val1, val2):
    """Return True if the two strings are equal, False otherwise.

    Both values are converted with to_bytes(..., 'utf8') and compared using
    hmac.compare_digest.
    """
    left = to_bytes(val1, 'utf8')
    right = to_bytes(val2, 'utf8')
    return hmac.compare_digest(left, right)
×
478

479

480
_profiler_logger = _logger.getChild('profiler')
5✔
481
def profiler(func=None, *, min_threshold: Union[int, float, None] = None):
    """Function decorator that logs execution time.

    Usable both bare ("@profiler") and with arguments ("@profiler(...)").
    The wrapper keeps the decorated function's name, docstring and signature
    metadata (functools.wraps).

    min_threshold: if set, only log if time taken (in seconds) is higher than threshold
    NOTE: does not work with async methods.
    """
    if func is None:  # to make "@profiler(...)" work. (in addition to bare "@profiler")
        return partial(profiler, min_threshold=min_threshold)

    @functools.wraps(func)
    def do_profile(*args, **kw_args):
        name = func.__qualname__
        # perf_counter is monotonic, so system clock adjustments cannot skew the duration
        t0 = time.perf_counter()
        o = func(*args, **kw_args)
        t = time.perf_counter() - t0
        if min_threshold is None or t > min_threshold:
            _profiler_logger.debug(f"{name} {t:,.4f} sec")
        return o
    return do_profile
5✔
498

499

500
class AsyncHangDetector:
    """Async context manager that logs `message` every `period_sec` seconds
    for as long as the wrapped context has not exited.
    """

    def __init__(
        self,
        *,
        period_sec: int = 15,
        message: str,
        logger: logging.Logger = None,
    ):
        self.period_sec = period_sec
        self.message = message
        # fall back to the module logger when none is supplied
        self.logger = logger if logger else _logger

    async def _monitor(self):
        # note: this assumes that the event loop itself is not blocked
        started = time.monotonic()
        while True:
            await asyncio.sleep(self.period_sec)
            elapsed = time.monotonic() - started
            self.logger.info(f"{self.message} (after {elapsed:.2f} sec)")

    async def __aenter__(self):
        # start the periodic logger; it keeps running until __aexit__ cancels it
        self.mtask = asyncio.create_task(self._monitor())

    async def __aexit__(self, exc_type, exc, tb):
        self.mtask.cancel()
5✔
527

528

529
def android_ext_dir():
    """Return whatever android.storage.primary_external_storage_path() reports."""
    from android.storage import primary_external_storage_path
    ext_dir = primary_external_storage_path()
    return ext_dir
×
532

533
def android_backup_dir():
    """Return the directory named after the app package inside android_ext_dir(),
    creating it first if it does not exist yet.
    """
    backup_dir = os.path.join(android_ext_dir(), get_android_package_name())
    if not os.path.exists(backup_dir):
        os.mkdir(backup_dir)
    return backup_dir
×
539

540
def android_data_dir():
    """Return the 'data' subfolder path under the Android activity's files directory."""
    import jnius
    activity_cls = jnius.autoclass('org.kivy.android.PythonActivity')
    files_dir = activity_cls.mActivity.getFilesDir().getPath()
    return files_dir + '/data'
×
544

545
def ensure_sparse_file(filename):
    """Best-effort: mark `filename` as a sparse file where that needs an explicit step.

    On modern Linux, no need to do anything.
    On Windows, the file is explicitly flagged via `fsutil sparse setflag`.
    Failures (including a non-zero exit status of fsutil) are logged, never raised.
    """
    if os.name != "nt":
        return
    import subprocess  # local import: only needed on this Windows-only path
    try:
        # An argument list (no shell) keeps quotes or shell metacharacters in
        # the filename from altering the command.
        subprocess.run(["fsutil", "sparse", "setflag", str(filename), "1"], check=True)
    except Exception as e:
        _logger.info(f'error marking file {filename} as sparse: {e}')
×
553

554

555
def get_headers_dir(config):
    """Return `config.path`, which serves as the headers directory."""
    headers_dir = config.path
    return headers_dir
5✔
557

558

559
def assert_datadir_available(config_path):
    """Raise FileNotFoundError unless `config_path` exists."""
    if not os.path.exists(config_path):
        raise FileNotFoundError(
            'Electrum datadir does not exist. Was it deleted while running?' + '\n' +
            'Should be at {}'.format(config_path))
567

568

569
def assert_file_in_datadir_available(path, config_path):
    """Raise FileNotFoundError unless `path` exists.

    The datadir itself is checked first, so that its (more specific) error
    is raised when the whole datadir is missing.
    """
    if os.path.exists(path):
        return
    assert_datadir_available(config_path)
    raise FileNotFoundError(
        'Cannot find file but datadir is there.' + '\n' +
        'Should be at {}'.format(path))
577

578

579
def standardize_path(path):
    """Expand `~`, make the path absolute, then normalise its case."""
    # note: os.path.realpath() is not used, as on Windows it can return non-working paths (see #8495).
    #       This means that we don't resolve symlinks!
    expanded = os.path.expanduser(path)
    absolute = os.path.abspath(expanded)
    return os.path.normcase(absolute)
587

588

589
def get_new_wallet_name(wallet_folder: str) -> str:
    """Returns a file basename for a new wallet to be used.

    The result is "wallet_<i>" for the smallest i >= 1 that is not already
    an entry of `wallet_folder`.
    Can raise OSError.
    """
    # List the folder once; a set makes each candidate lookup O(1).
    existing = set(os.listdir(wallet_folder))
    i = 1
    while f"wallet_{i}" in existing:
        i += 1
    return f"wallet_{i}"
5✔
601

602

603
def is_android_debug_apk() -> bool:
    """Return the app's BuildConfig.DEBUG flag as a bool; False when ANDROID_DATA is not set."""
    if 'ANDROID_DATA' not in os.environ:
        return False
    from jnius import autoclass
    build_config = autoclass(f"{get_android_package_name()}.BuildConfig")
    return bool(build_config.DEBUG)
×
611

612

613
def get_android_package_name() -> str:
    """Return the package name reported by the Android activity.

    Asserts that ANDROID_DATA is present in the environment.
    """
    assert 'ANDROID_DATA' in os.environ
    from jnius import autoclass
    from android.config import ACTIVITY_CLASS_NAME
    activity = autoclass(ACTIVITY_CLASS_NAME).mActivity
    return str(activity.getPackageName())
×
621

622

623
def assert_bytes(*args):
    """
    porting helper: assert that every arg is bytes or bytearray.
    On failure the argument types are printed before the error propagates.
    """
    try:
        assert all(isinstance(arg, (bytes, bytearray)) for arg in args)
    except Exception:
        print('assert bytes failed', list(map(type, args)))
        raise
×
633

634

635
def assert_str(*args):
    """
    porting helper: assert that every arg is a str.
    """
    assert all(isinstance(arg, str) for arg in args)
×
641

642

643
def to_string(x, enc) -> str:
    """Return `x` as str: str passes through, bytes/bytearray are decoded with `enc`.

    Raises TypeError for any other type.
    """
    if isinstance(x, str):
        return x
    if isinstance(x, (bytes, bytearray)):
        return x.decode(enc)
    raise TypeError("Not a string or bytes like object")
×
650

651

652
def to_bytes(something, encoding='utf8') -> bytes:
    """
    Return `something` as bytes: bytes pass through unchanged, str is encoded
    with `encoding`, and a bytearray is copied into a new bytes object.
    Raises TypeError for any other type.
    """
    if isinstance(something, bytes):
        return something
    if isinstance(something, bytearray):
        return bytes(something)
    if isinstance(something, str):
        return something.encode(encoding)
    raise TypeError("Not a string or bytes like object")
5✔
664

665

666
# Short alias for bytes.fromhex (hex string -> bytes).
bfh = bytes.fromhex
5✔
667

668

669
def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR `a` with `b` bytewise; both are truncated to the length of the shorter one."""
    size = min(len(a), len(b))
    # XOR the two prefixes as big-endian integers, then convert back to `size` bytes
    left = int.from_bytes(a[:size], "big")
    right = int.from_bytes(b[:size], "big")
    return (left ^ right).to_bytes(size, "big")
673

674

675
def user_dir():
    """Pick the user data directory from the environment.

    Checked in order: ELECTRUMDIR, Android, posix ($HOME/.electrum),
    APPDATA, LOCALAPPDATA. Returns None if none of these apply.
    """
    env = os.environ
    if "ELECTRUMDIR" in env:
        return env["ELECTRUMDIR"]
    if 'ANDROID_DATA' in env:
        return android_data_dir()
    if os.name == 'posix':
        return os.path.join(env["HOME"], ".electrum")
    for var_name in ("APPDATA", "LOCALAPPDATA"):
        if var_name in env:
            return os.path.join(env[var_name], "Electrum")
    #raise Exception("No home directory found in environment variables.")
    return None
×
689

690

691
# absolute path to python package folder of electrum ("lib")
pkg_dir = os.path.dirname(os.path.realpath(__file__))


def resource_path(*parts):
    """Join `parts` onto the package folder `pkg_dir`."""
    return os.path.join(pkg_dir, *parts)
5✔
697

698

699
def is_valid_email(s):
    """Loosely check that `s` looks like an email address.

    The whole string must have the shape "<local>@<domain>.<tld>" and contain
    exactly one '@'. This is a plausibility check, not full address validation.
    """
    regexp = r"[^@]+@[^@]+\.[^@]+"
    # fullmatch (rather than match) so that trailing text such as a second
    # "@..." part cannot ride along behind a valid-looking prefix
    return re.fullmatch(regexp, s) is not None
×
702

703

704
def is_hash256_str(text: Any) -> bool:
    """True iff `text` is a str of exactly 64 characters that is_hex_str() accepts."""
    return isinstance(text, str) and len(text) == 64 and is_hex_str(text)
5✔
708

709

710
def is_hex_str(text: Any) -> bool:
    """True iff `text` is a str that bytes.fromhex() accepts and that contains no whitespace."""
    if not isinstance(text, str):
        return False
    try:
        decoded = bytes.fromhex(text)
    except Exception:
        return False
    # forbid whitespaces in text: any whitespace makes the text longer than
    # two characters per decoded byte
    return len(text) == 2 * len(decoded)
5✔
720

721

722
def is_integer(val: Any) -> bool:
    """True iff `val` is an instance of int (which includes bool)."""
    result = isinstance(val, int)
    return result
5✔
724

725

726
def is_non_negative_integer(val: Any) -> bool:
    """True iff `val` is an int and is >= 0."""
    return isinstance(val, int) and val >= 0
5✔
730

731

732
def is_int_or_float(val: Any) -> bool:
    """True iff `val` is an instance of int or float."""
    result = isinstance(val, (int, float))
    return result
5✔
734

735

736
def is_non_negative_int_or_float(val: Any) -> bool:
    """True iff `val` is an int or float and is >= 0."""
    return isinstance(val, (int, float)) and val >= 0
5✔
740

741

742
def chunks(items, size: int):
    """Yield consecutive slices of `items`, each holding at most `size` elements.

    `items` must support len() and slicing. Because this is a generator, the
    ValueError for a non-positive `size` is raised on first iteration.
    """
    if size < 1:
        raise ValueError(f"size must be positive, not {repr(size)}")
    total = len(items)
    for start in range(0, total, size):
        stop = start + size
        yield items[start:stop]
5✔
748

749

750
def format_satoshis_plain(
        x: Union[int, float, Decimal, str],  # amount in satoshis,
        *,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
) -> str:
    """Display a satoshi amount scaled.  Always uses a '.' as a decimal
    point and has no thousands separator.

    A "spend-max"-like `x` is rendered as 'max(<x>)' instead.
    """
    if parse_max_spend(x):
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    scaled = Decimal(x) / pow(10, decimal_point)
    text = "{:.8f}".format(scaled)
    # drop trailing zeros, and then the decimal point itself if nothing follows it
    return text.rstrip('0').rstrip('.')
5✔
762

763

764
# Check that Decimal precision is sufficient.
765
# We need at the very least ~20, as we deal with msat amounts, and
766
# log10(21_000_000 * 10**8 * 1000) ~= 18.3
767
# decimal.DefaultContext.prec == 28 by default, but it is mutable.
768
# We enforce that we have at least that available.
769
assert decimal.getcontext().prec >= 28, f"PyDecimal precision too low: {decimal.getcontext().prec}"
5✔
770

771
# DECIMAL_POINT = locale.localeconv()['decimal_point']  # type: str
772
DECIMAL_POINT = "."  # placed between the integer and the fractional part
THOUSANDS_SEP = " "  # placed between groups of three digits
# both separators are expected to be exactly one character long
assert len(DECIMAL_POINT) == 1, f"DECIMAL_POINT has unexpected len. {DECIMAL_POINT!r}"
assert len(THOUSANDS_SEP) == 1, f"THOUSANDS_SEP has unexpected len. {THOUSANDS_SEP!r}"
5✔
776

777

778
def format_satoshis(
        x: Union[int, float, Decimal, str, None],  # amount in satoshis
        *,
        num_zeros: int = 0,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
        precision: int = 0,  # extra digits after satoshi precision
        is_diff: bool = False,  # if True, enforce a leading sign (+/-)
        whitespaces: bool = False,  # if True, add whitespaces, to align numbers in a column
        add_thousands_sep: bool = False,  # if True, add whitespaces, for better readability of the numbers
) -> str:
    """Format a satoshi amount for display.

    Returns 'unknown' for None and 'max(<x>)' for "spend-max"-like amounts.
    Otherwise the amount is scaled by 10**decimal_point, trailing zeros are
    stripped, and the fractional part is padded to at least `num_zeros` digits.
    """
    if x is None:
        return 'unknown'
    if parse_max_spend(x):
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    # lose redundant precision
    x = Decimal(x).quantize(Decimal(10) ** (-precision))
    # build the format spec
    overall_precision = decimal_point + precision  # max digits after final decimal point
    decimal_format = ""
    if overall_precision > 0:
        decimal_format = "." + str(overall_precision)
    if is_diff:
        decimal_format = '+' + decimal_format
    # initial result
    scaled = x / pow(10, decimal_point)
    result = ("{:" + decimal_format + "f}").format(scaled)
    if "." not in result:
        # guarantees the split below works and shields integer digits from rstrip
        result += "."
    result = result.rstrip('0')
    # add extra decimal places (zeros)
    integer_part, fract_part = result.split(".")
    fract_part = fract_part.ljust(num_zeros, "0")
    # add whitespaces as thousands' separator for better readability of numbers
    if add_thousands_sep:
        sign = integer_part[0] if integer_part[0] in ("+", "-") else ""
        if sign == "-":
            integer_part = integer_part[1:]
        grouped = "{:,}".format(int(integer_part)).replace(',', THOUSANDS_SEP)
        integer_part = sign + grouped
        fract_groups = (fract_part[i:i+3] for i in range(0, len(fract_part), 3))
        fract_part = THOUSANDS_SEP.join(fract_groups)
    result = integer_part + DECIMAL_POINT + fract_part
    # add leading/trailing whitespaces so that numbers can be aligned in a column
    if whitespaces:
        target_fract_len = overall_precision
        target_integer_len = 14 - decimal_point  # should be enough for up to unsigned 999999 BTC
        if add_thousands_sep:
            # leave room for the separators between groups of three digits
            target_fract_len += max(0, (target_fract_len - 1) // 3)
            target_integer_len += max(0, (target_integer_len - 1) // 3)
        # add trailing whitespaces
        result += " " * (target_fract_len - len(fract_part))
        # add leading whitespaces
        target_total_len = target_integer_len + 1 + target_fract_len
        result = result.rjust(target_total_len)
    return result
5✔
831

832

833
FEERATE_PRECISION = 1  # num fractional decimal places for sat/byte fee rates
_feerate_quanta = Decimal(10) ** (-FEERATE_PRECISION)  # quantisation step matching that precision
# unit names, as displayed in the UI
UI_UNIT_NAME_FEERATE_SAT_PER_VBYTE = "sat/vbyte"
UI_UNIT_NAME_FEERATE_SAT_PER_VB = "sat/vB"
UI_UNIT_NAME_TXSIZE_VBYTES = "vbytes"
UI_UNIT_NAME_MEMPOOL_MB = "vMB"
5✔
839

840

841
def format_fee_satoshis(fee, *, num_zeros=0, precision=None):
    """Format a fee value via format_satoshis() with decimal_point=0.

    `precision` defaults to FEERATE_PRECISION.
    """
    effective_precision = FEERATE_PRECISION if precision is None else precision
    # no more zeroes than available prec
    # NOTE(review): the cap is FEERATE_PRECISION even when a custom `precision` is given - confirm intended
    capped_zeros = min(num_zeros, FEERATE_PRECISION)
    return format_satoshis(fee, num_zeros=capped_zeros, decimal_point=0, precision=effective_precision)
5✔
846

847

848
def quantize_feerate(fee) -> Union[None, Decimal, int]:
    """Strip sat/byte fee rate of excess precision.

    None passes through; any other value is converted to Decimal and
    quantised to FEERATE_PRECISION decimal places, rounding half down.
    """
    if fee is None:
        return None
    as_decimal = Decimal(fee)
    return as_decimal.quantize(_feerate_quanta, rounding=decimal.ROUND_HALF_DOWN)
5✔
853

854

855
def timestamp_to_datetime(timestamp: Union[int, float, None], *, utc: bool = False) -> Optional[datetime]:
    """Convert a POSIX timestamp to a datetime; None stays None.

    With utc=True the result is timezone-aware (UTC); otherwise it is a
    naive datetime in local time.
    """
    if timestamp is None:
        return None
    return datetime.fromtimestamp(timestamp, tz=timezone.utc if utc else None)
5✔
862

863

864
def format_time(timestamp: Union[int, float, None]) -> str:
    """Render a POSIX timestamp as an ISO-style local time with minute
    resolution, or the translated "Unknown" when no datetime is available."""
    date = timestamp_to_datetime(timestamp)
    if not date:
        return _("Unknown")
    return date.isoformat(' ', timespec="minutes")
×
867

868

869
def age(
    from_date: Union[int, float, None],  # POSIX timestamp
    *,
    since_date: datetime = None,
    target_tz=None,
    include_seconds: bool = False,
) -> str:
    """Takes a timestamp and returns a string with the approximation of the age.

    from_date: POSIX timestamp to describe; None yields the "Unknown" string.
    since_date: reference moment; defaults to datetime.now(target_tz).
    target_tz: tzinfo handed to datetime.now() when since_date is not given.
    include_seconds: when the distance rounds to zero minutes, report the
        number of seconds instead of "less than a minute".
    """
    if from_date is None:
        return _("Unknown")

    if since_date is None:
        since_date = datetime.now(target_tz)
    # Build from_date with the same tz-awareness as since_date. Mixing a naive
    # from_date with an aware since_date (e.g. whenever target_tz is given)
    # would raise TypeError on the subtraction below. With a naive since_date
    # this is still the local naive conversion.
    from_date = datetime.fromtimestamp(from_date, tz=since_date.tzinfo)

    distance_in_time = from_date - since_date
    is_in_past = from_date < since_date
    # timedelta stores negative spans as (negative days, non-negative seconds);
    # microseconds are ignored on purpose.
    distance_in_seconds = abs(distance_in_time.days * 86400 + distance_in_time.seconds)
    distance_in_minutes = int(round(distance_in_seconds / 60))

    if distance_in_minutes == 0:
        if include_seconds:
            if is_in_past:
                return _("{} seconds ago").format(distance_in_seconds)
            return _("in {} seconds").format(distance_in_seconds)
        if is_in_past:
            return _("less than a minute ago")
        return _("in less than a minute")
    if distance_in_minutes < 45:
        if is_in_past:
            return _("about {} minutes ago").format(distance_in_minutes)
        return _("in about {} minutes").format(distance_in_minutes)
    if distance_in_minutes < 90:
        if is_in_past:
            return _("about 1 hour ago")
        return _("in about 1 hour")
    if distance_in_minutes < 1440:
        hours = round(distance_in_minutes / 60.0)
        if is_in_past:
            return _("about {} hours ago").format(hours)
        return _("in about {} hours").format(hours)
    if distance_in_minutes < 2880:
        if is_in_past:
            return _("about 1 day ago")
        return _("in about 1 day")
    if distance_in_minutes < 43220:
        days = round(distance_in_minutes / 1440)
        if is_in_past:
            return _("about {} days ago").format(days)
        return _("in about {} days").format(days)
    if distance_in_minutes < 86400:
        if is_in_past:
            return _("about 1 month ago")
        return _("in about 1 month")
    if distance_in_minutes < 525600:
        months = round(distance_in_minutes / 43200)
        if is_in_past:
            return _("about {} months ago").format(months)
        return _("in about {} months").format(months)
    if distance_in_minutes < 1051200:
        if is_in_past:
            return _("about 1 year ago")
        return _("in about 1 year")
    years = round(distance_in_minutes / 525600)
    if is_in_past:
        return _("over {} years ago").format(years)
    return _("in over {} years").format(years)
5✔
945

946
# Mainnet explorers: name -> (base URL, {'tx': path prefix, 'addr': path prefix}).
mainnet_block_explorers = {
    '3xpl.com': (
        'https://3xpl.com/bitcoin/',
        {'tx': 'transaction/', 'addr': 'address/'},
    ),
    'Bitflyer.jp': (
        'https://chainflyer.bitflyer.jp/',
        {'tx': 'Transaction/', 'addr': 'Address/'},
    ),
    'Blockchain.info': (
        'https://blockchain.com/btc/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'Blockstream.info': (
        'https://blockstream.info/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'Bitaps.com': (
        'https://btc.bitaps.com/',
        {'tx': '', 'addr': ''},
    ),
    'BTC.com': (
        'https://btc.com/',
        {'tx': '', 'addr': ''},
    ),
    'Chain.so': (
        'https://www.chain.so/',
        {'tx': 'tx/BTC/', 'addr': 'address/BTC/'},
    ),
    'Insight.is': (
        'https://insight.bitpay.com/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'BlockCypher.com': (
        'https://live.blockcypher.com/btc/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'Blockchair.com': (
        'https://blockchair.com/bitcoin/',
        {'tx': 'transaction/', 'addr': 'address/'},
    ),
    'blockonomics.co': (
        'https://www.blockonomics.co/',
        {'tx': 'api/tx?txid=', 'addr': '#/search?q='},
    ),
    'mempool.space': (
        'https://mempool.space/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'mempool.emzy.de': (
        'https://mempool.emzy.de/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'OXT.me': (
        'https://oxt.me/',
        {'tx': 'transaction/', 'addr': 'address/'},
    ),
    'mynode.local': (
        'http://mynode.local:3002/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'system default': (
        'blockchain:/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
}
980

981
# Testnet explorers: name -> (base URL, {'tx': path prefix, 'addr': path prefix}).
testnet_block_explorers = {
    'Bitaps.com': (
        'https://tbtc.bitaps.com/',
        {'tx': '', 'addr': ''},
    ),
    'BlockCypher.com': (
        'https://live.blockcypher.com/btc-testnet/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'Blockchain.info': (
        'https://www.blockchain.com/btc-testnet/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'Blockstream.info': (
        'https://blockstream.info/testnet/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'mempool.space': (
        'https://mempool.space/testnet/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'smartbit.com.au': (
        'https://testnet.smartbit.com.au/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'system default': (
        'blockchain://000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
}
997

998
# Testnet4 explorers: name -> (base URL, {'tx': path prefix, 'addr': path prefix}).
testnet4_block_explorers = {
    'mempool.space': (
        'https://mempool.space/testnet4/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'wakiyamap.dev': (
        'https://testnet4-explorer.wakiyamap.dev/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
}
1004

1005
# Signet explorers: name -> (base URL, {'tx': path prefix, 'addr': path prefix}).
signet_block_explorers = {
    'bc-2.jp': (
        'https://explorer.bc-2.jp/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'mempool.space': (
        'https://mempool.space/signet/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'bitcoinexplorer.org': (
        'https://signet.bitcoinexplorer.org/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'wakiyamap.dev': (
        'https://signet-explorer.wakiyamap.dev/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'ex.signet.bublina.eu.org': (
        'https://ex.signet.bublina.eu.org/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
    'system default': (
        'blockchain:/',
        {'tx': 'tx/', 'addr': 'address/'},
    ),
}
1019

1020
# Path prefixes paired with a custom block explorer that is configured as a bare URL string
# (see block_explorer_tuple).
_block_explorer_default_api_loc = {'tx': 'tx/', 'addr': 'address/'}
5✔
1021

1022

1023
def block_explorer_info():
    """Return the hardcoded explorer table matching constants.net.NET_NAME.

    Any network name other than testnet/testnet4/signet gets the mainnet table.
    """
    from . import constants
    tables_by_net_name = {
        "testnet": testnet_block_explorers,
        "testnet4": testnet4_block_explorers,
        "signet": signet_block_explorers,
    }
    return tables_by_net_name.get(constants.net.NET_NAME, mainnet_block_explorers)
×
1032

1033

1034
def block_explorer(config: 'SimpleConfig') -> Optional[str]:
    """Returns name of selected block explorer,
    or None if a custom one (not among hardcoded ones) is configured.

    A configured name that is missing from the current network's table is
    replaced by the config variable's default value.
    """
    if config.BLOCK_EXPLORER_CUSTOM is not None:
        return None
    selected = config.BLOCK_EXPLORER
    if block_explorer_info().get(selected) is None:
        selected = config.cv.BLOCK_EXPLORER.get_default_value()
    assert isinstance(selected, str), f"{selected!r} should be str"
    return selected
×
1046

1047

1048
def block_explorer_tuple(config: 'SimpleConfig') -> Optional[Tuple[str, dict]]:
    """Return (base_url, api_paths) for the configured block explorer.

    A custom explorer may be a bare URL string (paired with the default API
    paths) or a 2-item tuple/list. Any other truthy custom value is logged
    and rejected with None. Without a custom value, the hardcoded table
    for the current network is consulted.
    """
    custom_be = config.BLOCK_EXPLORER_CUSTOM
    if not custom_be:
        # using one of the hardcoded block explorers
        return block_explorer_info().get(block_explorer(config))
    if isinstance(custom_be, str):
        return custom_be, _block_explorer_default_api_loc
    is_pair = isinstance(custom_be, (tuple, list)) and len(custom_be) == 2
    if is_pair:
        return tuple(custom_be)
    _logger.warning(f"not using {config.cv.BLOCK_EXPLORER_CUSTOM.key()!r} from config. "
                    f"expected a str or a pair but got {custom_be!r}")
    return None
×
1061

1062

1063
def block_explorer_URL(config: 'SimpleConfig', kind: str, item: str) -> Optional[str]:
    """Build the explorer URL for `item` (looked up under `kind`, e.g. 'tx' or 'addr').

    Returns None when no explorer is available or when the explorer has no
    path prefix registered for `kind`.
    """
    be_tuple = block_explorer_tuple(config)
    if not be_tuple:
        return None
    explorer_url, explorer_dict = be_tuple
    kind_str = explorer_dict.get(kind)
    if kind_str is None:
        return None
    # endswith() also copes with an empty base URL (possible with a custom
    # (url, dict) pair), where indexing [-1] would raise IndexError.
    if not explorer_url.endswith("/"):
        explorer_url += "/"
    return ''.join([explorer_url, kind_str, item])
×
1075

1076

1077

1078

1079

1080
# Python bug (http://bugs.python.org/issue1927) causes raw_input
# to be redirected improperly between stdin/stderr on Unix systems
#TODO: py3
def raw_input(prompt=None):
    """Write `prompt` (if truthy) to sys.stdout, then read input via the
    original builtins.input saved in `builtin_raw_input`."""
    if prompt:
        sys.stdout.write(prompt)
    return builtin_raw_input()

# Save the original builtin first, then replace builtins.input process-wide with the wrapper.
builtin_raw_input = builtins.input
builtins.input = raw_input
5✔
1090

1091

1092
def parse_json(message):
    """Split the first newline-terminated line off `message` (bytes) and
    decode it as UTF-8 JSON.

    Returns (parsed, remainder). `parsed` is None when there is no complete
    line yet (remainder is then the whole message) or when the line cannot
    be decoded or parsed.
    """
    # TODO: check \r\n pattern
    first_line, newline, remainder = message.partition(b'\n')
    if not newline:
        return None, message
    try:
        parsed = json.loads(first_line.decode('utf8'))
    except Exception:
        parsed = None
    return parsed, remainder
×
1102

1103

1104
def setup_thread_excepthook():
    """
    Make exceptions escaping a thread's run() go through `sys.excepthook`.

    Workaround for the `sys.excepthook` thread bug from:
    http://bugs.python.org/issue1230540

    This replaces `threading.Thread.__init__`, so call it once from the
    main thread before creating any threads.
    """
    original_init = threading.Thread.__init__

    def patched_init(self, *args, **kwargs):
        original_init(self, *args, **kwargs)
        original_run = self.run

        def guarded_run(*run_args, **run_kwargs):
            try:
                original_run(*run_args, **run_kwargs)
            except Exception:
                sys.excepthook(*sys.exc_info())

        # shadow the bound run() on this instance with the guarded version
        self.run = guarded_run

    threading.Thread.__init__ = patched_init
×
1128

1129

1130
def send_exception_to_crash_reporter(e: BaseException):
    """Forward `e` to base_crash_reporter.send_exception_to_crash_reporter.

    The module is imported at call time rather than at module import time.
    """
    from . import base_crash_reporter
    base_crash_reporter.send_exception_to_crash_reporter(e)
×
1133

1134

1135
def versiontuple(v):
    """Turn a dotted version string such as "4.5.8" into a tuple of ints."""
    return tuple(int(component) for component in v.split("."))
5✔
1137

1138

1139
def read_json_file(path):
    """Read the UTF-8 JSON file at `path` and return the decoded object.

    Raises FileImportFailed for invalid JSON or for any other error while
    opening/reading the file; the original exception is chained as the cause.
    KeyboardInterrupt and SystemExit are deliberately not intercepted.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except json.JSONDecodeError as e:
        _logger.exception('')
        raise FileImportFailed(_("Invalid JSON code.")) from e
    except Exception as e:
        _logger.exception('')
        raise FileImportFailed(e) from e
    return data
5✔
1150

1151

1152
def write_json_file(path, data):
    """Serialize `data` as indented, key-sorted JSON (using MyEncoder) to `path`.

    OS-level failures are logged and re-raised as FileExportFailed.
    """
    try:
        with open(path, 'w+', encoding='utf-8') as out_file:
            json.dump(data, out_file, indent=4, sort_keys=True, cls=MyEncoder)
    except OSError as e:  # IOError and os.error are aliases of OSError
        _logger.exception('')
        raise FileExportFailed(e)
×
1159

1160

1161
def os_chmod(path, mode):
    """os.chmod aware of tmpfs

    An OSError from chmod is logged and ignored when `path` lies inside
    $XDG_RUNTIME_DIR; otherwise it is re-raised.
    """
    try:
        os.chmod(path, mode)
    except OSError as e:
        runtime_dir = os.environ.get("XDG_RUNTIME_DIR")
        inside_runtime_dir = bool(runtime_dir) and is_subpath(path, runtime_dir)
        if not inside_runtime_dir:
            raise
        _logger.info(f"Tried to chmod in tmpfs. Skipping... {e!r}")
×
1171

1172

1173
def make_dir(path, allow_symlink=True):
    """Make directory if it does not yet exist.

    The new directory is chmod-ed to owner-only rwx. With
    allow_symlink=False, a dangling symlink at `path` raises an Exception.
    """
    if os.path.exists(path):
        return
    if not allow_symlink and os.path.islink(path):
        raise Exception('Dangling link: ' + path)
    os.mkdir(path)
    owner_rwx = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
    os_chmod(path, owner_rwx)
5✔
1180

1181

1182
def is_subpath(long_path: str, short_path: str) -> bool:
    """Returns whether long_path is a sub-path of short_path.

    Paths for which os.path.commonpath raises ValueError are reported as
    not being sub-paths.
    """
    try:
        shared_prefix = os.path.commonpath([long_path, short_path])
    except ValueError:
        return False
    return standardize_path(short_path) == standardize_path(shared_prefix)
5✔
1191

1192

1193
def log_exceptions(func):
    """Decorator to log AND re-raise exceptions.

    Only for coroutine functions. asyncio.CancelledError is re-raised
    without logging. The logger is taken from the first positional
    argument's `logger` attribute when it has one, else the module logger.
    """
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError:
            raise
        except BaseException as exc:
            first_arg = args[0] if args else None
            mylogger = first_arg.logger if hasattr(first_arg, 'logger') else _logger
            try:
                mylogger.exception(f"Exception in {func.__name__}: {repr(exc)}")
            except BaseException as log_exc:
                # logging itself failed; fall back to stdout
                print(f"logging exception raised: {repr(log_exc)}... orig exc: {repr(exc)} in {func.__name__}")
            raise
    return wrapper
5✔
1211

1212

1213
def ignore_exceptions(func):
    """Decorator to silently swallow exceptions raised by a coroutine function.

    Only `Exception` subclasses are swallowed (the call then evaluates to
    None); other BaseException subclasses propagate.
    """
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            result = await func(*args, **kwargs)
        except Exception:
            return None
        return result
    return wrapper
5✔
1223

1224

1225
def with_lock(func):
    """Decorator to enforce a lock on a function call.

    The decorated method runs while holding `self.lock`, which must be a
    context manager. functools.wraps keeps the wrapped method's
    __name__/__doc__ for debugging and introspection.
    """
    @functools.wraps(func)
    def func_wrapper(self, *args, **kwargs):
        with self.lock:
            return func(self, *args, **kwargs)
    return func_wrapper
5✔
1231

1232

1233
class TxMinedInfo(NamedTuple):
    """Mining/confirmation details of a transaction."""
    height: int                        # height of block that mined tx
    conf: Optional[int] = None         # number of confirmations, SPV verified. >=0, or None (None means unknown)
    timestamp: Optional[int] = None    # timestamp of block that mined tx
    txpos: Optional[int] = None        # position of tx in serialized block
    header_hash: Optional[str] = None  # hash of block that mined tx
    wanted_height: Optional[int] = None  # in case of timelock, min abs block height

    def short_id(self) -> Optional[str]:
        """Return "<height>x<txpos>", or None if txpos is unknown or negative."""
        if self.txpos is None or self.txpos < 0:
            return None
        assert self.height > 0
        return f"{self.height}x{self.txpos}"

    def is_local_like(self) -> bool:
        """Returns whether the tx is local-like (LOCAL/FUTURE)."""
        from .address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
        if self.height > 0:
            return False
        return self.height not in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT)
×
1255

1256

1257
class ShortID(bytes):
    """8-byte short id: 3 bytes block height, 3 bytes tx position in the
    block, 2 bytes output index, all big-endian."""

    def __repr__(self):
        return f"<ShortID: {format_short_id(self)}>"

    def __str__(self):
        return format_short_id(self)

    @classmethod
    def from_components(cls, block_height: int, tx_pos_in_block: int, output_index: int) -> 'ShortID':
        """Pack the three integers into the 3+3+2 byte layout."""
        raw = b"".join((
            block_height.to_bytes(3, byteorder='big'),
            tx_pos_in_block.to_bytes(3, byteorder='big'),
            output_index.to_bytes(2, byteorder='big'),
        ))
        return ShortID(raw)

    @classmethod
    def from_str(cls, scid: str) -> 'ShortID':
        """Parses a formatted scid str, e.g. '643920x356x0'."""
        parts = scid.split("x")
        if len(parts) != 3:
            raise ValueError(f"failed to parse ShortID: {scid!r}")
        try:
            height, txpos, out_idx = (int(part) for part in parts)
        except ValueError:
            raise ValueError(f"failed to parse ShortID: {scid!r}") from None
        return ShortID.from_components(height, txpos, out_idx)

    @classmethod
    def normalize(cls, data: Union[None, str, bytes, 'ShortID']) -> Optional['ShortID']:
        """Coerce a hex str (16 chars) or raw bytes (8 bytes) to ShortID.

        None and ShortID instances are returned as-is; any other type
        yields None.
        """
        if data is None or isinstance(data, ShortID):
            return data
        if isinstance(data, str):
            assert len(data) == 16
            return ShortID.fromhex(data)
        if isinstance(data, (bytes, bytearray)):
            assert len(data) == 8
            return ShortID(data)
        return None

    @property
    def block_height(self) -> int:
        return int.from_bytes(self[:3], byteorder='big')

    @property
    def txpos(self) -> int:
        return int.from_bytes(self[3:6], byteorder='big')

    @property
    def output_index(self) -> int:
        return int.from_bytes(self[6:8], byteorder='big')
5✔
1306

1307

1308
def format_short_id(short_channel_id: Optional[bytes]):
    """Format a short id as "<height>x<txpos>x<output index>".

    The three fields are bytes [0:3], [3:6] and [6:], read big-endian.
    A falsy input (None or empty) gives the translated placeholder text.
    """
    if not short_channel_id:
        return _('Not yet available')
    fields = (short_channel_id[:3], short_channel_id[3:6], short_channel_id[6:])
    return 'x'.join(str(int.from_bytes(field, 'big')) for field in fields)
1314

1315

1316
def make_aiohttp_session(proxy: Optional[dict], headers=None, timeout=None):
    """Create an aiohttp.ClientSession, optionally going through a SOCKS proxy.

    headers: defaults to a 'User-Agent: Electrum' header.
    timeout: None (45 seconds total), a number of seconds, or any other
        object, which is passed to aiohttp unchanged.
    proxy: falsy for a direct connection; otherwise a dict read for the keys
        'mode', 'host', 'port' and, optionally, 'user' and 'password'.
    """
    if headers is None:
        headers = {'User-Agent': 'Electrum'}
    if timeout is None or isinstance(timeout, (int, float)):
        # The default timeout is high intentionally.
        # DNS on some systems can be really slow, see e.g. #5337
        total_seconds = 45 if timeout is None else timeout
        timeout = aiohttp.ClientTimeout(total=total_seconds)
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)

    if not proxy:
        connector = aiohttp.TCPConnector(ssl=ssl_context)
    else:
        is_socks5 = proxy['mode'] == 'socks5'
        connector = ProxyConnector(
            proxy_type=ProxyType.SOCKS5 if is_socks5 else ProxyType.SOCKS4,
            host=proxy['host'],
            port=int(proxy['port']),
            username=proxy.get('user'),
            password=proxy.get('password'),
            rdns=True,  # needed to prevent DNS leaks over proxy
            ssl=ssl_context,
        )

    return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector)
×
1341

1342

1343
class OldTaskGroup(aiorpcx.TaskGroup):
    """Automatically raises exceptions on join; as in aiorpcx prior to version 0.20.
    That is, when using TaskGroup as a context manager, if any task encounters an exception,
    we would like that exception to be re-raised (propagated out). For the wait=all case,
    the OldTaskGroup class is emulating the following code-snippet:
    ```
    async with TaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())

        async for task in group:
            if not task.cancelled():
                task.result()
    ```
    So instead of the above, one can just write:
    ```
    async with OldTaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())
    ```
    # TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
    """
    async def join(self):
        if self._wait is not all:
            await super().join()
            if self.completed:
                self.completed.result()
            return
        try:
            async for task in self:
                if not task.cancelled():
                    task.result()  # re-raises the task's exception, if any
        except BaseException:  # including asyncio.CancelledError
            # something failed: cancel what is still running, finish the join,
            # then let the original exception propagate
            await self.cancel_remaining()
            await super().join()
            raise
        else:
            await super().join()
5✔
1383

1384
# We monkey-patch aiorpcx TimeoutAfter (used by timeout_after and ignore_after API),
1385
# to fix a timing issue present in asyncio as a whole re timing out tasks.
1386
# To see the issue we are trying to fix, consider example:
1387
#     async def outer_task():
1388
#         async with timeout_after(0.1):
1389
#             await inner_task()
1390
# When the 0.1 sec timeout expires, inner_task will get cancelled by timeout_after (=internal cancellation).
1391
# If around the same time (in terms of event loop iterations) another coroutine
1392
# cancels outer_task (=external cancellation), there will be a race.
1393
# Both cancellations work by propagating a CancelledError out to timeout_after, which then
1394
# needs to decide (in TimeoutAfter.__aexit__) whether it's due to an internal or external cancellation.
1395
# AFAICT asyncio provides no reliable way of distinguishing between the two.
1396
# This patch tries to always give priority to external cancellations.
1397
# see https://github.com/kyuupichan/aiorpcX/issues/44
1398
# see https://github.com/aio-libs/async-timeout/issues/229
1399
# see https://bugs.python.org/issue42130 and https://bugs.python.org/issue45098
1400
# TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
1401
def _aiorpcx_monkeypatched_set_new_deadline(task, deadline):
    """Schedule a timeout for `task` at loop time `deadline`, wrapping `task.cancel`.

    The first time a deadline is set, the task's own `cancel` is saved as
    `task._orig_cancel` and replaced by a wrapper that marks the task as
    externally cancelled and clears `task._timed_out`. When the timer
    fires, the task is cancelled through the saved `cancel`, and
    `_timed_out` is set to `deadline` only if no external cancel was seen.
    """
    def timeout_task():
        task._orig_cancel()
        task._timed_out = None if getattr(task, "_externally_cancelled", False) else deadline

    def mycancel(*args, **kwargs):
        # Pass the original cancel()'s return value through, so callers that
        # inspect it keep working while the wrapper is installed.
        was_cancelled = task._orig_cancel(*args, **kwargs)
        task._externally_cancelled = True
        task._timed_out = None
        return was_cancelled

    if not hasattr(task, "_orig_cancel"):
        task._orig_cancel = task.cancel
        task.cancel = mycancel
    task._deadline_handle = task._loop.call_at(deadline, timeout_task)
5✔
1413

1414

1415
def _aiorpcx_monkeypatched_set_task_deadline(task, deadline):
    """Call the original aiorpcx `_set_task_deadline`, then reset the
    task's `_externally_cancelled` marker to None."""
    result = _aiorpcx_orig_set_task_deadline(task, deadline)
    task._externally_cancelled = None
    return result
5✔
1419

1420

1421
def _aiorpcx_monkeypatched_unset_task_deadline(task):
    """Restore the task's saved `cancel` (if one was saved), then defer to
    the original aiorpcx `_unset_task_deadline`."""
    try:
        saved_cancel = task._orig_cancel
    except AttributeError:
        pass  # cancel was never wrapped for this task
    else:
        task.cancel = saved_cancel
        del task._orig_cancel
    return _aiorpcx_orig_unset_task_deadline(task)
5✔
1426

1427

1428
# Keep references to the unpatched aiorpcx helpers; the monkeypatched wrappers delegate to them.
_aiorpcx_orig_set_task_deadline    = aiorpcx.curio._set_task_deadline
_aiorpcx_orig_unset_task_deadline  = aiorpcx.curio._unset_task_deadline

# Install the monkeypatched versions into aiorpcx.curio (module-level side effect at import time).
aiorpcx.curio._set_new_deadline    = _aiorpcx_monkeypatched_set_new_deadline
aiorpcx.curio._set_task_deadline   = _aiorpcx_monkeypatched_set_task_deadline
aiorpcx.curio._unset_task_deadline = _aiorpcx_monkeypatched_unset_task_deadline
5✔
1434

1435

1436
async def wait_for2(fut: Awaitable, timeout: Union[int, float, None]):
    """Replacement for asyncio.wait_for,
     due to bugs: https://bugs.python.org/issue42130 and https://github.com/python/cpython/issues/86296 ,
     which are only fixed in python 3.12+.

     On python 3.12+ this simply delegates to asyncio.wait_for; on older
     versions the awaitable is run under async_timeout().
     """
    if sys.version_info[:3] < (3, 12):
        async with async_timeout(timeout):
            return await asyncio.ensure_future(fut, loop=get_running_loop())
    return await asyncio.wait_for(fut, timeout)
3✔
1446

1447

1448
if hasattr(asyncio, 'timeout'):  # python 3.11+
5✔
1449
    async_timeout = asyncio.timeout
4✔
1450
else:
1451
    class TimeoutAfterAsynciolike(aiorpcx.curio.TimeoutAfter):
1✔
1452
        async def __aexit__(self, exc_type, exc_value, traceback):
1✔
1453
            try:
1✔
1454
                await super().__aexit__(exc_type, exc_value, traceback)
1✔
1455
            except (aiorpcx.TaskTimeout, aiorpcx.UncaughtTimeoutError):
×
1456
                raise asyncio.TimeoutError from None
×
1457
            except aiorpcx.TimeoutCancellationError:
×
1458
                raise asyncio.CancelledError from None
×
1459

1460
    def async_timeout(delay: Union[int, float, None]):
1✔
1461
        if delay is None:
1✔
1462
            return nullcontext()
×
1463
        return TimeoutAfterAsynciolike(delay)
1✔
1464

1465

1466
class NetworkJobOnDefaultServer(Logger, ABC):
    """An abstract base class for a job that runs on the main network
    interface. Every time the main interface changes, the job is
    restarted, and some of its internals are reset.
    """
    def __init__(self, network: 'Network'):
        Logger.__init__(self)
        self.network = network
        self.interface = None  # type: Interface
        self._restart_lock = asyncio.Lock()
        # Ensure fairness between NetworkJobs. e.g. if multiple wallets
        # are open, a large wallet's Synchronizer should not starve the small wallets:
        self._network_request_semaphore = asyncio.Semaphore(100)

        self._reset()
        # every time the main interface changes, restart:
        register_callback(self._restart, ['default_server_changed'])
        # also schedule a one-off restart now, as there might already be a main interface:
        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)

    def _reset(self):
        """Initialise fields. Called every time the underlying
        server connection changes.
        """
        self.taskgroup = OldTaskGroup()
        self.reset_request_counters()

    async def _start(self, interface: 'Interface'):
        """Adopt `interface` and spawn this job's tasks on its taskgroup."""
        self.logger.debug(f"starting. interface.server={repr(str(interface.server))}")
        self.interface = interface

        # bind the current taskgroup now: self.taskgroup may be replaced by a later restart
        taskgroup = self.taskgroup
        group_tag = hex(id(taskgroup))

        async def run_tasks_wrapper():
            self.logger.debug(f"starting taskgroup ({group_tag}).")
            try:
                await self._run_tasks(taskgroup=taskgroup)
            except Exception as e:
                self.logger.error(f"taskgroup died ({group_tag}). exc={e!r}")
                raise
            finally:
                self.logger.debug(f"taskgroup stopped ({group_tag}).")
        await interface.taskgroup.spawn(run_tasks_wrapper)

    @abstractmethod
    async def _run_tasks(self, *, taskgroup: OldTaskGroup) -> None:
        """Start tasks in taskgroup. Called every time the underlying
        server connection changes.
        """
        # If self.taskgroup changed, don't start tasks. This can happen if we have
        # been restarted *just now*, i.e. after the _run_tasks coroutine object was created.
        if taskgroup != self.taskgroup:
            raise asyncio.CancelledError()

    async def stop(self, *, full_shutdown: bool = True):
        """Cancel the job's remaining tasks; a full shutdown also unregisters
        the restart callback."""
        self.logger.debug(f"stopping. {full_shutdown=}")
        if full_shutdown:
            unregister_callback(self._restart)
        await self.taskgroup.cancel_remaining()

    @log_exceptions
    async def _restart(self, *args):
        """Stop, reset and start again on the network's current interface."""
        if (interface := self.network.interface) is None:
            return  # we should get called again soon

        async with self._restart_lock:
            await self.stop(full_shutdown=False)
            self._reset()
            await self._start(interface)

    def reset_request_counters(self):
        self._requests_sent = 0
        self._requests_answered = 0

    def num_requests_sent_and_answered(self) -> Tuple[int, int]:
        return self._requests_sent, self._requests_answered

    @property
    def session(self):
        """The current interface's session (asserted to be not None)."""
        session = self.interface.session
        assert session is not None
        return session
×
1548

1549

1550
def detect_tor_socks_proxy() -> Optional[Tuple[str, int]]:
    """Return the first (host, port) candidate that answers like a Tor
    SOCKS port, or None if none does."""
    # Probable ports for Tor to listen at
    candidates = (
        ("127.0.0.1", 9050),
        ("127.0.0.1", 9150),
    )
    return next((addr for addr in candidates if is_tor_socks_port(*addr)), None)
×
1560

1561

1562
def is_tor_socks_port(host: str, port: int) -> bool:
    """Probe host:port and report whether it answers like a Tor SOCKS port.

    Connects with a 10 second timeout, sends a fixed probe and compares the
    reply against the expected byte string. Any socket error counts as
    "not Tor".
    """
    try:
        with socket.create_connection((host, port), timeout=10) as s:
            # mimic "tor-resolve 0.0.0.0".
            # see https://github.com/spesmilo/electrum/issues/7317#issuecomment-1369281075
            # > this is a socks5 handshake, followed by a socks RESOLVE request as defined in
            # > [tor's socks extension spec](https://github.com/torproject/torspec/blob/7116c9cdaba248aae07a3f1d0e15d9dd102f62c5/socks-extensions.txt#L63),
            # > resolving 0.0.0.0, which being an IP, tor resolves itself without needing to ask a relay.
            # sendall() rather than send(): send() may transmit only part of the buffer.
            s.sendall(b'\x05\x01\x00\x05\xf0\x00\x03\x070.0.0.0\x00\x00')
            if s.recv(1024) == b'\x05\x00\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00':
                return True
    except socket.error:
        pass
    return False
×
1576

1577

1578
AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP = False  # used by unit tests
5✔
1579

1580
_asyncio_event_loop = None  # type: Optional[asyncio.AbstractEventLoop]
5✔
1581
def get_asyncio_loop() -> asyncio.AbstractEventLoop:
    """Returns the global asyncio event loop we use.

    If the global loop is not set and AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP
    is enabled, the loop running in the current thread (if any) is returned instead.

    Raises Exception if no loop can be returned.
    """
    global_loop = _asyncio_event_loop
    if global_loop:
        return global_loop
    if AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP:
        thread_loop = get_running_loop()
        if thread_loop:
            return thread_loop
    raise Exception("event loop not created yet")
1589

1590

1591
def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
                                           asyncio.Future,
                                           threading.Thread]:
    """Create the global asyncio event loop and run it in a new thread.

    Returns ``(loop, stopping_fut, loop_thread)``. The loop runs inside
    ``loop_thread`` until ``stopping_fut`` completes; after that the global
    loop reference is reset to None.

    Raises Exception if a global loop is already set, or if the new loop is
    still not running 5 seconds after its thread was started.
    """
    global _asyncio_event_loop
    if _asyncio_event_loop is not None:
        raise Exception("there is already a running event loop")

    # asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710)
    # We set a custom event loop policy purely to be compatible with code that
    # relies on asyncio.get_event_loop().
    # - in python 3.8-3.9, asyncio.Event.__init__, asyncio.Lock.__init__,
    #   and similar, calls get_event_loop. see https://github.com/python/cpython/pull/23420
    class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
        def get_event_loop(self):
            # In case electrum is being used as a library, there might be other
            # event loops in use besides ours. To minimise interfering with those,
            # if there is a loop running in the current thread, return that:
            thread_loop = get_running_loop()
            if thread_loop is None:
                # Otherwise, return our global loop:
                return get_asyncio_loop()
            return thread_loop
    asyncio.set_event_loop_policy(MyEventLoopPolicy())

    loop = asyncio.new_event_loop()
    _asyncio_event_loop = loop

    suppressed_messages = re.compile('SSL handshake|Fatal read error on|'
                                     'SSL error in data received')

    def on_exception(loop, context):
        """Suppress spurious messages it appears we cannot control."""
        message = context.get('message')
        if not (message and suppressed_messages.match(message)):
            loop.default_exception_handler(context)

    def run_event_loop():
        global _asyncio_event_loop
        try:
            loop.run_until_complete(stopping_fut)
        finally:
            # clean-up
            _asyncio_event_loop = None

    loop.set_exception_handler(on_exception)
    # loop.set_debug(True)
    stopping_fut = loop.create_future()
    loop_thread = threading.Thread(target=run_event_loop, name='EventLoop')
    loop_thread.start()
    # Wait until the loop actually starts.
    # On a slow PC, or with a debugger attached, this can take a few dozens of ms,
    # and if we returned without a running loop, weird things can happen...
    started_waiting = time.monotonic()
    while not loop.is_running():
        time.sleep(0.01)
        if time.monotonic() - started_waiting > 5:
            raise Exception("been waiting for 5 seconds but asyncio loop would not start!")
    return loop, stopping_fut, loop_thread
1652

1653

1654
class OrderedDictWithIndex(OrderedDict):
    """An OrderedDict that keeps track of the positions of keys.

    Two lookup tables (key -> position, position -> key) are maintained next
    to the dict itself. Appending a new key updates them incrementally; every
    other mutation rebuilds both tables from scratch.

    Note: very inefficient to modify contents, except to add new items.
    """

    def __init__(self):
        super().__init__()
        self._key_to_pos = {}
        self._pos_to_key = {}

    def _recalc_index(self):
        """Rebuild both lookup tables from the current key order."""
        ordered_keys = list(self.keys())
        self._key_to_pos = {key: pos for pos, key in enumerate(ordered_keys)}
        self._pos_to_key = dict(enumerate(ordered_keys))

    def pos_from_key(self, key):
        """Return the position of `key`. Raises KeyError if not indexed."""
        return self._key_to_pos[key]

    def value_from_pos(self, pos):
        """Return the value stored at position `pos`. Raises KeyError if out of range."""
        return self[self._pos_to_key[pos]]

    def popitem(self, *args, **kwargs):
        item = super().popitem(*args, **kwargs)
        self._recalc_index()
        return item

    def move_to_end(self, *args, **kwargs):
        super().move_to_end(*args, **kwargs)
        self._recalc_index()

    def clear(self):
        super().clear()
        self._recalc_index()

    def pop(self, *args, **kwargs):
        value = super().pop(*args, **kwargs)
        self._recalc_index()
        return value

    def update(self, *args, **kwargs):
        super().update(*args, **kwargs)
        self._recalc_index()

    def __delitem__(self, *args, **kwargs):
        super().__delitem__(*args, **kwargs)
        self._recalc_index()

    def __setitem__(self, key, *args, **kwargs):
        already_present = key in self
        super().__setitem__(key, *args, **kwargs)
        if not already_present:
            # a new key always lands at the end: extend the tables in O(1)
            new_pos = len(self) - 1
            self._key_to_pos[key] = new_pos
            self._pos_to_key[new_pos] = key
1714

1715

1716
def multisig_type(wallet_type):
    '''If wallet_type is mofn multi-sig, return [m, n],
    otherwise return None.

    Only the start of the string is matched, e.g. "2of3" --> [2, 3].
    '''
    if not wallet_type:
        return None
    parsed = re.match(r'(\d+)of(\d+)', wallet_type)
    if parsed is None:
        return None
    m, n = parsed.group(1, 2)
    return [int(m), int(n)]
1725

1726

1727
def is_ip_address(x: Union[str, bytes]) -> bool:
    """Return True if `x` is the textual form of an IPv4 or IPv6 address.

    Bytes input is decoded as UTF-8 first; bytes that cannot be decoded are
    simply "not an IP address" (False), they do not raise.
    """
    try:
        if isinstance(x, bytes):
            x = x.decode("utf-8")
        ipaddress.ip_address(x)
    except ValueError:  # note: UnicodeDecodeError is a subclass of ValueError
        return False
    return True
1735

1736

1737
def _ip_address_or_none(host):
    """Parse `host` as an IP address, accepting the bracketed "[...]" IPv6 form.

    Returns the parsed address object, or None if `host` is not an IP address
    (this includes the empty string).
    """
    # the truthiness check guards the indexing below against an empty host
    if host and host[0] == '[' and host[-1] == ']':  # IPv6
        host = host[1:-1]
    try:
        return ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
    except ValueError:
        return None  # not an IP


def is_localhost(host: str) -> bool:
    """Return True if `host` is "localhost" / "localhost." or a loopback IP address.

    IPv6 addresses may be given in brackets, e.g. "[::1]".
    Anything that is neither of those (including an empty host) gives False.
    """
    if str(host) in ('localhost', 'localhost.',):
        return True
    ip_addr = _ip_address_or_none(host)
    return ip_addr is not None and ip_addr.is_loopback


def is_private_netaddress(host: str) -> bool:
    """Return True if `host` is localhost or an IP address in a private range.

    IPv6 addresses may be given in brackets, e.g. "[fd00::1]".
    Anything that is not an IP address (including an empty host) gives False,
    unless is_localhost() accepts it.
    """
    if is_localhost(host):
        return True
    ip_addr = _ip_address_or_none(host)
    return ip_addr is not None and ip_addr.is_private
1761

1762

1763
def list_enabled_bits(x: int) -> Sequence[int]:
    """Return the positions of the bits set in `x`, least significant bit first.

    e.g. 77 (0b1001101) --> (0, 2, 3, 6)
    """
    # binary digits of x, without the "0b" prefix, least significant first
    lsb_first = bin(x)[:1:-1]
    return tuple(pos for pos, digit in enumerate(lsb_first) if digit == '1')
1768

1769

1770
def resolve_dns_srv(host: str):
    """Look up the DNS SRV records of `host`.

    Returns a list of ``{'host': ..., 'port': ...}`` dicts, one per record,
    sorted by ascending priority and then by descending weight.
    """
    # FIXME this method is not using the network proxy. (although the proxy might not support UDP?)
    answers = dns.resolver.resolve(host, 'SRV')
    # priority: prefer lower
    # weight: tie breaker; prefer higher
    ranked = sorted(answers, key=lambda rec: (rec.priority, -rec.weight))
    return [{'host': str(rec.target), 'port': rec.port} for rec in ranked]
1783

1784

1785
def randrange(bound: int) -> int:
    """Return a random integer k such that 1 <= k < bound, uniformly
    distributed across that range.
    This is guaranteed to be cryptographically strong.
    """
    # secrets.randbelow(n) returns a random int r with 0 <= r < n,
    # so draw from [0, bound - 1) and shift the result up by one.
    upper_exclusive = bound - 1
    return 1 + secrets.randbelow(upper_exclusive)
1793

1794

1795
class CallbackManager(Logger):
    """Registry mapping event names to the callbacks to run for them."""
    # callbacks set by the GUI or any thread
    # guarantee: the callbacks will always get triggered from the asyncio thread.

    def __init__(self):
        Logger.__init__(self)
        self.callback_lock = threading.Lock()
        self.callbacks = defaultdict(list)      # note: needs self.callback_lock
        self._running_cb_futs = set()

    def register_callback(self, func, events):
        """Register `func` to be called for each of the given event names."""
        with self.callback_lock:
            for event_name in events:
                self.callbacks[event_name].append(func)

    def unregister_callback(self, callback):
        """Remove `callback` (at most one occurrence per event) from all events."""
        with self.callback_lock:
            for registered in self.callbacks.values():
                try:
                    registered.remove(callback)
                except ValueError:
                    pass  # was not registered for this event

    def trigger_callback(self, event, *args):
        """Trigger a callback with given arguments.
        Can be called from any thread. The callback itself will get scheduled
        on the event loop.
        """
        loop = get_asyncio_loop()
        assert loop.is_running(), "event loop not running"
        with self.callback_lock:
            # snapshot, so that the callbacks run without holding the lock
            snapshot = list(self.callbacks[event])
        for callback in snapshot:
            if asyncio.iscoroutinefunction(callback):  # async cb
                self._schedule_async_callback(loop, event, callback, args)
                continue
            # non-async cb
            # note: the cb needs to run in the asyncio thread
            if get_running_loop() == loop:
                # run callback immediately, so that it is guaranteed
                # to have been executed when this method returns
                callback(*args)
            else:
                # note: if cb raises, asyncio will log the exception
                loop.call_soon_threadsafe(callback, *args)

    def _schedule_async_callback(self, loop, event, callback, args):
        """Schedule the coroutine `callback(*args)` on `loop` and log how it ends."""
        fut = asyncio.run_coroutine_threadsafe(callback(*args), loop)
        # keep strong references around to avoid GC issues:
        self._running_cb_futs.add(fut)

        def on_done(fut_: concurrent.futures.Future):
            assert fut_.done()
            self._running_cb_futs.remove(fut_)
            if fut_.cancelled():
                self.logger.debug(f"cb cancelled. {event=}.")
            elif exc := fut_.exception():
                self.logger.error(f"cb errored. {event=}. {exc=}", exc_info=exc)
        fut.add_done_callback(on_done)
1847

1848

1849
# Module-wide CallbackManager instance.
callback_mgr = CallbackManager()
5✔
1850
# Module-level shortcut bound to the shared callback_mgr instance.
trigger_callback = callback_mgr.trigger_callback
5✔
1851
# Module-level shortcut bound to the shared callback_mgr instance.
register_callback = callback_mgr.register_callback
5✔
1852
# Module-level shortcut bound to the shared callback_mgr instance.
unregister_callback = callback_mgr.unregister_callback
5✔
1853
# Maps "<module>.<ClassName>" to the names of that class's methods that were
# decorated with @event_listener.
_event_listeners = defaultdict(set)  # type: Dict[str, Set[str]]
5✔
1854

1855

1856
class EventListener:
    """Use as a mixin for a class that has methods to be triggered on events.
    - Methods that receive the callbacks should be named "on_event_*" and decorated with @event_listener.
    - register_callbacks() should be called exactly once per instance of EventListener, e.g. in __init__
    - unregister_callbacks() should be called at least once, e.g. when the instance is destroyed
    """

    def _list_callbacks(self):
        """Yield ``(event_name, bound_method)`` for every listener method.

        Walks the whole MRO, so listeners declared on base classes are included.
        The event name is the method name without its "on_event_" prefix.
        """
        for klass in self.__class__.__mro__:
            classpath = f"{klass.__module__}.{klass.__name__}"
            for method_name in _event_listeners[classpath]:
                bound_method = getattr(self, method_name)
                assert callable(bound_method)
                assert method_name.startswith('on_event_')
                event_name = method_name[len('on_event_'):]
                yield event_name, bound_method

    def register_callbacks(self):
        """Register each listener method of this instance for its event."""
        for event_name, bound_method in self._list_callbacks():
            #_logger.debug(f'registering callback {method}')
            register_callback(bound_method, [event_name])

    def unregister_callbacks(self):
        """Unregister every listener method of this instance."""
        for _event_name, bound_method in self._list_callbacks():
            #_logger.debug(f'unregistering callback {method}')
            unregister_callback(bound_method)
1881

1882

1883
def event_listener(func):
    """Decorator that records `func` as an event listener method of its class.

    The method name must start with "on_event_". The function itself is
    returned unchanged.
    To be used in subclasses of EventListener only. (how to enforce this programmatically?)
    """
    owner_name, method_name = func.__qualname__.split('.')
    assert method_name.startswith('on_event_')
    owner_path = f"{func.__module__}.{owner_name}"
    _event_listeners[owner_path].add(method_name)
    return func
1890

1891

1892
_NetAddrType = TypeVar("_NetAddrType")
# requirements for _NetAddrType:
# - reasonable __hash__() implementation (e.g. based on host/port of remote endpoint)

class NetworkRetryManager(Generic[_NetAddrType]):
    """Truncated Exponential Backoff for network connections.

    For every address, the time of the last attempt and the number of attempts
    are remembered. An address may be retried once ``init_delay * 2**attempts``
    seconds (capped at ``max_delay``) have passed since the last attempt.
    Two sets of delays exist: "normal" and "urgent".
    """

    def __init__(
            self, *,
            max_retry_delay_normal: float,
            init_retry_delay_normal: float,
            max_retry_delay_urgent: float = None,
            init_retry_delay_urgent: float = None,
    ):
        self._last_tried_addr = {}  # type: Dict[_NetAddrType, Tuple[float, int]]  # (unix ts, num_attempts)

        # note: these all use "seconds" as unit
        # the "urgent" delays default to the "normal" ones
        self._max_retry_delay_normal = max_retry_delay_normal
        self._init_retry_delay_normal = init_retry_delay_normal
        self._max_retry_delay_urgent = (
            max_retry_delay_normal if max_retry_delay_urgent is None else max_retry_delay_urgent)
        self._init_retry_delay_urgent = (
            init_retry_delay_normal if init_retry_delay_urgent is None else init_retry_delay_urgent)

    def _trying_addr_now(self, addr: _NetAddrType) -> None:
        """Record a connection attempt to `addr` that is starting now."""
        _prev_time, prev_attempts = self._last_tried_addr.get(addr, (0, 0))
        # we add up to 1 second of noise to the time, so that clients are less likely
        # to get synchronised and bombard the remote in connection waves:
        noisy_now = time.time() + random.random()
        self._last_tried_addr[addr] = noisy_now, prev_attempts + 1

    def _on_connection_successfully_established(self, addr: _NetAddrType) -> None:
        """Reset the attempt counter of `addr` after a successful connection."""
        self._last_tried_addr[addr] = time.time(), 0

    def _can_retry_addr(self, addr: _NetAddrType, *,
                        now: float = None, urgent: bool = False) -> bool:
        """Return whether the backoff delay for `addr` has already elapsed at time `now`."""
        if now is None:
            now = time.time()
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        if urgent:
            init_delay, max_delay = self._init_retry_delay_urgent, self._max_retry_delay_urgent
        else:
            init_delay, max_delay = self._init_retry_delay_normal, self._max_retry_delay_normal
        delay = self.__calc_delay(multiplier=init_delay, max_delay=max_delay, num_attempts=num_attempts)
        return last_time + delay < now

    @classmethod
    def __calc_delay(cls, *, multiplier: float, max_delay: float,
                     num_attempts: int) -> float:
        """Return ``multiplier * 2**num_attempts``, clamped into ``[0, max_delay]``."""
        capped_attempts = min(num_attempts, 100_000)
        try:
            uncapped = multiplier * 2 ** capped_attempts
        except OverflowError:
            return max_delay
        return max(0, min(max_delay, uncapped))

    def _clear_addr_retry_times(self) -> None:
        """Forget all recorded attempts."""
        self._last_tried_addr.clear()
1955

1956

1957
class ESocksProxy(aiorpcx.SOCKSProxy):
    # note: proxy will not leak DNS as create_connection()
    # sets (local DNS) resolve=False by default

    async def open_connection(self, host=None, port=None, **kwargs):
        """Connect to host:port through the proxy.

        Returns a ``(StreamReader, StreamWriter)`` pair for the connection.
        """
        running_loop = asyncio.get_running_loop()
        stream_reader = asyncio.StreamReader(loop=running_loop)
        stream_protocol = asyncio.StreamReaderProtocol(stream_reader, loop=running_loop)
        transport, _ = await self.create_connection(
            lambda: stream_protocol, host, port, **kwargs)
        stream_writer = asyncio.StreamWriter(transport, stream_protocol, stream_reader, running_loop)
        return stream_reader, stream_writer

    @classmethod
    def from_network_settings(cls, network: Optional['Network']) -> Optional['ESocksProxy']:
        """Build a proxy object from ``network.proxy``.

        Returns None if there is no network or no proxy is configured.
        Raises NotImplementedError for proxy modes other than socks4/socks5.
        """
        if not network or not network.proxy:
            return None
        proxy = network.proxy
        username, pw = proxy.get('user'), proxy.get('password')
        if username and pw:
            auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
        elif network.is_proxy_tor:
            # is_proxy_tor is tri-state; None indicates it is still probing the proxy to test for TOR
            auth = aiorpcx.socks.SOCKSRandomAuth()
        else:
            auth = None
        addr = aiorpcx.NetAddress(proxy['host'], proxy['port'])
        mode = proxy['mode']
        if mode == "socks4":
            socks_protocol = aiorpcx.socks.SOCKS4a
        elif mode == "socks5":
            socks_protocol = aiorpcx.socks.SOCKS5
        else:
            raise NotImplementedError  # http proxy not available with aiorpcx
        return cls(addr, socks_protocol, auth)
1992

1993

1994
class JsonRPCError(Exception):
    """Error carrying the ``code``, ``message`` and optional ``data`` of a JSON-RPC error."""

    class Codes(enum.IntEnum):
        # application-specific error codes
        USERFACING = 1
        INTERNAL = 2

    def __init__(self, *, code: int, message: str, data: Optional[dict] = None):
        super().__init__()
        self.code, self.message, self.data = code, message, data
2006

2007

2008
class JsonRPCClient:
    """Minimal JSON-RPC 2.0 client that POSTs requests to a single URL."""

    def __init__(self, session: 'aiohttp.ClientSession', url: str):
        self.session = session
        self.url = url
        self._id = 0  # id of the most recently sent request

    async def request(self, endpoint, *args):
        """Send request to server, parse and return result.

        Returns the "result" member of the response; for a non-200 HTTP status
        the string 'Error: <response text>' is returned instead.
        Raises JsonRPCError if the response contains an "error" member.

        note: parsing code is naive, the server is assumed to be well-behaved.
              Up to the caller to handle exceptions, including those arising from parsing errors.
        """
        self._id += 1
        # Serialise the whole request with json.dumps, so that the method name
        # is escaped properly and the body is always valid JSON.
        # note: the id is sent as a string.
        data = json.dumps({
            "jsonrpc": "2.0",
            "id": str(self._id),
            "method": str(endpoint),
            "params": args,
        })
        async with self.session.post(self.url, data=data) as resp:
            if resp.status != 200:
                text = await resp.text()
                return 'Error: ' + str(text)
            r = await resp.json()
            result = r.get('result')
            error = r.get('error')
            if error:
                raise JsonRPCError(code=error["code"], message=error["message"], data=error.get("data"))
            return result

    def add_method(self, endpoint):
        """Attach a coroutine function named `endpoint` that calls request(endpoint, ...)."""
        async def coro(*args):
            return await self.request(endpoint, *args)
        setattr(self, endpoint, coro)
2040

2041

2042
T = TypeVar('T')

def random_shuffled_copy(x: Iterable[T]) -> List[T]:
    """Return a new list with the items of `x` in random order.

    The input itself is not modified.
    """
    shuffled = [*x]  # copy
    random.shuffle(shuffled)  # shuffle in-place
    return shuffled
2049

2050

2051
def test_read_write_permissions(path) -> None:
    """Check that `path` can be read, and that a file next to it can be written.

    If something exists at `path`, one byte is read from it. Then a temporary
    file named after `path` and the current process id is written, read back
    and removed again.

    Raises IOError if any of these steps fails, or if the data read back
    differs from what was written.
    """
    # note: There might already be a file at 'path'.
    #       Make sure we do NOT overwrite/corrupt that!
    temp_path = "%s.tmptest.%s" % (path, os.getpid())
    expected = "fs r/w test"
    try:
        # test READ permissions for actual path
        if os.path.exists(path):
            with open(path, "rb") as existing:
                existing.read(1)  # read 1 byte
        # test R/W sanity for "similar" path
        with open(temp_path, "w", encoding='utf-8') as tmp_out:
            tmp_out.write(expected)
        with open(temp_path, "r", encoding='utf-8') as tmp_in:
            read_back = tmp_in.read()
        os.remove(temp_path)
    except Exception as e:
        raise IOError(e) from e
    if expected != read_back:
        raise IOError('echo sanity-check failed')
2071

2072

2073
class classproperty(property):
    """~read-only class-level @property

    The getter is called with the owning class, both when accessed on the
    class and when accessed on an instance.
    from https://stackoverflow.com/a/13624858 by denis-ryzhkov
    """
    def __get__(self, owner_self, owner_cls):
        getter = self.fget
        return getter(owner_cls)
2079

2080

2081
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
    """Returns the asyncio event loop that is *running in this thread*, if any.

    Unlike asyncio.get_running_loop(), this returns None instead of raising.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None
    return loop
2087

2088

2089
def error_text_str_to_safe_str(err: str, *, max_len: Optional[int] = 500) -> str:
    """Converts an untrusted error string to a sane printable ascii str.

    Non-ascii characters are replaced by backslash escapes. If the result is
    longer than `max_len`, it gets truncated (None disables truncation).
    Never raises.
    """
    ascii_bytes = err.encode("ascii", errors='backslashreplace')
    # no truncation in this step: the text is truncated once, at the very end
    sanitized = error_text_bytes_to_safe_str(ascii_bytes, max_len=None)
    return truncate_text(sanitized, max_len=max_len)
2097

2098

2099
def error_text_bytes_to_safe_str(err: bytes, *, max_len: Optional[int] = 500) -> str:
    r"""Converts an untrusted error bytes text to a sane printable ascii str.

    The result is the repr() of the ascii-decoded input (so it includes the
    surrounding quotes), truncated to `max_len` (None disables truncation).
    Never raises.

    Note that naive ascii conversion would be insufficient. Fun stuff:
    >>> b = b"my_long_prefix_blabla" + 21 * b"\x08" + b"malicious_stuff"
    >>> s = b.decode("ascii")
    >>> print(s)
    malicious_stuffblabla
    """
    # convert to ascii, to get rid of unicode stuff;
    # then do repr to handle ascii special chars (especially when printing/logging the str)
    printable = repr(err.decode("ascii", errors='backslashreplace'))
    return truncate_text(printable, max_len=max_len)
2114

2115

2116
def truncate_text(text: str, *, max_len: Optional[int]) -> str:
    """Shorten `text` to its first `max_len` characters, plus a note with the original length.

    Text that is not longer than `max_len` is returned unchanged, as is any
    text if `max_len` is None.
    """
    if max_len is None:
        return text
    orig_len = len(text)
    if orig_len <= max_len:
        return text
    return text[:max_len] + f"... (truncated. orig_len={orig_len})"
2121

2122

2123
def nostr_pow_worker(nonce, nostr_pubk, target_bits, hash_function, hash_len_bits, shutdown):
    """Function to generate PoW for Nostr, to be spawned in a ProcessPoolExecutor.

    Starting at `nonce`, tries consecutive nonces until the hash of
    b'electrum-' + nostr_pubk + <nonce as 32 big-endian bytes> has at least
    `target_bits` leading zero bits (out of `hash_len_bits`).

    Returns ``(digest, nonce)`` for the first nonce that satisfies the target,
    after setting the `shutdown` event. Returns ``(None, None)`` if `shutdown`
    was found set (checked every 1000000 nonces) before a nonce was found.
    """
    hash_preimage = b'electrum-' + nostr_pubk
    # a digest has >= target_bits leading zero bits iff it is below this bound
    target = 1 << (hash_len_bits - target_bits)
    while True:
        # we cannot check is_set on each iteration as it has a lot of overhead, this way we can check
        # it with low overhead (just the additional range counter)
        for _ in range(1000000):
            digest = hash_function(hash_preimage + nonce.to_bytes(32, 'big')).digest()
            if int.from_bytes(digest, 'big') < target:
                shutdown.set()
                return digest, nonce
            nonce += 1
        if shutdown.is_set():
            return None, None
2137

2138

2139
async def gen_nostr_ann_pow(nostr_pubk: bytes, target_bits: int) -> Tuple[int, int]:
    """Generate a PoW for a Nostr announcement. The PoW is hash[b'electrum-'+pubk+nonce]

    The 32-byte nonce space is split evenly between worker processes (one per
    CPU, minus one), each running nostr_pow_worker().

    Returns ``(nonce, pow_amount)``, where pow_amount is what
    get_nostr_ann_pow_amount() reports for the nonce that was found.
    """
    import multiprocessing  # not available on Android, so we import it here
    hash_function = hashlib.sha256
    hash_len_bits = 256
    max_nonce: int = (1 << (32 * 8)) - 1  # 32-byte nonce

    max_workers = max(multiprocessing.cpu_count() - 1, 1)  # use all but one CPU
    nonce_stride = max_nonce // max_workers  # split the nonce range between the processes
    loop = asyncio.get_running_loop()
    nonce_res = None
    # The manager runs in a process of its own: use it as a context manager so
    # that it gets shut down. It must outlive the workers, which poll `shutdown`.
    with multiprocessing.Manager() as manager:
        shutdown = manager.Event()
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            try:
                pending = {
                    loop.run_in_executor(
                        executor,
                        nostr_pow_worker,
                        worker_idx * nonce_stride,
                        nostr_pubk,
                        target_bits,
                        hash_function,
                        hash_len_bits,
                        shutdown,
                    )
                    for worker_idx in range(max_workers)
                }
                # A worker that saw `shutdown` set returns (None, None), and may
                # be reported before (or together with) the one that found the
                # nonce: keep waiting until we see an actual nonce.
                while pending and nonce_res is None:
                    done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                    for task in done:
                        _digest, found_nonce = task.result()
                        if found_nonce is not None:
                            nonce_res = found_nonce
                            break
            finally:
                # also on error/cancellation: tell the workers to stop, as
                # leaving the executor context waits for them to exit
                shutdown.set()
                executor.shutdown(wait=False, cancel_futures=True)

    return nonce_res, get_nostr_ann_pow_amount(nostr_pubk, nonce_res)
2174

2175

2176
def get_nostr_ann_pow_amount(nostr_pubk: bytes, nonce: Optional[int]) -> int:
    """Return the amount of leading zero bits for a nostr announcement PoW.

    The hash is sha256(b'electrum-' + nostr_pubk + <nonce as 32 big-endian bytes>).
    A falsy nonce (None or 0) gives 0.
    """
    if not nonce:
        return 0
    hash_len_bits = 256
    preimage = b'electrum-' + nostr_pubk + nonce.to_bytes(32, 'big')
    digest_int = int.from_bytes(hashlib.sha256(preimage).digest(), 'big')
    # leading zero bits = total bits - bits needed to represent the digest
    return hash_len_bits - digest_int.bit_length()
2187

2188

2189
class OnchainHistoryItem(NamedTuple):
    """One entry of the on-chain transaction history."""
    txid: str
    amount_sat: int
    fee_sat: int
    balance_sat: int
    tx_mined_status: TxMinedInfo
    group_id: Optional[str]
    label: str
    monotonic_timestamp: int

    def to_dict(self):
        """Return this item as a dict.

        The mined-status fields are flattened into the result; amount and
        balance are wrapped in Satoshis.
        """
        return {
            'txid': self.txid,
            'amount_sat': self.amount_sat,
            'fee_sat': self.fee_sat,
            'height': self.tx_mined_status.height,
            'confirmations': self.tx_mined_status.conf,
            'timestamp': self.tx_mined_status.timestamp,
            'monotonic_timestamp': self.monotonic_timestamp,
            'incoming': self.amount_sat > 0,
            'bc_value': Satoshis(self.amount_sat),
            'bc_balance': Satoshis(self.balance_sat),
            'date': timestamp_to_datetime(self.tx_mined_status.timestamp),
            'txpos_in_block': self.tx_mined_status.txpos,
            'wanted_height': self.tx_mined_status.wanted_height,
            'label': self.label,
            'group_id': self.group_id,
        }
2217

2218
class LightningHistoryItem(NamedTuple):
    """One entry of the lightning payment history."""
    payment_hash: str
    preimage: str
    amount_msat: int
    fee_msat: Optional[int]
    type: str
    group_id: Optional[str]
    timestamp: int
    label: str

    def to_dict(self):
        """Return this item as a dict.

        A falsy timestamp is reported as 0; 'ln_value' is the amount divided
        by 1000, wrapped in Satoshis.
        """
        (payment_hash, preimage, amount_msat, fee_msat,
         type_, group_id, timestamp, label) = self
        return {
            'type': type_,
            'label': label,
            'timestamp': timestamp or 0,
            'date': timestamp_to_datetime(timestamp),
            'amount_msat': amount_msat,
            'fee_msat': fee_msat,
            'payment_hash': payment_hash,
            'preimage': preimage,
            'group_id': group_id,
            'ln_value': Satoshis(Decimal(amount_msat) / 1000),
        }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc