• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4824330053353472

05 Mar 2025 03:01PM UTC coverage: 60.374% (-0.008%) from 60.382%
4824330053353472

push

CirrusCI

SomberNight
Merge branch 'pr/9507': qt: refactor NetworkChoiceLayout to ProxyWidget+ServerWidget

ref https://github.com/spesmilo/electrum/pull/9507

53 of 146 new or added lines in 5 files covered. (36.3%)

19 existing lines in 4 files now uncovered.

20815 of 34477 relevant lines covered (60.37%)

3.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.01
/electrum/util.py
1
# Electrum - lightweight Bitcoin client
2
# Copyright (C) 2011 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import binascii
5✔
24
import concurrent.futures
5✔
25
import logging
5✔
26
import os, sys, re, json
5✔
27
from collections import defaultdict, OrderedDict
5✔
28
from concurrent.futures.process import ProcessPoolExecutor
5✔
29
from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any,
5✔
30
                    Sequence, Dict, Generic, TypeVar, List, Iterable, Set, Awaitable)
31
from datetime import datetime, timezone
5✔
32
import decimal
5✔
33
from decimal import Decimal
5✔
34
import traceback
5✔
35
import urllib
5✔
36
import threading
5✔
37
import hmac
5✔
38
import hashlib
5✔
39
import stat
5✔
40
import locale
5✔
41
import asyncio
5✔
42
import urllib.request, urllib.parse, urllib.error
5✔
43
import builtins
5✔
44
import json
5✔
45
import time
5✔
46
from typing import NamedTuple, Optional
5✔
47
import ssl
5✔
48
import ipaddress
5✔
49
from ipaddress import IPv4Address, IPv6Address
5✔
50
import random
5✔
51
import secrets
5✔
52
import functools
5✔
53
from functools import partial
5✔
54
from abc import abstractmethod, ABC
5✔
55
import socket
5✔
56
import enum
5✔
57
from contextlib import nullcontext
5✔
58

59
import attr
5✔
60
import aiohttp
5✔
61
from aiohttp_socks import ProxyConnector, ProxyType
5✔
62
import aiorpcx
5✔
63
import certifi
5✔
64
import dns.resolver
5✔
65

66
from .i18n import _
5✔
67
from .logging import get_logger, Logger
5✔
68

69
if TYPE_CHECKING:
5✔
NEW
70
    from .network import Network, ProxySettings
×
71
    from .interface import Interface
×
72
    from .simple_config import SimpleConfig
×
73
    from .paymentrequest import PaymentRequest
×
74

75

76
# Module-level logger named after this module.
_logger = get_logger(__name__)
77

78

79
def inv_dict(d):
    """Return the inverse mapping of *d*: values become keys and vice versa.

    Values must be hashable; duplicate values collapse (last key wins).
    """
    inverted = {}
    for key, value in d.items():
        inverted[value] = key
    return inverted
81

82

83
def all_subclasses(cls) -> Set:
    """Return all (transitive) subclasses of cls."""
    direct = set(cls.__subclasses__())
    collected = set(direct)
    for subclass in direct:
        collected |= all_subclasses(subclass)
    return collected
89

90

91
# Path to certifi's bundled CA certificates, used for TLS verification.
ca_path = certifi.where()


# Maps base unit name -> number of digits after the decimal point
# (relative to satoshis) when displaying amounts in that unit.
base_units = {'BTC':8, 'mBTC':5, 'bits':2, 'sat':0}
base_units_inverse = inv_dict(base_units)  # decimal point -> unit name
base_units_list = ['BTC', 'mBTC', 'bits', 'sat']  # list(dict) does not guarantee order

DECIMAL_POINT_DEFAULT = 5  # mBTC
99

100

101
# Raised by the unit-name/decimal-point conversion helpers below for
# values not present in base_units / base_units_inverse.
class UnknownBaseUnit(Exception): pass
102

103

104
def decimal_point_to_base_unit_name(dp: int) -> str:
    """Map a decimal-point shift to its base unit name, e.g. 8 -> "BTC".

    Raises UnknownBaseUnit for values not in base_units_inverse.
    """
    if dp not in base_units_inverse:
        raise UnknownBaseUnit(dp)
    return base_units_inverse[dp]
110

111

112
def base_unit_name_to_decimal_point(unit_name: str) -> int:
    """Returns the max number of digits allowed after the decimal point.

    e.g. "BTC" -> 8.  Raises UnknownBaseUnit for unrecognised unit names.
    """
    if unit_name not in base_units:
        raise UnknownBaseUnit(unit_name)
    return base_units[unit_name]
119

120
def parse_max_spend(amt: Any) -> Optional[int]:
    """Checks if given amount is "spend-max"-like.
    Returns None or the positive integer weight for "max". Never raises.

    When creating invoices and on-chain txs, the user can specify to send "max".
    This is done by setting the amount to '!'. Splitting max between multiple
    tx outputs is also possible, and custom weights (positive ints) can also be used.
    For example, to send 40% of all coins to address1, and 60% to address2:
    ```
    address1, 2!
    address2, 3!
    ```
    """
    if not isinstance(amt, str) or not amt.endswith('!'):
        return None
    weight_str = amt[:-1]
    if weight_str == '':
        return 1  # bare '!' means weight 1
    try:
        weight = int(weight_str)
    except ValueError:
        return None
    return weight if weight > 0 else None
145

146
class NotEnoughFunds(Exception):
    """Raised when available funds cannot cover a payment; str() is user-facing."""
    def __str__(self):
        return _("Insufficient funds")
149

150

151
class UneconomicFee(Exception):
    """Raised when a tx's fee would exceed the funds it gains; str() is user-facing."""
    def __str__(self):
        return _("The fee for the transaction is higher than the funds gained from it.")
154

155

156
class NoDynamicFeeEstimates(Exception):
    """Raised when dynamic fee estimates are unavailable; str() is user-facing."""
    def __str__(self):
        return _('Dynamic fee estimates not available')
159

160

161
class BelowDustLimit(Exception):
    """Raised for amounts below the dust limit; carries no user-facing message."""
    pass
163

164

165
class InvalidPassword(Exception):
    """Raised when a password check fails.

    If no message is supplied, str() falls back to a translated
    "Incorrect password"; otherwise the given message is used verbatim.
    """

    def __init__(self, message: Optional[str] = None):
        self.message = message

    def __str__(self):
        if self.message is None:
            return _("Incorrect password")
        return str(self.message)
174

175

176
class AddTransactionException(Exception):
    """Base class for failures to add a transaction (see subclasses)."""
    pass
178

179

180
class UnrelatedTransactionException(AddTransactionException):
    """Adding failed because the tx is unrelated to the wallet; str() is user-facing."""
    def __str__(self):
        return _("Transaction is unrelated to this wallet.")
183

184

185
class FileImportFailed(Exception):
    """Raised when importing from a file fails; detail message is appended to str()."""

    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        return _("Failed to import from file.") + "\n" + self.message
191

192

193
class FileExportFailed(Exception):
    """Raised when exporting to a file fails; detail message is appended to str()."""

    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        return _("Failed to export to file.") + "\n" + self.message
199

200

201
class WalletFileException(Exception):
    """Problem with a wallet file.

    should_report_crash: when True, callers may treat this as reportable
    (i.e. worth sending a crash report) rather than a plain user error.
    """

    def __init__(self, message='', *, should_report_crash: bool = False):
        super().__init__(message)
        self.should_report_crash = should_report_crash
205

206

207
# Generic bitcoin-domain error; carries no extra state.
class BitcoinException(Exception): pass
208

209

210
class UserFacingException(Exception):
    """Exception that contains information intended to be shown to the user."""
    # str(exc) is meant for direct display in the UI.
212

213

214
# Invoice-related failure; user-facing (inherits UserFacingException semantics).
class InvoiceError(UserFacingException): pass
215

216

217
class NetworkOfflineException(UserFacingException):
    """Can be raised if we are running in offline mode (--offline flag)
    and the user requests an operation that requires the network.
    """
    def __str__(self):
        # translated, user-displayable message
        return _("You are offline.")
223

224

225
# Throw this exception to unwind the stack like when an error occurs.
# However unlike other exceptions the user won't be informed.
# (typically used when the user deliberately aborts an operation — see call sites)
class UserCancelled(Exception):
    '''An exception that is suppressed from the user'''
    pass
230

231

232
def to_decimal(x: Union[str, float, int, Decimal]) -> Decimal:
    """Convert x to Decimal, going through str() to avoid float artifacts.

    i.e.:
      >>> Decimal(41754.681)
      Decimal('41754.680999999996856786310672760009765625')
      >>> Decimal("41754.681")
      Decimal('41754.681')
    Decimals are passed through unchanged.
    """
    return x if isinstance(x, Decimal) else Decimal(str(x))
241

242

243
# note: this is not a NamedTuple as then its json encoding cannot be customized
class Satoshis(object):
    """Immutable wrapper for an amount denominated in satoshis.

    'value' may be an int or a Decimal (the latter sometimes carrying
    msat precision).
    """
    __slots__ = ('value',)

    def __new__(cls, value):
        self = super(Satoshis, cls).__new__(cls)
        # note: 'value' sometimes has msat precision
        assert isinstance(value, (int, Decimal)), f"unexpected type for {value=!r}"
        self.value = value
        return self

    def __repr__(self):
        return f'Satoshis({self.value})'

    def __str__(self):
        # note: precision is truncated to satoshis here
        return format_satoshis(self.value)

    def __eq__(self, other):
        # Guard against foreign types (mirrors Fiat.__eq__): previously this
        # raised AttributeError for operands without a 'value' attribute, and
        # could compare equal to unrelated objects that happened to have one.
        if not isinstance(other, Satoshis):
            return NotImplemented
        return self.value == other.value

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        return Satoshis(self.value + other.value)
269

270

271
# note: this is not a NamedTuple as then its json encoding cannot be customized
class Fiat(object):
    """A fiat amount: an optional Decimal value plus a currency code (ccy).

    value may be None (or NaN) to represent missing exchange-rate data.
    """
    __slots__ = ('value', 'ccy')

    def __new__(cls, value: Optional[Decimal], ccy: str):
        self = super(Fiat, cls).__new__(cls)
        self.ccy = ccy
        if not isinstance(value, (Decimal, type(None))):
            raise TypeError(f"value should be Decimal or None, not {type(value)}")
        self.value = value
        return self

    def __repr__(self):
        return 'Fiat(%s)'% self.__str__()

    def __str__(self):
        # "No Data" for missing/NaN values, otherwise two decimal places
        if self.value is None or self.value.is_nan():
            return _('No Data')
        else:
            return "{:.2f}".format(self.value)

    def to_ui_string(self):
        # like __str__ but with the currency code appended
        if self.value is None or self.value.is_nan():
            return _('No Data')
        else:
            return "{:.2f}".format(self.value) + ' ' + self.ccy

    def __eq__(self, other):
        if not isinstance(other, Fiat):
            return False
        if self.ccy != other.ccy:
            return False
        # special-case NaN: two NaN values of the same ccy compare equal here,
        # unlike standard Decimal NaN semantics
        if isinstance(self.value, Decimal) and isinstance(other.value, Decimal) \
                and self.value.is_nan() and other.value.is_nan():
            return True
        return self.value == other.value

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        # only same-currency amounts may be added; None values will raise TypeError
        assert self.ccy == other.ccy
        return Fiat(self.value + other.value, self.ccy)
314

315

316
class MyEncoder(json.JSONEncoder):
    """JSON encoder aware of electrum's own types (Transaction, TxOutput,
    Satoshis, Fiat) plus a few stdlib types json cannot handle by default.
    """
    def default(self, obj):
        # note: this does not get called for namedtuples :(  https://bugs.python.org/issue30343
        # local import — presumably avoids a circular import; confirm against module layout
        from .transaction import Transaction, TxOutput
        if isinstance(obj, Transaction):
            return obj.serialize()
        if isinstance(obj, TxOutput):
            return obj.to_legacy_tuple()
        if isinstance(obj, Satoshis):
            return str(obj)
        if isinstance(obj, Fiat):
            return str(obj)
        if isinstance(obj, Decimal):
            return str(obj)
        if isinstance(obj, datetime):
            # trims the last 3 chars (microseconds -> milliseconds);
            # NOTE(review): assumes microseconds are present in the isoformat output
            return obj.isoformat(' ')[:-3]
        if isinstance(obj, set):
            return list(obj)
        if isinstance(obj, bytes): # for nametuples in lnchannel
            return obj.hex()
        if hasattr(obj, 'to_json') and callable(obj.to_json):
            return obj.to_json()
        return super(MyEncoder, self).default(obj)
339

340

341
class ThreadJob(Logger):
    """A job that is run periodically from a thread's main loop.  run() is
    called from that thread's context.
    """

    def __init__(self):
        Logger.__init__(self)

    def run(self):
        """Called periodically from the thread"""
        # Subclasses override this. DaemonThread.run_jobs() wraps each call
        # in try/except, so a raising job cannot kill the thread.
        pass
352

353
class DebugMem(ThreadJob):
    """A handy class for debugging GC memory leaks.

    Periodically (every `interval` seconds) logs the live instance count of
    each class in `classes`.
    """

    def __init__(self, classes, interval=30):
        ThreadJob.__init__(self)
        self.next_time = 0
        self.classes = classes
        self.interval = interval

    def mem_stats(self):
        """Collect garbage, then log how many tracked objects exist per class."""
        import gc
        self.logger.info("Start memscan")
        gc.collect()
        objmap = defaultdict(list)
        for tracked in gc.get_objects():
            for klass in self.classes:
                if isinstance(tracked, klass):
                    objmap[klass].append(tracked)
        for klass, instances in objmap.items():
            self.logger.info(f"{klass.__name__}: {len(instances)}")
        self.logger.info("Finish memscan")

    def run(self):
        # rate-limit the (expensive) scan to once per interval
        if time.time() > self.next_time:
            self.mem_stats()
            self.next_time = time.time() + self.interval
378

379
class DaemonThread(threading.Thread, Logger):
    """ daemon thread that terminates cleanly """

    LOGGING_SHORTCUT = 'd'

    def __init__(self):
        threading.Thread.__init__(self)
        Logger.__init__(self)
        # remember who started us: is_running() also reports False once the
        # parent thread has died
        self.parent_thread = threading.current_thread()
        self.running = False
        self.running_lock = threading.Lock()
        self.job_lock = threading.Lock()
        self.jobs = []  # list of ThreadJob-like objects, each with a run() method
        self.stopped_event = threading.Event()        # set when fully stopped
        self.stopped_event_async = asyncio.Event()    # set when fully stopped
        self.wake_up_event = threading.Event()  # for perf optimisation of polling in run()

    def add_jobs(self, jobs):
        # extend the job list atomically w.r.t. run_jobs()
        with self.job_lock:
            self.jobs.extend(jobs)

    def run_jobs(self):
        # Don't let a throwing job disrupt the thread, future runs of
        # itself, or other jobs.  This is useful protection against
        # malformed or malicious server responses
        with self.job_lock:
            for job in self.jobs:
                try:
                    job.run()
                except Exception as e:
                    self.logger.exception('')

    def remove_jobs(self, jobs):
        # note: raises ValueError if a job was never added
        with self.job_lock:
            for job in jobs:
                self.jobs.remove(job)

    def start(self):
        with self.running_lock:
            self.running = True
        return threading.Thread.start(self)

    def is_running(self):
        with self.running_lock:
            return self.running and self.parent_thread.is_alive()

    def stop(self):
        with self.running_lock:
            self.running = False
            # set+clear pulses the event so a run() loop blocked on it wakes
            # up and re-checks self.running
            self.wake_up_event.set()
            self.wake_up_event.clear()

    def on_stop(self):
        # final cleanup; signals both the threading and asyncio stopped-events
        if 'ANDROID_DATA' in os.environ:
            import jnius
            jnius.detach()
            self.logger.info("jnius detach")
        self.logger.info("stopped")
        self.stopped_event.set()
        # get_asyncio_loop() is defined elsewhere in this module; the asyncio
        # event must be set from within the loop's own thread
        loop = get_asyncio_loop()
        loop.call_soon_threadsafe(self.stopped_event_async.set)
440

441

442
def print_stderr(*args):
    """Write the space-joined str() of all args to stderr, newline-terminated, and flush."""
    line = " ".join(str(item) for item in args)
    sys.stderr.write(line + "\n")
    sys.stderr.flush()
446

447
def print_msg(*args):
    """Write the space-joined str() of all args to stdout, newline-terminated, and flush."""
    line = " ".join(str(item) for item in args)
    sys.stdout.write(line + "\n")
    sys.stdout.flush()
452

453
def json_encode(obj):
    """Pretty-print obj as JSON via MyEncoder; fall back to repr() if it
    cannot be encoded (TypeError)."""
    try:
        return json.dumps(obj, sort_keys=True, indent=4, cls=MyEncoder)
    except TypeError:
        return repr(obj)
459

460
def json_decode(x):
    """Parse x as JSON (floats become Decimal); return x unchanged on any failure."""
    try:
        decoded = json.loads(x, parse_float=Decimal)
    except Exception:
        return x
    return decoded
465

466
def json_normalize(x):
    """Round-trip x through json_encode/json_decode so the result only
    contains JSON-representable values (see #5868).

    note: The return value of commands, when going through the JSON-RPC interface,
          is json-encoded. The encoder used there cannot handle some types,
          e.g. electrum.util.Satoshis.
    note: We should not simply do "json_encode(x)" here, as then later x would
          get doubly json-encoded.
    """
    return json_decode(json_encode(x))
472

473

474
# taken from Django Source Code
def constant_time_compare(val1, val2):
    """Return True if the two strings are equal, False otherwise.

    Uses hmac.compare_digest so the comparison time does not leak where
    the inputs first differ.
    """
    lhs = to_bytes(val1, 'utf8')
    rhs = to_bytes(val2, 'utf8')
    return hmac.compare_digest(lhs, rhs)
478

479

480
_profiler_logger = _logger.getChild('profiler')
def profiler(func=None, *, min_threshold: Union[int, float, None] = None):
    """Function decorator that logs execution time.

    min_threshold: if set, only log if time taken is higher than threshold
    NOTE: does not work with async methods.
    """
    if func is None:  # to make "@profiler(...)" work. (in addition to bare "@profiler")
        return partial(profiler, min_threshold=min_threshold)
    @functools.wraps(func)  # preserve __name__/__qualname__/__doc__ of the wrapped func
    def do_profile(*args, **kw_args):
        name = func.__qualname__
        t0 = time.time()
        o = func(*args, **kw_args)
        t = time.time() - t0
        if min_threshold is None or t > min_threshold:
            _profiler_logger.debug(f"{name} {t:,.4f} sec")
        return o
    return do_profile
498

499

500
class AsyncHangDetector:
    """Context manager that logs every `n` seconds if encapsulated context still has not exited."""

    def __init__(
        self,
        *,
        period_sec: int = 15,   # how often to log while the context is still open
        message: str,           # text logged on each tick
        logger: logging.Logger = None,  # defaults to this module's logger
    ):
        self.period_sec = period_sec
        self.message = message
        self.logger = logger or _logger

    async def _monitor(self):
        # note: this assumes that the event loop itself is not blocked
        t0 = time.monotonic()
        while True:
            await asyncio.sleep(self.period_sec)
            t1 = time.monotonic()
            self.logger.info(f"{self.message} (after {t1 - t0:.2f} sec)")

    async def __aenter__(self):
        # start the watchdog task on the current event loop
        self.mtask = asyncio.create_task(self._monitor())

    async def __aexit__(self, exc_type, exc, tb):
        # stop the watchdog; returning None lets body exceptions propagate
        self.mtask.cancel()
527

528

529
def android_ext_dir():
    """Path of the primary external storage on Android (requires the android package)."""
    from android.storage import primary_external_storage_path
    return primary_external_storage_path()
532

533
def android_backup_dir():
    """Backup directory under external storage, named after the app package.

    Created on first use if missing.
    """
    backup_dir = os.path.join(android_ext_dir(), get_android_package_name())
    if not os.path.exists(backup_dir):
        os.mkdir(backup_dir)
    return backup_dir
539

540
def android_data_dir():
    """App-private data directory on Android (requires jnius)."""
    import jnius
    activity_cls = jnius.autoclass('org.kivy.android.PythonActivity')
    return activity_cls.mActivity.getFilesDir().getPath() + '/data'
544

545
def ensure_sparse_file(filename):
    """Mark filename as a sparse file where that needs doing explicitly.

    On modern Linux, no need to do anything.
    On Windows (NTFS), the file must be flagged via fsutil.
    Best-effort: failures are logged, never raised.
    """
    if os.name == "nt":
        import subprocess
        try:
            # list-form argv (shell=False) avoids quoting/injection issues
            # that the previous 'fsutil ... "{}"'.format(filename) shell
            # string had with quote characters in the path
            subprocess.run(["fsutil", "sparse", "setflag", filename, "1"])
        except Exception as e:
            # previously logged a literal "(unknown)" instead of the filename
            _logger.info(f'error marking file {filename!r} as sparse: {e}')
553

554

555
def get_headers_dir(config):
    """Directory where blockchain headers are kept: the config's datadir path."""
    return config.path
557

558

559
def assert_datadir_available(config_path):
    """Raise FileNotFoundError if the datadir no longer exists
    (e.g. deleted while electrum was running)."""
    if os.path.exists(config_path):
        return
    raise FileNotFoundError(
        'Electrum datadir does not exist. Was it deleted while running?' + '\n' +
        'Should be at {}'.format(config_path))
567

568

569
def assert_file_in_datadir_available(path, config_path):
    """Raise FileNotFoundError if path is missing; the error distinguishes
    a vanished datadir from a missing file inside an existing datadir."""
    if os.path.exists(path):
        return
    # if the whole datadir is gone, that raises first with a clearer message
    assert_datadir_available(config_path)
    raise FileNotFoundError(
        'Cannot find file but datadir is there.' + '\n' +
        'Should be at {}'.format(path))
577

578

579
def standardize_path(path):
    """Return a normalized absolute form of path: '~' expanded, case normalized.

    note: os.path.realpath() is not used, as on Windows it can return
          non-working paths (see #8495). This means that we don't resolve
          symlinks!
    """
    expanded = os.path.expanduser(path)
    return os.path.normcase(os.path.abspath(expanded))
587

588

589
def get_new_wallet_name(wallet_folder: str) -> str:
    """Returns a file basename for a new wallet to be used.
    Can raise OSError.
    """
    # snapshot the directory once; the old loop re-listed it on every
    # iteration, which was accidentally quadratic in the number of wallets
    existing = set(os.listdir(wallet_folder))
    i = 1
    while "wallet_%d" % i in existing:
        i += 1
    return "wallet_%d" % i
601

602

603
def is_android_debug_apk() -> bool:
    """True iff we are running inside a debug build of the Android app."""
    if 'ANDROID_DATA' not in os.environ:
        return False  # not on Android at all
    from jnius import autoclass
    pkgname = get_android_package_name()
    build_config = autoclass(f"{pkgname}.BuildConfig")
    return bool(build_config.DEBUG)
611

612

613
def get_android_package_name() -> str:
    """Return the Android app's package name; must be called on Android."""
    assert 'ANDROID_DATA' in os.environ
    from jnius import autoclass
    from android.config import ACTIVITY_CLASS_NAME
    activity = autoclass(ACTIVITY_CLASS_NAME).mActivity
    return str(activity.getPackageName())
621

622

623
def assert_bytes(*args):
    """Porting helper: assert every arg is bytes/bytearray.

    Prints the offending types before re-raising, to aid debugging.
    """
    try:
        for value in args:
            assert isinstance(value, (bytes, bytearray))
    except Exception:
        print('assert bytes failed', list(map(type, args)))
        raise
633

634

635
def assert_str(*args):
    """Porting helper: assert every arg is a str."""
    for value in args:
        assert isinstance(value, str)
641

642

643
def to_string(x, enc) -> str:
    """Decode bytes-like x using enc; pass str through unchanged.

    Raises TypeError for anything else.
    """
    if isinstance(x, str):
        return x
    if isinstance(x, (bytes, bytearray)):
        return x.decode(enc)
    raise TypeError("Not a string or bytes like object")
650

651

652
def to_bytes(something, encoding='utf8') -> bytes:
    """Coerce str/bytes/bytearray to bytes; raise TypeError for anything else.

    (historical note: kept for python2-era porting; bytearray is copied)
    """
    if isinstance(something, bytes):
        return something
    if isinstance(something, str):
        return something.encode(encoding)
    if isinstance(something, bytearray):
        return bytes(something)
    raise TypeError("Not a string or bytes like object")
664

665

666
# "bytes from hex": shorthand alias used throughout the codebase.
bfh = bytes.fromhex
667

668

669
def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR the common-length prefixes of a and b.

    The result length is min(len(a), len(b)); extra bytes are ignored.
    """
    size = min(len(a), len(b))
    lhs = int.from_bytes(a[:size], "big")
    rhs = int.from_bytes(b[:size], "big")
    return (lhs ^ rhs).to_bytes(size, "big")
673

674

675
def user_dir():
    """Return the per-user electrum directory, or None if it cannot be determined.

    Precedence: $ELECTRUMDIR override, Android data dir, ~/.electrum on
    POSIX, then Windows APPDATA/LOCALAPPDATA.
    """
    env = os.environ
    if "ELECTRUMDIR" in env:
        return env["ELECTRUMDIR"]
    if 'ANDROID_DATA' in env:
        return android_data_dir()
    if os.name == 'posix':
        return os.path.join(env["HOME"], ".electrum")
    if "APPDATA" in env:
        return os.path.join(env["APPDATA"], "Electrum")
    if "LOCALAPPDATA" in env:
        return os.path.join(env["LOCALAPPDATA"], "Electrum")
    #raise Exception("No home directory found in environment variables.")
    return None
689

690

691
def resource_path(*parts):
    """Absolute path of a resource inside the electrum package directory (pkg_dir)."""
    return os.path.join(pkg_dir, *parts)
693

694

695
# absolute path to python package folder of electrum ("lib")
# (realpath here is fine: this resolves where the installed package lives)
pkg_dir = os.path.split(os.path.realpath(__file__))[0]
697

698

699
def is_valid_email(s):
    """Loose sanity check: something@something.something (not RFC-complete)."""
    return bool(re.match(r"[^@]+@[^@]+\.[^@]+", s))
702

703

704
def is_hash256_str(text: Any) -> bool:
    """True iff text is a 64-character hex string (e.g. a txid/block hash)."""
    return isinstance(text, str) and len(text) == 64 and is_hex_str(text)
708

709

710
def is_hex_str(text: Any) -> bool:
    """True iff text is a strictly-hex string of even length (no whitespace)."""
    if not isinstance(text, str):
        return False
    try:
        raw = bytes.fromhex(text)
    except Exception:
        return False
    # bytes.fromhex tolerates whitespace; require exactly 2 chars per byte
    return len(text) == 2 * len(raw)
720

721

722
def is_integer(val: Any) -> bool:
    """True iff val is an int (note: bool is a subclass of int)."""
    return isinstance(val, int)
724

725

726
def is_non_negative_integer(val: Any) -> bool:
    """True iff val is an int and val >= 0."""
    # inlines the is_integer() check
    return isinstance(val, int) and val >= 0
730

731

732
def is_int_or_float(val: Any) -> bool:
    """True iff val is an int or a float."""
    return isinstance(val, (int, float))
734

735

736
def is_non_negative_int_or_float(val: Any) -> bool:
    """True iff val is an int or float and val >= 0."""
    # inlines the is_int_or_float() check
    return isinstance(val, (int, float)) and val >= 0
740

741

742
def chunks(items, size: int):
    """Break up items, an iterable, into chunks of length size.

    Sized sequences yield slices (so e.g. str input yields str chunks);
    arbitrary iterables (generators, iterators) yield lists — previously
    these raised TypeError despite the docstring promising "an iterable".
    Raises ValueError (on first iteration) if size < 1.
    """
    if size < 1:
        raise ValueError(f"size must be positive, not {repr(size)}")
    try:
        n = len(items)
    except TypeError:
        # generalization: support un-sized iterables
        from itertools import islice
        iterator = iter(items)
        while batch := list(islice(iterator, size)):
            yield batch
        return
    for i in range(0, n, size):
        yield items[i: i + size]
748

749

750
def format_satoshis_plain(
        x: Union[int, float, Decimal, str],  # amount in satoshis,
        *,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
) -> str:
    """Display a satoshi amount scaled.  Always uses a '.' as a decimal
    point and has no thousands separator"""
    if parse_max_spend(x):
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    scaled = Decimal(x) / pow(10, decimal_point)
    # format at 8 fractional digits, then drop trailing zeros / bare dot
    return "{:.8f}".format(scaled).rstrip('0').rstrip('.')
762

763

764
# Check that Decimal precision is sufficient.
# We need at the very least ~20, as we deal with msat amounts, and
# log10(21_000_000 * 10**8 * 1000) ~= 18.3
# decimal.DefaultContext.prec == 28 by default, but it is mutable.
# We enforce that we have at least that available.
assert decimal.getcontext().prec >= 28, f"PyDecimal precision too low: {decimal.getcontext().prec}"

# Separators used by format_satoshis(); deliberately hard-coded rather than
# locale-derived (see the commented-out locale variant below).
# DECIMAL_POINT = locale.localeconv()['decimal_point']  # type: str
DECIMAL_POINT = "."
THOUSANDS_SEP = " "
# format_satoshis() assumes single-character separators.
assert len(DECIMAL_POINT) == 1, f"DECIMAL_POINT has unexpected len. {DECIMAL_POINT!r}"
assert len(THOUSANDS_SEP) == 1, f"THOUSANDS_SEP has unexpected len. {THOUSANDS_SEP!r}"
776

777

778
def format_satoshis(
        x: Union[int, float, Decimal, str, None],  # amount in satoshis
        *,
        num_zeros: int = 0,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
        precision: int = 0,  # extra digits after satoshi precision
        is_diff: bool = False,  # if True, enforce a leading sign (+/-)
        whitespaces: bool = False,  # if True, add whitespaces, to align numbers in a column
        add_thousands_sep: bool = False,  # if True, add whitespaces, for better readability of the numbers
) -> str:
    """Format a satoshi amount as a decimal string in the chosen base unit.

    Returns 'unknown' for None and 'max(...)' for spend-max sentinels
    (see parse_max_spend).
    """
    if x is None:
        return 'unknown'
    if parse_max_spend(x):
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    # lose redundant precision
    x = Decimal(x).quantize(Decimal(10) ** (-precision))
    # format string
    overall_precision = decimal_point + precision  # max digits after final decimal point
    decimal_format = "." + str(overall_precision) if overall_precision > 0 else ""
    if is_diff:
        decimal_format = '+' + decimal_format
    # initial result
    scale_factor = pow(10, decimal_point)
    result = ("{:" + decimal_format + "f}").format(x / scale_factor)
    if "." not in result: result += "."
    result = result.rstrip('0')
    # add extra decimal places (zeros)
    integer_part, fract_part = result.split(".")
    if len(fract_part) < num_zeros:
        fract_part += "0" * (num_zeros - len(fract_part))
    # add whitespaces as thousands' separator for better readability of numbers
    if add_thousands_sep:
        # strip the sign before int() grouping, then re-attach it
        sign = integer_part[0] if integer_part[0] in ("+", "-") else ""
        if sign == "-":
            integer_part = integer_part[1:]
        integer_part = "{:,}".format(int(integer_part)).replace(',', THOUSANDS_SEP)
        integer_part = sign + integer_part
        # fractional digits are grouped left-to-right in runs of 3
        fract_part = THOUSANDS_SEP.join(fract_part[i:i+3] for i in range(0, len(fract_part), 3))
    result = integer_part + DECIMAL_POINT + fract_part
    # add leading/trailing whitespaces so that numbers can be aligned in a column
    if whitespaces:
        target_fract_len = overall_precision
        target_integer_len = 14 - decimal_point  # should be enough for up to unsigned 999999 BTC
        if add_thousands_sep:
            # account for the separator characters inserted above
            target_fract_len += max(0, (target_fract_len - 1) // 3)
            target_integer_len += max(0, (target_integer_len - 1) // 3)
        # add trailing whitespaces
        result += " " * (target_fract_len - len(fract_part))
        # add leading whitespaces
        target_total_len = target_integer_len + 1 + target_fract_len
        result = " " * (target_total_len - len(result)) + result
    return result
831

832

833
FEERATE_PRECISION = 1  # num fractional decimal places for sat/byte fee rates
# quantum used by quantize_feerate() to strip excess feerate precision
_feerate_quanta = Decimal(10) ** (-FEERATE_PRECISION)
# UI label strings for fee rates / tx sizes / mempool sizes
UI_UNIT_NAME_FEERATE_SAT_PER_VBYTE = "sat/vbyte"
UI_UNIT_NAME_FEERATE_SAT_PER_VB = "sat/vB"
UI_UNIT_NAME_TXSIZE_VBYTES = "vbytes"
UI_UNIT_NAME_MEMPOOL_MB = "vMB"
839

840

841
def format_fee_satoshis(fee, *, num_zeros=0, precision=None):
    """Format a sat/byte fee rate; precision defaults to FEERATE_PRECISION."""
    precision = FEERATE_PRECISION if precision is None else precision
    num_zeros = min(num_zeros, FEERATE_PRECISION)  # no more zeroes than available prec
    return format_satoshis(fee, num_zeros=num_zeros, decimal_point=0, precision=precision)
846

847

848
def quantize_feerate(fee) -> Union[None, Decimal, int]:
    """Strip sat/byte fee rate of excess precision (rounding half-down); None passes through."""
    if fee is None:
        return None
    return Decimal(fee).quantize(_feerate_quanta, rounding=decimal.ROUND_HALF_DOWN)
853

854

855
def timestamp_to_datetime(timestamp: Union[int, float, None], *, utc: bool = False) -> Optional[datetime]:
    """Convert a POSIX timestamp to a datetime.

    Returns a naive local-time datetime by default, or a tz-aware UTC
    datetime when utc=True. A None timestamp yields None.
    """
    if timestamp is None:
        return None
    tzinfo = timezone.utc if utc else None
    return datetime.fromtimestamp(timestamp, tz=tzinfo)
862

863

864
def format_time(timestamp: Union[int, float, None]) -> str:
    """Render a POSIX timestamp as 'YYYY-MM-DD HH:MM' local time, or 'Unknown'."""
    dt = timestamp_to_datetime(timestamp)
    if dt is None:
        return _("Unknown")
    return dt.isoformat(' ', timespec="minutes")
867

868

869
def age(
    from_date: Union[int, float, None],  # POSIX timestamp
    *,
    since_date: datetime = None,
    target_tz=None,
    include_seconds: bool = False,
) -> str:
    """Takes a timestamp and returns a string with the approximation of the age.

    :param from_date: POSIX timestamp of the event (None -> "Unknown")
    :param since_date: reference "now"; defaults to datetime.now(target_tz)
    :param target_tz: tzinfo used only for computing the default since_date
    :param include_seconds: if True, sub-minute distances are given in seconds
    """
    if from_date is None:
        return _("Unknown")

    # NOTE(review): from_date is naive here, while since_date may be tz-aware
    # when target_tz is passed; subtracting naive and aware datetimes raises.
    # Presumably callers pass target_tz only with a compatible since_date -- confirm.
    from_date = datetime.fromtimestamp(from_date)
    if since_date is None:
        since_date = datetime.now(target_tz)

    distance_in_time = from_date - since_date
    is_in_past = from_date < since_date
    distance_in_seconds = int(round(abs(distance_in_time.days * 86400 + distance_in_time.seconds)))
    distance_in_minutes = int(round(distance_in_seconds / 60))

    if distance_in_minutes == 0:
        if include_seconds:
            if is_in_past:
                return _("{} seconds ago").format(distance_in_seconds)
            else:
                return _("in {} seconds").format(distance_in_seconds)
        else:
            if is_in_past:
                return _("less than a minute ago")
            else:
                return _("in less than a minute")
    elif distance_in_minutes < 45:
        if is_in_past:
            return _("about {} minutes ago").format(distance_in_minutes)
        else:
            return _("in about {} minutes").format(distance_in_minutes)
    elif distance_in_minutes < 90:  # < 1.5 hours
        if is_in_past:
            return _("about 1 hour ago")
        else:
            return _("in about 1 hour")
    elif distance_in_minutes < 1440:  # < 1 day
        if is_in_past:
            return _("about {} hours ago").format(round(distance_in_minutes / 60.0))
        else:
            return _("in about {} hours").format(round(distance_in_minutes / 60.0))
    elif distance_in_minutes < 2880:  # < 2 days
        if is_in_past:
            return _("about 1 day ago")
        else:
            return _("in about 1 day")
    # fix: was 43220, a typo -- 30 days is 30*1440 = 43200 minutes, which is also
    # the divisor used by the "months" branch below; keep the two consistent.
    elif distance_in_minutes < 43200:  # < 30 days
        if is_in_past:
            return _("about {} days ago").format(round(distance_in_minutes / 1440))
        else:
            return _("in about {} days").format(round(distance_in_minutes / 1440))
    elif distance_in_minutes < 86400:  # < 60 days
        if is_in_past:
            return _("about 1 month ago")
        else:
            return _("in about 1 month")
    elif distance_in_minutes < 525600:  # < 365 days
        if is_in_past:
            return _("about {} months ago").format(round(distance_in_minutes / 43200))
        else:
            return _("in about {} months").format(round(distance_in_minutes / 43200))
    elif distance_in_minutes < 1051200:  # < 2 years
        if is_in_past:
            return _("about 1 year ago")
        else:
            return _("in about 1 year")
    else:
        if is_in_past:
            return _("over {} years ago").format(round(distance_in_minutes / 525600))
        else:
            return _("in over {} years").format(round(distance_in_minutes / 525600))
945

946
# Hardcoded mainnet block explorers: name -> (base_url, {kind: path_prefix}),
# where kind is 'tx' or 'addr'; URLs are built as base_url + path_prefix + item
# (see block_explorer_URL).
mainnet_block_explorers = {
    '3xpl.com': ('https://3xpl.com/bitcoin/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'Bitflyer.jp': ('https://chainflyer.bitflyer.jp/',
                        {'tx': 'Transaction/', 'addr': 'Address/'}),
    'Blockchain.info': ('https://blockchain.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Bitaps.com': ('https://btc.bitaps.com/',
                        {'tx': '', 'addr': ''}),
    'BTC.com': ('https://btc.com/',
                        {'tx': '', 'addr': ''}),
    'Chain.so': ('https://www.chain.so/',
                        {'tx': 'tx/BTC/', 'addr': 'address/BTC/'}),
    'Insight.is': ('https://insight.bitpay.com/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchair.com': ('https://blockchair.com/bitcoin/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'blockonomics.co': ('https://www.blockonomics.co/',
                        {'tx': 'api/tx?txid=', 'addr': '#/search?q='}),
    'mempool.space': ('https://mempool.space/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.emzy.de': ('https://mempool.emzy.de/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'OXT.me': ('https://oxt.me/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'mynode.local': ('http://mynode.local:3002/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/',
                        {'tx': 'tx/', 'addr': 'address/'}),
}
980

981
# Hardcoded testnet3 block explorers (same shape as mainnet_block_explorers).
testnet_block_explorers = {
    'Bitaps.com': ('https://tbtc.bitaps.com/',
                       {'tx': '', 'addr': ''}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchain.info': ('https://www.blockchain.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'smartbit.com.au': ('https://testnet.smartbit.com.au/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain://000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

# Hardcoded testnet4 block explorers.
testnet4_block_explorers = {
    'mempool.space': ('https://mempool.space/testnet4/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://testnet4-explorer.wakiyamap.dev/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

# Hardcoded signet block explorers.
signet_block_explorers = {
    'bc-2.jp': ('https://explorer.bc-2.jp/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/signet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'bitcoinexplorer.org': ('https://signet.bitcoinexplorer.org/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://signet-explorer.wakiyamap.dev/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'ex.signet.bublina.eu.org': ('https://ex.signet.bublina.eu.org/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

# API paths assumed when the user configures a custom explorer as a bare URL string.
_block_explorer_default_api_loc = {'tx': 'tx/', 'addr': 'address/'}
1021

1022

1023
def block_explorer_info():
    """Return the dict of hardcoded block explorers for the active network."""
    from . import constants
    explorers_by_net = {
        "testnet": testnet_block_explorers,
        "testnet4": testnet4_block_explorers,
        "signet": signet_block_explorers,
    }
    # any other network name (notably "mainnet") falls back to mainnet explorers
    return explorers_by_net.get(constants.net.NET_NAME, mainnet_block_explorers)
1032

1033

1034
def block_explorer(config: 'SimpleConfig') -> Optional[str]:
    """Returns name of selected block explorer,
    or None if a custom one (not among hardcoded ones) is configured.
    """
    if config.BLOCK_EXPLORER_CUSTOM is not None:
        return None
    name = config.BLOCK_EXPLORER
    if block_explorer_info().get(name) is None:
        # configured key is not among the hardcoded explorers: fall back to default
        name = config.cv.BLOCK_EXPLORER.get_default_value()
    assert isinstance(name, str), f"{name!r} should be str"
    return name
1046

1047

1048
def block_explorer_tuple(config: 'SimpleConfig') -> Optional[Tuple[str, dict]]:
    """Return (base_url, api_paths) for the configured explorer, or None."""
    custom_be = config.BLOCK_EXPLORER_CUSTOM
    if not custom_be:
        # using one of the hardcoded block explorers
        return block_explorer_info().get(block_explorer(config))
    # custom explorer: either a bare base-URL string, or a (base_url, paths) pair
    if isinstance(custom_be, str):
        return custom_be, _block_explorer_default_api_loc
    if isinstance(custom_be, (tuple, list)) and len(custom_be) == 2:
        return tuple(custom_be)
    _logger.warning(f"not using {config.cv.BLOCK_EXPLORER_CUSTOM.key()!r} from config. "
                    f"expected a str or a pair but got {custom_be!r}")
    return None
1061

1062

1063
def block_explorer_URL(config: 'SimpleConfig', kind: str, item: str) -> Optional[str]:
    """Build an explorer URL for `item` of type `kind` ('tx' or 'addr'), or None."""
    be_tuple = block_explorer_tuple(config)
    if not be_tuple:
        return
    explorer_url, explorer_dict = be_tuple
    kind_str = explorer_dict.get(kind)
    if kind_str is None:
        return
    if explorer_url[-1] != "/":
        explorer_url += "/"
    return explorer_url + kind_str + item
1075

1076

1077

1078

1079

1080
# Python bug (http://bugs.python.org/issue1927) causes raw_input
1081
# to be redirected improperly between stdin/stderr on Unix systems
1082
#TODO: py3
1083
def raw_input(prompt=None):
    """input() replacement that writes the prompt explicitly to stdout.

    Works around http://bugs.python.org/issue1927 (prompt redirected
    improperly between stdin/stderr on Unix).
    """
    if prompt:
        sys.stdout.write(prompt)
    return builtin_raw_input()

# keep a reference to the original builtin input(), then install our wrapper
# globally so all input() callers go through raw_input above
builtin_raw_input = builtins.input
builtins.input = raw_input
1090

1091

1092
def parse_json(message):
    """Split the first newline-terminated line off `message` (bytes) and
    JSON-decode it.

    Returns (obj, rest) where obj is the decoded value or None (no complete
    line yet, or invalid JSON/UTF-8), and rest is the unconsumed remainder.
    """
    # TODO: check \r\n pattern
    newline_pos = message.find(b'\n')
    if newline_pos == -1:
        # no complete line buffered yet; hand everything back
        return None, message
    line = message[0:newline_pos]
    rest = message[newline_pos + 1:]
    try:
        parsed = json.loads(line.decode('utf8'))
    except Exception:
        parsed = None
    return parsed, rest
1102

1103

1104
def setup_thread_excepthook():
    """
    Workaround for `sys.excepthook` thread bug from:
    http://bugs.python.org/issue1230540

    Call once from the main thread before creating any threads.
    """

    # monkey-patch Thread.__init__ so that every thread's run() is wrapped
    # and uncaught exceptions are routed through sys.excepthook
    init_original = threading.Thread.__init__

    def init(self, *args, **kwargs):

        init_original(self, *args, **kwargs)
        run_original = self.run

        def run_with_except_hook(*args2, **kwargs2):
            try:
                run_original(*args2, **kwargs2)
            except Exception:
                # forward to the (possibly custom) global excepthook
                sys.excepthook(*sys.exc_info())

        self.run = run_with_except_hook

    threading.Thread.__init__ = init
1128

1129

1130
def send_exception_to_crash_reporter(e: BaseException):
    """Forward `e` to the crash reporter (imported lazily to avoid cycles)."""
    from .base_crash_reporter import send_exception_to_crash_reporter as _report
    _report(e)
1133

1134

1135
def versiontuple(v):
    """Convert a dotted version string such as "4.3.2" into a tuple of ints."""
    return tuple(int(part) for part in v.split("."))
1137

1138

1139
def read_json_file(path):
    """Read and JSON-decode the UTF-8 encoded file at `path`.

    Raises FileImportFailed on invalid JSON or any I/O error.
    """
    try:
        with open(path, 'r', encoding='utf-8') as fp:
            return json.load(fp)
    except json.JSONDecodeError:
        _logger.exception('')
        raise FileImportFailed(_("Invalid JSON code."))
    except BaseException as e:
        _logger.exception('')
        raise FileImportFailed(e)
1150

1151

1152
def write_json_file(path, data):
    """Serialize `data` as pretty-printed, key-sorted JSON (via MyEncoder)
    into the file at `path`.

    Raises FileExportFailed on any I/O error.
    """
    try:
        with open(path, 'w+', encoding='utf-8') as fp:
            json.dump(data, fp, indent=4, sort_keys=True, cls=MyEncoder)
    except (IOError, os.error) as e:
        _logger.exception('')
        raise FileExportFailed(e)
1159

1160

1161
def os_chmod(path, mode):
    """os.chmod aware of tmpfs"""
    try:
        os.chmod(path, mode)
    except OSError as e:
        # chmod can fail on tmpfs mounts (e.g. under XDG_RUNTIME_DIR);
        # in that specific case treat it as best-effort and skip
        xdg_runtime_dir = os.environ.get("XDG_RUNTIME_DIR", None)
        if not (xdg_runtime_dir and is_subpath(path, xdg_runtime_dir)):
            raise
        _logger.info(f"Tried to chmod in tmpfs. Skipping... {e!r}")
1171

1172

1173
def make_dir(path, allow_symlink=True):
    """Make directory if it does not yet exist."""
    if os.path.exists(path):
        return
    if not allow_symlink and os.path.islink(path):
        raise Exception('Dangling link: ' + path)
    os.mkdir(path)
    # new dirs are private: read/write/execute for the owner only
    os_chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1180

1181

1182
def is_subpath(long_path: str, short_path: str) -> bool:
    """Returns whether long_path is a sub-path of short_path."""
    try:
        common_prefix = os.path.commonpath([long_path, short_path])
    except ValueError:
        # e.g. mixing absolute and relative paths, or different drives on Windows
        return False
    return standardize_path(short_path) == standardize_path(common_prefix)
1191

1192

1193
def log_exceptions(func):
    """Decorator to log AND re-raise exceptions."""
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        bound_self = args[0] if args else None
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError:
            # cancellation is control flow, not an error: don't log it
            raise
        except BaseException as e:
            # prefer the instance's own logger when the wrapped func is a method
            mylogger = getattr(bound_self, 'logger', _logger)
            try:
                mylogger.exception(f"Exception in {func.__name__}: {repr(e)}")
            except BaseException as e2:
                print(f"logging exception raised: {repr(e2)}... orig exc: {repr(e)} in {func.__name__}")
            raise
    return wrapper
1211

1212

1213
def ignore_exceptions(func):
    """Decorator to silently swallow all exceptions."""
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception:
            # deliberately best-effort; BaseExceptions (e.g. CancelledError)
            # still propagate
            pass
    return wrapper
1223

1224

1225
def with_lock(func):
    """Decorator that runs the wrapped method while holding `self.lock`."""
    def func_wrapper(self, *args, **kwargs):
        with self.lock:  # acquire/release around the whole call
            return func(self, *args, **kwargs)
    return func_wrapper
1231

1232

1233
class TxMinedInfo(NamedTuple):
    """Mined-status metadata for a transaction."""
    height: int                        # height of block that mined tx
    conf: Optional[int] = None         # number of confirmations, SPV verified. >=0, or None (None means unknown)
    timestamp: Optional[int] = None    # timestamp of block that mined tx
    txpos: Optional[int] = None        # position of tx in serialized block
    header_hash: Optional[str] = None  # hash of block that mined tx
    wanted_height: Optional[int] = None  # in case of timelock, min abs block height

    def short_id(self) -> Optional[str]:
        """Return '<height>x<txpos>' if the position is known, else None."""
        if self.txpos is None or self.txpos < 0:
            return None
        assert self.height > 0
        return f"{self.height}x{self.txpos}"

    def is_local_like(self) -> bool:
        """Returns whether the tx is local-like (LOCAL/FUTURE)."""
        from .address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
        mempool_heights = (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT)
        # mined (height > 0) and mempool sentinel heights are not local-like
        return not (self.height > 0 or self.height in mempool_heights)
1255

1256

1257
class ShortID(bytes):
    """8-byte short channel id: 3 bytes block height, 3 bytes tx position,
    2 bytes output index, all big-endian."""

    def __repr__(self):
        return f"<ShortID: {format_short_id(self)}>"

    def __str__(self):
        return format_short_id(self)

    @classmethod
    def from_components(cls, block_height: int, tx_pos_in_block: int, output_index: int) -> 'ShortID':
        """Pack the three components into the 3+3+2 big-endian byte layout."""
        packed = (block_height.to_bytes(3, byteorder='big')
                  + tx_pos_in_block.to_bytes(3, byteorder='big')
                  + output_index.to_bytes(2, byteorder='big'))
        return ShortID(packed)

    @classmethod
    def from_str(cls, scid: str) -> 'ShortID':
        """Parses a formatted scid str, e.g. '643920x356x0'."""
        parts = scid.split("x")
        if len(parts) != 3:
            raise ValueError(f"failed to parse ShortID: {scid!r}")
        try:
            numbers = [int(p) for p in parts]
        except ValueError:
            raise ValueError(f"failed to parse ShortID: {scid!r}") from None
        return ShortID.from_components(*numbers)

    @classmethod
    def normalize(cls, data: Union[None, str, bytes, 'ShortID']) -> Optional['ShortID']:
        """Coerce None / hex str / 8 raw bytes / ShortID into Optional[ShortID]."""
        if data is None or isinstance(data, ShortID):
            return data
        if isinstance(data, str):
            assert len(data) == 16
            return ShortID.fromhex(data)
        if isinstance(data, (bytes, bytearray)):
            assert len(data) == 8
            return ShortID(data)
        # any other type falls through and implicitly returns None

    @property
    def block_height(self) -> int:
        return int.from_bytes(self[0:3], byteorder='big')

    @property
    def txpos(self) -> int:
        return int.from_bytes(self[3:6], byteorder='big')

    @property
    def output_index(self) -> int:
        return int.from_bytes(self[6:8], byteorder='big')
1306

1307

1308
def format_short_id(short_channel_id: Optional[bytes]):
    """Render an 8-byte short channel id as 'HEIGHTxPOSxOUTPUT'."""
    if not short_channel_id:
        return _('Not yet available')
    components = (
        int.from_bytes(short_channel_id[:3], 'big'),
        int.from_bytes(short_channel_id[3:6], 'big'),
        int.from_bytes(short_channel_id[6:], 'big'),
    )
    return 'x'.join(str(c) for c in components)
1314

1315

1316
def make_aiohttp_session(proxy: Optional['ProxySettings'], headers=None, timeout=None):
    """Build an aiohttp ClientSession, optionally routed over a SOCKS proxy.

    `timeout` may be None (defaults to a generous 45s total), a number of
    seconds, or an aiohttp.ClientTimeout instance.
    """
    if headers is None:
        headers = {'User-Agent': 'Electrum'}
    if timeout is None:
        # The default timeout is high intentionally.
        # DNS on some systems can be really slow, see e.g. #5337
        timeout = aiohttp.ClientTimeout(total=45)
    elif isinstance(timeout, (int, float)):
        timeout = aiohttp.ClientTimeout(total=timeout)
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)

    if proxy and proxy.enabled:
        # NOTE(review): any mode other than 'socks5' is treated as SOCKS4 here --
        # presumably only socks4/socks5 settings reach this point; confirm at callers.
        socks_version = ProxyType.SOCKS5 if proxy.mode == 'socks5' else ProxyType.SOCKS4
        connector = ProxyConnector(
            proxy_type=socks_version,
            host=proxy.host,
            port=int(proxy.port),
            username=proxy.user,
            password=proxy.password,
            rdns=True,  # needed to prevent DNS leaks over proxy
            ssl=ssl_context,
        )
    else:
        connector = aiohttp.TCPConnector(ssl=ssl_context)

    return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector)
1341

1342

1343
class OldTaskGroup(aiorpcx.TaskGroup):
    """Automatically raises exceptions on join; as in aiorpcx prior to version 0.20.
    That is, when using TaskGroup as a context manager, if any task encounters an exception,
    we would like that exception to be re-raised (propagated out). For the wait=all case,
    the OldTaskGroup class is emulating the following code-snippet:
    ```
    async with TaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())

        async for task in group:
            if not task.cancelled():
                task.result()
    ```
    So instead of the above, one can just write:
    ```
    async with OldTaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())
    ```
    # TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
    """
    async def join(self):
        if self._wait is all:
            exc = False
            try:
                # iterate completed tasks; task.result() re-raises the first
                # task exception and aborts the whole group
                async for task in self:
                    if not task.cancelled():
                        task.result()
            except BaseException:  # including asyncio.CancelledError
                exc = True
                raise
            finally:
                # on error, cancel the still-running siblings before the
                # parent join, so no task is left dangling
                if exc:
                    await self.cancel_remaining()
                await super().join()
        else:
            await super().join()
            # wait=any/object case: re-raise the exception (if any) of the
            # task that completed first
            if self.completed:
                self.completed.result()
1383

1384
# We monkey-patch aiorpcx TimeoutAfter (used by timeout_after and ignore_after API),
1385
# to fix a timing issue present in asyncio as a whole re timing out tasks.
1386
# To see the issue we are trying to fix, consider example:
1387
#     async def outer_task():
1388
#         async with timeout_after(0.1):
1389
#             await inner_task()
1390
# When the 0.1 sec timeout expires, inner_task will get cancelled by timeout_after (=internal cancellation).
1391
# If around the same time (in terms of event loop iterations) another coroutine
1392
# cancels outer_task (=external cancellation), there will be a race.
1393
# Both cancellations work by propagating a CancelledError out to timeout_after, which then
1394
# needs to decide (in TimeoutAfter.__aexit__) whether it's due to an internal or external cancellation.
1395
# AFAICT asyncio provides no reliable way of distinguishing between the two.
1396
# This patch tries to always give priority to external cancellations.
1397
# see https://github.com/kyuupichan/aiorpcX/issues/44
1398
# see https://github.com/aio-libs/async-timeout/issues/229
1399
# see https://bugs.python.org/issue42130 and https://bugs.python.org/issue45098
1400
# TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
1401
def _aiorpcx_monkeypatched_set_new_deadline(task, deadline):
    # Replaces aiorpcx.curio._set_new_deadline: wraps task.cancel so we can
    # tell an external cancellation apart from our own deadline-triggered one.
    def timeout_task():
        task._orig_cancel()
        # only record the timeout if the task was not already cancelled externally
        task._timed_out = None if getattr(task, "_externally_cancelled", False) else deadline
    def mycancel(*args, **kwargs):
        # external cancellation path: mark it so it takes priority over a timeout
        task._orig_cancel(*args, **kwargs)
        task._externally_cancelled = True
        task._timed_out = None
    if not hasattr(task, "_orig_cancel"):
        # install the cancel-wrapper only once per task
        task._orig_cancel = task.cancel
        task.cancel = mycancel
    task._deadline_handle = task._loop.call_at(deadline, timeout_task)


def _aiorpcx_monkeypatched_set_task_deadline(task, deadline):
    # Replaces aiorpcx.curio._set_task_deadline: additionally resets the
    # external-cancellation marker for the new deadline scope.
    ret = _aiorpcx_orig_set_task_deadline(task, deadline)
    task._externally_cancelled = None
    return ret


def _aiorpcx_monkeypatched_unset_task_deadline(task):
    # Replaces aiorpcx.curio._unset_task_deadline: restores the original
    # task.cancel before delegating to the stock implementation.
    if hasattr(task, "_orig_cancel"):
        task.cancel = task._orig_cancel
        del task._orig_cancel
    return _aiorpcx_orig_unset_task_deadline(task)


# keep references to the stock implementations (the patched versions delegate to them)
_aiorpcx_orig_set_task_deadline    = aiorpcx.curio._set_task_deadline
_aiorpcx_orig_unset_task_deadline  = aiorpcx.curio._unset_task_deadline

# install the patches (see the explanatory comment block above)
aiorpcx.curio._set_new_deadline    = _aiorpcx_monkeypatched_set_new_deadline
aiorpcx.curio._set_task_deadline   = _aiorpcx_monkeypatched_set_task_deadline
aiorpcx.curio._unset_task_deadline = _aiorpcx_monkeypatched_unset_task_deadline
1434

1435

1436
async def wait_for2(fut: Awaitable, timeout: Union[int, float, None]):
    """Replacement for asyncio.wait_for,
     due to bugs: https://bugs.python.org/issue42130 and https://github.com/python/cpython/issues/86296 ,
     which are only fixed in python 3.12+.
     """
    if sys.version_info[:3] < (3, 12):
        # emulate wait_for with our asyncio-compatible timeout context manager
        async with async_timeout(timeout):
            return await asyncio.ensure_future(fut, loop=get_running_loop())
    return await asyncio.wait_for(fut, timeout)
1446

1447

1448
# Provide `async_timeout(delay)`: on python 3.11+ this is just asyncio.timeout;
# on older versions we adapt aiorpcx's TimeoutAfter to raise the same
# exception types asyncio.timeout would.
if hasattr(asyncio, 'timeout'):  # python 3.11+
    async_timeout = asyncio.timeout
else:
    class TimeoutAfterAsynciolike(aiorpcx.curio.TimeoutAfter):
        # translate aiorpcx timeout exceptions into their asyncio equivalents
        async def __aexit__(self, exc_type, exc_value, traceback):
            try:
                await super().__aexit__(exc_type, exc_value, traceback)
            except (aiorpcx.TaskTimeout, aiorpcx.UncaughtTimeoutError):
                raise asyncio.TimeoutError from None
            except aiorpcx.TimeoutCancellationError:
                raise asyncio.CancelledError from None

    def async_timeout(delay: Union[int, float, None]):
        # a None delay means "no timeout", matching asyncio.timeout(None)
        if delay is None:
            return nullcontext()
        return TimeoutAfterAsynciolike(delay)
1464

1465

1466
class NetworkJobOnDefaultServer(Logger, ABC):
    """An abstract base class for a job that runs on the main network
    interface. Every time the main interface changes, the job is
    restarted, and some of its internals are reset.
    """
    def __init__(self, network: 'Network'):
        Logger.__init__(self)
        self.network = network
        self.interface = None  # type: Interface
        self._restart_lock = asyncio.Lock()
        # Ensure fairness between NetworkJobs. e.g. if multiple wallets
        # are open, a large wallet's Synchronizer should not starve the small wallets:
        self._network_request_semaphore = asyncio.Semaphore(100)

        self._reset()
        # every time the main interface changes, restart:
        register_callback(self._restart, ['default_server_changed'])
        # also schedule a one-off restart now, as there might already be a main interface:
        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)

    def _reset(self):
        """Initialise fields. Called every time the underlying
        server connection changes.
        """
        # a fresh taskgroup per server connection; the old one is cancelled in _restart
        self.taskgroup = OldTaskGroup()
        self.reset_request_counters()

    async def _start(self, interface: 'Interface'):
        """Attach to `interface` and spawn this job's tasks inside the interface's taskgroup."""
        self.logger.debug(f"starting. interface.server={repr(str(interface.server))}")
        self.interface = interface

        # bind the current taskgroup locally: _run_tasks compares it against
        # self.taskgroup to detect a restart that happened in the meantime
        taskgroup = self.taskgroup
        async def run_tasks_wrapper():
            self.logger.debug(f"starting taskgroup ({hex(id(taskgroup))}).")
            try:
                await self._run_tasks(taskgroup=taskgroup)
            except Exception as e:
                self.logger.error(f"taskgroup died ({hex(id(taskgroup))}). exc={e!r}")
                raise
            finally:
                self.logger.debug(f"taskgroup stopped ({hex(id(taskgroup))}).")
        await interface.taskgroup.spawn(run_tasks_wrapper)

    @abstractmethod
    async def _run_tasks(self, *, taskgroup: OldTaskGroup) -> None:
        """Start tasks in taskgroup. Called every time the underlying
        server connection changes.
        """
        # If self.taskgroup changed, don't start tasks. This can happen if we have
        # been restarted *just now*, i.e. after the _run_tasks coroutine object was created.
        if taskgroup != self.taskgroup:
            raise asyncio.CancelledError()

    async def stop(self, *, full_shutdown: bool = True):
        """Cancel running tasks; with full_shutdown also detach from network callbacks."""
        self.logger.debug(f"stopping. {full_shutdown=}")
        if full_shutdown:
            unregister_callback(self._restart)
        await self.taskgroup.cancel_remaining()

    @log_exceptions
    async def _restart(self, *args):
        """Stop, reset and re-start the job on the network's current main interface."""
        interface = self.network.interface
        if interface is None:
            return  # we should get called again soon

        async with self._restart_lock:
            await self.stop(full_shutdown=False)
            self._reset()
            await self._start(interface)

    def reset_request_counters(self):
        # counters for requests sent to / answered by the server (for the UI)
        self._requests_sent = 0
        self._requests_answered = 0

    def num_requests_sent_and_answered(self) -> Tuple[int, int]:
        return self._requests_sent, self._requests_answered

    @property
    def session(self):
        # the RPC session of the currently attached interface; must exist
        s = self.interface.session
        assert s is not None
        return s
1548

1549

1550
async def detect_tor_socks_proxy() -> Optional[Tuple[str, int]]:
    """Probe localhost ports commonly used by Tor, concurrently.

    Returns the first (host, port) that speaks the Tor SOCKS protocol,
    or None if none of the candidates do.
    """
    # Probable ports for Tor to listen at
    candidates = [
        ("127.0.0.1", 9050),
        ("127.0.0.1", 9051),
        ("127.0.0.1", 9150),
    ]
    found_addr = None

    async def probe(net_addr):
        nonlocal found_addr
        if await is_tor_socks_port(*net_addr):
            # set result, and cancel remaining probes
            found_addr = net_addr
            await group.cancel_remaining()

    async with OldTaskGroup() as group:
        for net_addr in candidates:
            await group.spawn(probe(net_addr))
    return found_addr
1571

1572

1573
@log_exceptions
async def is_tor_socks_port(host: str, port: int) -> bool:
    """Return whether (host, port) appears to be a Tor SOCKS port (10s timeout)."""
    # mimic "tor-resolve 0.0.0.0".
    # see https://github.com/spesmilo/electrum/issues/7317#issuecomment-1369281075
    # > this is a socks5 handshake, followed by a socks RESOLVE request as defined in
    # > [tor's socks extension spec](https://github.com/torproject/torspec/blob/7116c9cdaba248aae07a3f1d0e15d9dd102f62c5/socks-extensions.txt#L63),
    # > resolving 0.0.0.0, which being an IP, tor resolves itself without needing to ask a relay.
    writer = None
    try:
        async with async_timeout(10):
            reader, writer = await asyncio.open_connection(host, port)
            writer.write(b'\x05\x01\x00\x05\xf0\x00\x03\x070.0.0.0\x00\x00')
            await writer.drain()
            data = await reader.read(1024)
            # exact reply Tor gives for a successful RESOLVE of 0.0.0.0
            if data == b'\x05\x00\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00':
                return True
            return False
    except (OSError, asyncio.TimeoutError):
        # connection refused / unreachable / timed out: not a Tor port
        return False
    finally:
        # NOTE(review): close() is not followed by wait_closed(); presumably
        # acceptable for a fire-and-forget probe -- confirm.
        if writer:
            writer.close()
1595

1596

1597
AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP = False  # used by unit tests

# the single global event loop we use; set by create_and_start_event_loop,
# read by get_asyncio_loop
_asyncio_event_loop = None  # type: Optional[asyncio.AbstractEventLoop]
1600
def get_asyncio_loop() -> asyncio.AbstractEventLoop:
    """Returns the global asyncio event loop we use.

    Raises if no global loop exists yet (and, for library users that
    manage their own loop, no loop is running in the current thread).
    """
    if loop := _asyncio_event_loop:
        return loop
    if AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP:
        # library users manage their own loop: fall back to the loop
        # running in the current thread, if any
        if loop := get_running_loop():
            return loop
    raise Exception("event loop not created yet")
1610
def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
                                           asyncio.Future,
                                           threading.Thread]:
    """Create the global asyncio loop and run it forever in a new thread.

    Returns (loop, stopping_fut, loop_thread); setting a result on
    stopping_fut makes the loop finish (which also clears the global).
    Raises if a global loop already exists. Blocks until the new loop is
    actually running (up to ~5 seconds).
    """
    global _asyncio_event_loop
    if _asyncio_event_loop is not None:
        raise Exception("there is already a running event loop")

    # asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710)
    # We set a custom event loop policy purely to be compatible with code that
    # relies on asyncio.get_event_loop().
    # - in python 3.8-3.9, asyncio.Event.__init__, asyncio.Lock.__init__,
    #   and similar, calls get_event_loop. see https://github.com/python/cpython/pull/23420
    class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
        def get_event_loop(self):
            # In case electrum is being used as a library, there might be other
            # event loops in use besides ours. To minimise interfering with those,
            # if there is a loop running in the current thread, return that:
            running_loop = get_running_loop()
            if running_loop is not None:
                return running_loop
            # Otherwise, return our global loop:
            return get_asyncio_loop()
    asyncio.set_event_loop_policy(MyEventLoopPolicy())

    loop = asyncio.new_event_loop()
    _asyncio_event_loop = loop

    def on_exception(loop, context):
        """Suppress spurious messages it appears we cannot control."""
        SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
                                            'SSL error in data received')
        message = context.get('message')
        if message and SUPPRESS_MESSAGE_REGEX.match(message):
            return
        loop.default_exception_handler(context)

    def run_event_loop():
        # runs in loop_thread; finishing clears the global reference
        try:
            loop.run_until_complete(stopping_fut)
        finally:
            # clean-up
            global _asyncio_event_loop
            _asyncio_event_loop = None

    loop.set_exception_handler(on_exception)
    # loop.set_debug(True)
    stopping_fut = loop.create_future()
    loop_thread = threading.Thread(
        target=run_event_loop,
        name='EventLoop',
    )
    loop_thread.start()
    # Wait until the loop actually starts.
    # On a slow PC, or with a debugger attached, this can take a few dozens of ms,
    # and if we returned without a running loop, weird things can happen...
    t0 = time.monotonic()
    while not loop.is_running():
        time.sleep(0.01)
        if time.monotonic() - t0 > 5:
            raise Exception("been waiting for 5 seconds but asyncio loop would not start!")
    return loop, stopping_fut, loop_thread

1673
class OrderedDictWithIndex(OrderedDict):
    """An OrderedDict that keeps track of the positions of keys.

    Supports O(1) lookup of a key's position and of the value at a given
    position. Note: very inefficient to modify contents, except to add
    new items (all other mutations rebuild the whole index).
    """

    def __init__(self):
        super().__init__()
        self._key_to_pos = {}  # key -> 0-based insertion position
        self._pos_to_key = {}  # 0-based insertion position -> key

    def _recalc_index(self):
        # rebuild both maps in a single pass over the keys
        # (the original implementation enumerated the keys twice)
        self._key_to_pos = {}
        self._pos_to_key = {}
        for pos, key in enumerate(self.keys()):
            self._key_to_pos[key] = pos
            self._pos_to_key[pos] = key

    def pos_from_key(self, key):
        """Return the position of `key`. Raises KeyError if absent."""
        return self._key_to_pos[key]

    def value_from_pos(self, pos):
        """Return the value stored at position `pos`. Raises KeyError if absent."""
        key = self._pos_to_key[pos]
        return self[key]

    # --- mutating operations: delegate to OrderedDict, then reindex ---

    def popitem(self, *args, **kwargs):
        ret = super().popitem(*args, **kwargs)
        self._recalc_index()
        return ret

    def move_to_end(self, *args, **kwargs):
        ret = super().move_to_end(*args, **kwargs)
        self._recalc_index()
        return ret

    def clear(self):
        ret = super().clear()
        self._recalc_index()
        return ret

    def pop(self, *args, **kwargs):
        ret = super().pop(*args, **kwargs)
        self._recalc_index()
        return ret

    def update(self, *args, **kwargs):
        ret = super().update(*args, **kwargs)
        self._recalc_index()
        return ret

    def __delitem__(self, *args, **kwargs):
        ret = super().__delitem__(*args, **kwargs)
        self._recalc_index()
        return ret

    def __setitem__(self, key, *args, **kwargs):
        # adding a NEW key appends at the end; the index can be updated
        # incrementally in O(1) instead of a full rebuild
        is_new_key = key not in self
        ret = super().__setitem__(key, *args, **kwargs)
        if is_new_key:
            pos = len(self) - 1
            self._key_to_pos[key] = pos
            self._pos_to_key[pos] = key
        return ret

1735
def multisig_type(wallet_type):
    """If wallet_type is mofn multi-sig, return [m, n],
    otherwise return None."""
    if not wallet_type:
        return None
    mo = re.match(r'(\d+)of(\d+)', wallet_type)
    if mo is None:
        return None
    return [int(mo.group(1)), int(mo.group(2))]

1746
def is_ip_address(x: Union[str, bytes]) -> bool:
    """Whether `x` parses as an IPv4 or IPv6 address."""
    text = x.decode("utf-8") if isinstance(x, bytes) else x
    try:
        ipaddress.ip_address(text)
    except ValueError:
        return False
    return True

1756
def is_localhost(host: str) -> bool:
    """Whether `host` refers to the local machine: 'localhost', or a
    loopback IP (optionally bracketed IPv6). Never raises.
    """
    if str(host) in ('localhost', 'localhost.',):
        return True
    if not host:
        # fix: the original raised IndexError on an empty string
        return False
    if host[0] == '[' and host[-1] == ']':  # IPv6
        host = host[1:-1]
    try:
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
        return ip_addr.is_loopback
    except ValueError:
        pass  # not an IP
    return False

1769
def is_private_netaddress(host: str) -> bool:
    """Whether `host` is localhost or an IP in a private range
    (optionally bracketed IPv6). Returns False for hostnames and
    invalid/empty input; never raises.
    """
    if not host:
        # fix: guard before any indexing; the original raised IndexError
        # on an empty string
        return False
    if is_localhost(host):
        return True
    if host[0] == '[' and host[-1] == ']':  # IPv6
        host = host[1:-1]
    try:
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
        return ip_addr.is_private
    except ValueError:
        pass  # not an IP
    return False

1782
def list_enabled_bits(x: int) -> Sequence[int]:
    """e.g. 77 (0b1001101) --> (0, 2, 3, 6)"""
    lsb_first = bin(x)[2:][::-1]  # binary digits, least significant first
    return tuple(idx for idx, digit in enumerate(lsb_first) if digit == '1')

1789
def resolve_dns_srv(host: str):
    """Resolve the SRV records of `host` and return them as a list of
    {'host': ..., 'port': ...} dicts, best candidate first.
    """
    # FIXME this method is not using the network proxy. (although the proxy might not support UDP?)
    records = dns.resolver.resolve(host, 'SRV')
    # priority: prefer lower
    # weight: tie breaker; prefer higher
    ordered = sorted(records, key=lambda rec: (rec.priority, -rec.weight))
    return [{'host': str(rec.target), 'port': rec.port} for rec in ordered]

1804
def randrange(bound: int) -> int:
    """Return a random integer k such that 1 <= k < bound, uniformly
    distributed across that range.
    This is guaranteed to be cryptographically strong.
    """
    # secrets.randbelow(bound - 1) yields 0 <= r < bound-1; shift by one
    return 1 + secrets.randbelow(bound - 1)

1814
class CallbackManager(Logger):
    # callbacks set by the GUI or any thread
    # guarantee: the callbacks will always get triggered from the asyncio thread.

    def __init__(self):
        Logger.__init__(self)
        self.callback_lock = threading.Lock()
        self.callbacks = defaultdict(list)      # note: needs self.callback_lock
        # strong refs to in-flight async-callback futures (see trigger_callback)
        self._running_cb_futs = set()

    def register_callback(self, func, events):
        """Subscribe func to each event name in `events`."""
        with self.callback_lock:
            for event in events:
                self.callbacks[event].append(func)

    def unregister_callback(self, callback):
        """Remove `callback` from all events it was registered for."""
        with self.callback_lock:
            for callbacks in self.callbacks.values():
                if callback in callbacks:
                    callbacks.remove(callback)

    def trigger_callback(self, event, *args):
        """Trigger a callback with given arguments.
        Can be called from any thread. The callback itself will get scheduled
        on the event loop.
        """
        loop = get_asyncio_loop()
        assert loop.is_running(), "event loop not running"
        with self.callback_lock:
            callbacks = self.callbacks[event][:]  # copy: run cbs outside the lock
        for callback in callbacks:
            if asyncio.iscoroutinefunction(callback):  # async cb
                fut = asyncio.run_coroutine_threadsafe(callback(*args), loop)
                # keep strong references around to avoid GC issues:
                self._running_cb_futs.add(fut)
                def on_done(fut_: concurrent.futures.Future):
                    assert fut_.done()
                    self._running_cb_futs.remove(fut_)
                    if fut_.cancelled():
                        self.logger.debug(f"cb cancelled. {event=}.")
                    elif exc := fut_.exception():
                        self.logger.error(f"cb errored. {event=}. {exc=}", exc_info=exc)
                fut.add_done_callback(on_done)
            else:  # non-async cb
                # note: the cb needs to run in the asyncio thread
                if get_running_loop() == loop:
                    # run callback immediately, so that it is guaranteed
                    # to have been executed when this method returns
                    callback(*args)
                else:
                    # note: if cb raises, asyncio will log the exception
                    loop.call_soon_threadsafe(callback, *args)

1868
# singleton callback manager; these module-level helpers delegate to it
callback_mgr = CallbackManager()
trigger_callback = callback_mgr.trigger_callback
register_callback = callback_mgr.register_callback
unregister_callback = callback_mgr.unregister_callback
# maps "module.ClassName" -> set of "on_event_*" method names (see EventListener)
_event_listeners = defaultdict(set)  # type: Dict[str, Set[str]]
1874

1875
class EventListener:
    """Use as a mixin for a class that has methods to be triggered on events.
    - Methods that receive the callbacks should be named "on_event_*" and decorated with @event_listener.
    - register_callbacks() should be called exactly once per instance of EventListener, e.g. in __init__
    - unregister_callbacks() should be called at least once, e.g. when the instance is destroyed
    """

    def _list_callbacks(self):
        # walk the MRO so listeners declared on base classes are found too
        for c in self.__class__.__mro__:
            classpath = f"{c.__module__}.{c.__name__}"
            for method_name in _event_listeners[classpath]:
                method = getattr(self, method_name)
                assert callable(method)
                assert method_name.startswith('on_event_')
                # yields (event_name, bound_method)
                yield method_name[len('on_event_'):], method

    def register_callbacks(self):
        """Register all @event_listener methods with the callback manager."""
        for name, method in self._list_callbacks():
            #_logger.debug(f'registering callback {method}')
            register_callback(method, [name])

    def unregister_callbacks(self):
        """Unregister all @event_listener methods from the callback manager."""
        for name, method in self._list_callbacks():
            #_logger.debug(f'unregistering callback {method}')
            unregister_callback(method)

1902
def event_listener(func):
    """To be used in subclasses of EventListener only. (how to enforce this programmatically?)"""
    # note: the split('.') assumes the method is defined directly in a
    # top-level class body (qualname is "ClassName.method_name")
    classname, method_name = func.__qualname__.split('.')
    assert method_name.startswith('on_event_')
    classpath = f"{func.__module__}.{classname}"
    _event_listeners[classpath].add(method_name)
    return func

1911
# type parameter for NetworkRetryManager below
_NetAddrType = TypeVar("_NetAddrType")
# requirements for _NetAddrType:
# - reasonable __hash__() implementation (e.g. based on host/port of remote endpoint)
1915
class NetworkRetryManager(Generic[_NetAddrType]):
    """Truncated Exponential Backoff for network connections."""

    def __init__(
            self, *,
            max_retry_delay_normal: float,
            init_retry_delay_normal: float,
            max_retry_delay_urgent: float = None,
            init_retry_delay_urgent: float = None,
    ):
        self._last_tried_addr = {}  # type: Dict[_NetAddrType, Tuple[float, int]]  # (unix ts, num_attempts)

        # note: these all use "seconds" as unit
        # the "urgent" delays default to the "normal" ones if not given
        if max_retry_delay_urgent is None:
            max_retry_delay_urgent = max_retry_delay_normal
        if init_retry_delay_urgent is None:
            init_retry_delay_urgent = init_retry_delay_normal
        self._max_retry_delay_normal = max_retry_delay_normal
        self._init_retry_delay_normal = init_retry_delay_normal
        self._max_retry_delay_urgent = max_retry_delay_urgent
        self._init_retry_delay_urgent = init_retry_delay_urgent

    def _trying_addr_now(self, addr: _NetAddrType) -> None:
        """Record a connection attempt to `addr` (bumps its attempt counter)."""
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        # we add up to 1 second of noise to the time, so that clients are less likely
        # to get synchronised and bombard the remote in connection waves:
        cur_time = time.time() + random.random()
        self._last_tried_addr[addr] = cur_time, num_attempts + 1

    def _on_connection_successfully_established(self, addr: _NetAddrType) -> None:
        # success resets the attempt counter, so backoff starts over
        self._last_tried_addr[addr] = time.time(), 0

    def _can_retry_addr(self, addr: _NetAddrType, *,
                        now: float = None, urgent: bool = False) -> bool:
        """Whether enough time has passed since the last attempt to `addr`."""
        if now is None:
            now = time.time()
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        if urgent:
            max_delay = self._max_retry_delay_urgent
            init_delay = self._init_retry_delay_urgent
        else:
            max_delay = self._max_retry_delay_normal
            init_delay = self._init_retry_delay_normal
        delay = self.__calc_delay(multiplier=init_delay, max_delay=max_delay, num_attempts=num_attempts)
        next_time = last_time + delay
        return next_time < now

    @classmethod
    def __calc_delay(cls, *, multiplier: float, max_delay: float,
                     num_attempts: int) -> float:
        # delay = multiplier * 2**num_attempts, clamped to [0, max_delay]
        num_attempts = min(num_attempts, 100_000)
        try:
            res = multiplier * 2 ** num_attempts
        except OverflowError:
            return max_delay
        return max(0, min(max_delay, res))

    def _clear_addr_retry_times(self) -> None:
        """Forget all recorded attempts (all addresses become retryable)."""
        self._last_tried_addr.clear()

1976
class ESocksProxy(aiorpcx.SOCKSProxy):
    # note: proxy will not leak DNS as create_connection()
    # sets (local DNS) resolve=False by default

    async def open_connection(self, host=None, port=None, **kwargs):
        """Like asyncio.open_connection, but tunnelled through this proxy."""
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader(loop=loop)
        protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
        transport, _ = await self.create_connection(
            lambda: protocol, host, port, **kwargs)
        writer = asyncio.StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    @classmethod
    def from_network_settings(cls, network: Optional['Network']) -> Optional['ESocksProxy']:
        """Build an ESocksProxy from the network's proxy settings,
        or return None if no proxy is configured/enabled.
        Raises NotImplementedError for unsupported proxy modes (e.g. http).
        """
        if not network or not network.proxy or not network.proxy.enabled:
            return None
        proxy = network.proxy
        username, pw = proxy.user, proxy.password
        if not username or not pw:
            # is_proxy_tor is tri-state; None indicates it is still probing the proxy to test for TOR
            if network.is_proxy_tor:
                # NOTE(review): random auth presumably makes Tor isolate our
                # streams from other applications' -- confirm
                auth = aiorpcx.socks.SOCKSRandomAuth()
            else:
                auth = None
        else:
            auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
        addr = aiorpcx.NetAddress(proxy.host, proxy.port)
        if proxy.mode == "socks4":
            ret = cls(addr, aiorpcx.socks.SOCKS4a, auth)
        elif proxy.mode == "socks5":
            ret = cls(addr, aiorpcx.socks.SOCKS5, auth)
        else:
            raise NotImplementedError  # http proxy not available with aiorpcx
        return ret

2013
class JsonRPCError(Exception):
    """Raised when a JSON-RPC response carries an "error" object."""

    class Codes(enum.IntEnum):
        # application-specific error codes
        USERFACING = 1
        INTERNAL = 2

    def __init__(self, *, code: int, message: str, data: Optional[dict] = None):
        super().__init__()
        self.code = code
        self.message = message
        self.data = data

2027
class JsonRPCClient:
    """Minimal JSON-RPC 2.0 client on top of an aiohttp session."""

    def __init__(self, session: 'aiohttp.ClientSession', url: str):
        self.session = session
        self.url = url
        self._id = 0  # incrementing request id

    async def request(self, endpoint, *args):
        """Send request to server, parse and return result.
        note: parsing code is naive, the server is assumed to be well-behaved.
              Up to the caller to handle exceptions, including those arising from parsing errors.
        """
        self._id += 1
        # fix: build the payload with json.dumps instead of %-formatting a
        # template, so that e.g. quotes in the method name cannot produce
        # malformed JSON
        data = json.dumps({
            "jsonrpc": "2.0",
            "id": "%d" % self._id,  # note: the id is sent as a string
            "method": endpoint,
            "params": args,
        })
        async with self.session.post(self.url, data=data) as resp:
            if resp.status == 200:
                r = await resp.json()
                result = r.get('result')
                error = r.get('error')
                if error:
                    raise JsonRPCError(code=error["code"], message=error["message"], data=error.get("data"))
                else:
                    return result
            else:
                text = await resp.text()
                return 'Error: ' + str(text)

    def add_method(self, endpoint):
        """Attach an async convenience method self.<endpoint>(*args)."""
        async def coro(*args):
            return await self.request(endpoint, *args)
        setattr(self, endpoint, coro)

2061
T = TypeVar('T')

def random_shuffled_copy(x: Iterable[T]) -> List[T]:
    """Returns a shuffled copy of the input."""
    shuffled = [item for item in x]  # materialize a copy; input is untouched
    random.shuffle(shuffled)
    return shuffled

2070
def test_read_write_permissions(path) -> None:
5✔
2071
    # note: There might already be a file at 'path'.
2072
    #       Make sure we do NOT overwrite/corrupt that!
2073
    temp_path = "%s.tmptest.%s" % (path, os.getpid())
5✔
2074
    echo = "fs r/w test"
5✔
2075
    try:
5✔
2076
        # test READ permissions for actual path
2077
        if os.path.exists(path):
5✔
2078
            with open(path, "rb") as f:
5✔
2079
                f.read(1)  # read 1 byte
5✔
2080
        # test R/W sanity for "similar" path
2081
        with open(temp_path, "w", encoding='utf-8') as f:
5✔
2082
            f.write(echo)
5✔
2083
        with open(temp_path, "r", encoding='utf-8') as f:
5✔
2084
            echo2 = f.read()
5✔
2085
        os.remove(temp_path)
5✔
2086
    except Exception as e:
×
2087
        raise IOError(e) from e
×
2088
    if echo != echo2:
5✔
2089
        raise IOError('echo sanity-check failed')
×
2090

2091

2092
class classproperty(property):
    """~read-only class-level @property
    from https://stackoverflow.com/a/13624858 by denis-ryzhkov
    """
    def __get__(self, owner_self, owner_cls):
        # ignore the instance; always evaluate the getter against the class
        return self.fget(owner_cls)

2100
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
    """Returns the asyncio event loop that is *running in this thread*, if any."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # no loop running in this thread
        return None
    return loop

2108
def error_text_str_to_safe_str(err: str, *, max_len: Optional[int] = 500) -> str:
    """Converts an untrusted error string to a sane printable ascii str.
    Never raises.
    """
    # escape non-ascii first; truncation is applied once, at the very end
    text = error_text_bytes_to_safe_str(
        err.encode("ascii", errors='backslashreplace'),
        max_len=None)
    return truncate_text(text, max_len=max_len)

2118
def error_text_bytes_to_safe_str(err: bytes, *, max_len: Optional[int] = 500) -> str:
    """Converts an untrusted error bytes text to a sane printable ascii str.
    Never raises.

    Note that naive ascii conversion would be insufficient. Fun stuff:
    >>> b = b"my_long_prefix_blabla" + 21 * b"\x08" + b"malicious_stuff"
    >>> s = b.decode("ascii")
    >>> print(s)
    malicious_stuffblabla
    """
    # convert to ascii, to get rid of unicode stuff
    ascii_text = err.decode("ascii", errors='backslashreplace')
    # do repr to handle ascii special chars (especially when printing/logging the str)
    # note: repr() also adds surrounding quotes to the returned str
    text = repr(ascii_text)
    return truncate_text(text, max_len=max_len)

2135
def truncate_text(text: str, *, max_len: Optional[int]) -> str:
    """Truncate `text` to max_len chars, appending a marker with the
    original length; pass max_len=None to disable truncation.
    """
    if max_len is not None and len(text) > max_len:
        return text[:max_len] + f"... (truncated. orig_len={len(text)})"
    return text

2142
def nostr_pow_worker(nonce, nostr_pubk, target_bits, hash_function, hash_len_bits, shutdown):
    """Function to generate PoW for Nostr, to be spawned in a ProcessPoolExecutor.

    Scans nonces starting at `nonce` until hash(b'electrum-'+pubk+nonce) has at
    least `target_bits` leading zero bits. Returns (digest, nonce) on success,
    or (None, None) if another worker already won (shutdown event set).
    """
    hash_preimage = b'electrum-' + nostr_pubk
    # hoisted loop invariant: the threshold the digest must be below
    target = 1 << (hash_len_bits - target_bits)
    while True:
        # we cannot check is_set on each iteration as it has a lot of overhead, this way we can check
        # it with low overhead (just the additional range counter)
        for i in range(1000000):
            digest = hash_function(hash_preimage + nonce.to_bytes(32, 'big')).digest()
            if int.from_bytes(digest, 'big') < target:
                shutdown.set()
                # fix: return the winning digest; the original returned the
                # builtin `hash` function by mistake ("return hash, nonce")
                return digest, nonce
            nonce += 1
        if shutdown.is_set():
            return None, None

2158
async def gen_nostr_ann_pow(nostr_pubk: bytes, target_bits: int) -> Tuple[int, int]:
    """Generate a PoW for a Nostr announcement. The PoW is hash[b'electrum-'+pubk+nonce]

    Spawns worker processes, each scanning a different slice of the nonce
    space; returns (nonce, pow_amount) from the first worker to finish.
    """
    import multiprocessing  # not available on Android, so we import it here
    hash_function = hashlib.sha256
    hash_len_bits = 256
    max_nonce: int = (1 << (32 * 8)) - 1  # 32-byte nonce
    start_nonce = 0

    max_workers = max(multiprocessing.cpu_count() - 1, 1)  # use all but one CPU
    manager = multiprocessing.Manager()
    # shared flag: set by the winning worker, checked by the others
    shutdown = manager.Event()
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        tasks = []
        loop = asyncio.get_running_loop()
        for task in range(0, max_workers):
            task = loop.run_in_executor(
                executor,
                nostr_pow_worker,
                start_nonce,
                nostr_pubk,
                target_bits,
                hash_function,
                hash_len_bits,
                shutdown
            )
            tasks.append(task)
            start_nonce += max_nonce // max_workers  # split the nonce range between the processes
            if start_nonce > max_nonce:  # make sure we don't go over the max_nonce
                start_nonce = random.randint(0, int(max_nonce * 0.75))

        # NOTE(review): this assumes the first task to complete is the winner;
        # a worker that lost (returning (None, None) after seeing shutdown set)
        # could in principle complete first -- confirm this race is acceptable.
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        hash_res, nonce_res = done.pop().result()
        executor.shutdown(wait=False, cancel_futures=True)

    return nonce_res, get_nostr_ann_pow_amount(nostr_pubk, nonce_res)

2195
def get_nostr_ann_pow_amount(nostr_pubk: bytes, nonce: Optional[int]) -> int:
    """Return the amount of leading zero bits for a nostr announcement PoW."""
    if not nonce:
        return 0
    preimage = b'electrum-' + nostr_pubk + nonce.to_bytes(32, 'big')
    digest_int = int.from_bytes(hashlib.sha256(preimage).digest(), 'big')
    # a 256-bit digest with b significant bits has (256 - b) leading zeros
    return 256 - digest_int.bit_length()

2208
class OnchainHistoryItem(NamedTuple):
    """A single on-chain transaction entry of the wallet history."""
    txid: str
    amount_sat: int
    fee_sat: int
    balance_sat: int
    tx_mined_status: TxMinedInfo
    group_id: Optional[str]
    label: str
    monotonic_timestamp: int
    # fix: a second, duplicate `group_id: Optional[str]` annotation followed
    # `monotonic_timestamp` in the original; removed (it was a no-op, as
    # __annotations__ is a dict and deduplicates keys)

    def to_dict(self):
        """Return a dict representation, as used by history views."""
        return {
            'txid': self.txid,
            'amount_sat': self.amount_sat,
            'fee_sat': self.fee_sat,
            'height': self.tx_mined_status.height,
            'confirmations': self.tx_mined_status.conf,
            'timestamp': self.tx_mined_status.timestamp,
            'monotonic_timestamp': self.monotonic_timestamp,
            'incoming': self.amount_sat > 0,
            'bc_value': Satoshis(self.amount_sat),
            'bc_balance': Satoshis(self.balance_sat),
            'date': timestamp_to_datetime(self.tx_mined_status.timestamp),
            'txpos_in_block': self.tx_mined_status.txpos,
            'wanted_height': self.tx_mined_status.wanted_height,
            'label': self.label,
            'group_id': self.group_id,
        }
2237
class LightningHistoryItem(NamedTuple):
    """A single lightning payment entry of the wallet history."""
    payment_hash: str
    preimage: str
    amount_msat: int
    fee_msat: Optional[int]
    type: str
    group_id: Optional[str]
    timestamp: int
    label: str
    def to_dict(self):
        """Return a dict representation, as used by history views."""
        return {
            'type': self.type,
            'label': self.label,
            'timestamp': self.timestamp or 0,
            'date': timestamp_to_datetime(self.timestamp),
            'amount_msat': self.amount_msat,
            'fee_msat': self.fee_msat,
            'payment_hash': self.payment_hash,
            'preimage': self.preimage,
            'group_id': self.group_id,
            # note: msat -> sat conversion via Decimal to keep sub-sat precision
            'ln_value': Satoshis(Decimal(self.amount_msat) / 1000),
        }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc