• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6135053526237184

17 Jan 2025 11:16AM UTC coverage: 60.595% (+0.003%) from 60.592%
6135053526237184

push

CirrusCI

accumulator
qml: fix regression caused by ee42e0938

in qml, we need the password in-memory as the auth wrapper (@auth_protect) does not
pass the password to the wrapped fn.

20191 of 33321 relevant lines covered (60.6%)

3.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.53
/electrum/util.py
1
# Electrum - lightweight Bitcoin client
2
# Copyright (C) 2011 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import binascii
5✔
24
import concurrent.futures
5✔
25
import logging
5✔
26
import os, sys, re, json
5✔
27
from collections import defaultdict, OrderedDict
5✔
28
from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any,
5✔
29
                    Sequence, Dict, Generic, TypeVar, List, Iterable, Set, Awaitable)
30
from datetime import datetime, timezone
5✔
31
import decimal
5✔
32
from decimal import Decimal
5✔
33
import traceback
5✔
34
import urllib
5✔
35
import threading
5✔
36
import hmac
5✔
37
import stat
5✔
38
import locale
5✔
39
import asyncio
5✔
40
import urllib.request, urllib.parse, urllib.error
5✔
41
import builtins
5✔
42
import json
5✔
43
import time
5✔
44
from typing import NamedTuple, Optional
5✔
45
import ssl
5✔
46
import ipaddress
5✔
47
from ipaddress import IPv4Address, IPv6Address
5✔
48
import random
5✔
49
import secrets
5✔
50
import functools
5✔
51
from functools import partial
5✔
52
from abc import abstractmethod, ABC
5✔
53
import socket
5✔
54
import enum
5✔
55
from contextlib import nullcontext
5✔
56

57
import attr
5✔
58
import aiohttp
5✔
59
from aiohttp_socks import ProxyConnector, ProxyType
5✔
60
import aiorpcx
5✔
61
import certifi
5✔
62
import dns.resolver
5✔
63

64
from .i18n import _
5✔
65
from .logging import get_logger, Logger
5✔
66

67
if TYPE_CHECKING:
5✔
68
    from .network import Network
×
69
    from .interface import Interface
×
70
    from .simple_config import SimpleConfig
×
71
    from .paymentrequest import PaymentRequest
×
72

73

74
_logger = get_logger(__name__)
5✔
75

76

77
def inv_dict(d):
5✔
78
    return {v: k for k, v in d.items()}
5✔
79

80

81
def all_subclasses(cls) -> Set:
5✔
82
    """Return all (transitive) subclasses of cls."""
83
    res = set(cls.__subclasses__())
5✔
84
    for sub in res.copy():
5✔
85
        res |= all_subclasses(sub)
5✔
86
    return res
5✔
87

88

89
ca_path = certifi.where()
5✔
90

91

92
base_units = {'BTC':8, 'mBTC':5, 'bits':2, 'sat':0}
5✔
93
base_units_inverse = inv_dict(base_units)
5✔
94
base_units_list = ['BTC', 'mBTC', 'bits', 'sat']  # list(dict) does not guarantee order
5✔
95

96
DECIMAL_POINT_DEFAULT = 5  # mBTC
5✔
97

98

99
class UnknownBaseUnit(Exception): pass
5✔
100

101

102
def decimal_point_to_base_unit_name(dp: int) -> str:
5✔
103
    # e.g. 8 -> "BTC"
104
    try:
5✔
105
        return base_units_inverse[dp]
5✔
106
    except KeyError:
×
107
        raise UnknownBaseUnit(dp) from None
×
108

109

110
def base_unit_name_to_decimal_point(unit_name: str) -> int:
5✔
111
    """Returns the max number of digits allowed after the decimal point."""
112
    # e.g. "BTC" -> 8
113
    try:
×
114
        return base_units[unit_name]
×
115
    except KeyError:
×
116
        raise UnknownBaseUnit(unit_name) from None
×
117

118
def parse_max_spend(amt: Any) -> Optional[int]:
5✔
119
    """Checks if given amount is "spend-max"-like.
120
    Returns None or the positive integer weight for "max". Never raises.
121

122
    When creating invoices and on-chain txs, the user can specify to send "max".
123
    This is done by setting the amount to '!'. Splitting max between multiple
124
    tx outputs is also possible, and custom weights (positive ints) can also be used.
125
    For example, to send 40% of all coins to address1, and 60% to address2:
126
    ```
127
    address1, 2!
128
    address2, 3!
129
    ```
130
    """
131
    if not (isinstance(amt, str) and amt and amt[-1] == '!'):
5✔
132
        return None
5✔
133
    if amt == '!':
5✔
134
        return 1
5✔
135
    x = amt[:-1]
5✔
136
    try:
5✔
137
        x = int(x)
5✔
138
    except ValueError:
×
139
        return None
×
140
    if x > 0:
5✔
141
        return x
5✔
142
    return None
×
143

144
class NotEnoughFunds(Exception):
5✔
145
    def __str__(self):
5✔
146
        return _("Insufficient funds")
×
147

148

149
class UneconomicFee(Exception):
5✔
150
    def __str__(self):
5✔
151
        return _("The fee for the transaction is higher than the funds gained from it.")
×
152

153

154
class NoDynamicFeeEstimates(Exception):
5✔
155
    def __str__(self):
5✔
156
        return _('Dynamic fee estimates not available')
×
157

158

159
class BelowDustLimit(Exception):
5✔
160
    pass
5✔
161

162

163
class InvalidPassword(Exception):
5✔
164
    def __init__(self, message: Optional[str] = None):
5✔
165
        self.message = message
5✔
166

167
    def __str__(self):
5✔
168
        if self.message is None:
×
169
            return _("Incorrect password")
×
170
        else:
171
            return str(self.message)
×
172

173

174
class AddTransactionException(Exception):
5✔
175
    pass
5✔
176

177

178
class UnrelatedTransactionException(AddTransactionException):
5✔
179
    def __str__(self):
5✔
180
        return _("Transaction is unrelated to this wallet.")
×
181

182

183
class FileImportFailed(Exception):
5✔
184
    def __init__(self, message=''):
5✔
185
        self.message = str(message)
×
186

187
    def __str__(self):
5✔
188
        return _("Failed to import from file.") + "\n" + self.message
×
189

190

191
class FileExportFailed(Exception):
5✔
192
    def __init__(self, message=''):
5✔
193
        self.message = str(message)
×
194

195
    def __str__(self):
5✔
196
        return _("Failed to export to file.") + "\n" + self.message
×
197

198

199
class WalletFileException(Exception):
5✔
200
    def __init__(self, message='', *, should_report_crash: bool = False):
5✔
201
        Exception.__init__(self, message)
5✔
202
        self.should_report_crash = should_report_crash
5✔
203

204

205
class BitcoinException(Exception): pass
5✔
206

207

208
class UserFacingException(Exception):
5✔
209
    """Exception that contains information intended to be shown to the user."""
210

211

212
class InvoiceError(UserFacingException): pass
5✔
213

214

215
class NetworkOfflineException(UserFacingException):
5✔
216
    """Can be raised if we are running in offline mode (--offline flag)
217
    and the user requests an operation that requires the network.
218
    """
219
    def __str__(self):
5✔
220
        return _("You are offline.")
×
221

222

223
# Throw this exception to unwind the stack like when an error occurs.
224
# However unlike other exceptions the user won't be informed.
225
class UserCancelled(Exception):
5✔
226
    '''An exception that is suppressed from the user'''
227
    pass
5✔
228

229

230
def to_decimal(x: Union[str, float, int, Decimal]) -> Decimal:
5✔
231
    # helper function mainly for float->Decimal conversion, i.e.:
232
    #   >>> Decimal(41754.681)
233
    #   Decimal('41754.680999999996856786310672760009765625')
234
    #   >>> Decimal("41754.681")
235
    #   Decimal('41754.681')
236
    if isinstance(x, Decimal):
5✔
237
        return x
×
238
    return Decimal(str(x))
5✔
239

240

241
# note: this is not a NamedTuple as then its json encoding cannot be customized
242
class Satoshis(object):
5✔
243
    __slots__ = ('value',)
5✔
244

245
    def __new__(cls, value):
5✔
246
        self = super(Satoshis, cls).__new__(cls)
×
247
        # note: 'value' sometimes has msat precision
248
        assert isinstance(value, (int, Decimal)), f"unexpected type for {value=!r}"
×
249
        self.value = value
×
250
        return self
×
251

252
    def __repr__(self):
5✔
253
        return f'Satoshis({self.value})'
×
254

255
    def __str__(self):
5✔
256
        # note: precision is truncated to satoshis here
257
        return format_satoshis(self.value)
×
258

259
    def __eq__(self, other):
5✔
260
        return self.value == other.value
×
261

262
    def __ne__(self, other):
5✔
263
        return not (self == other)
×
264

265
    def __add__(self, other):
5✔
266
        return Satoshis(self.value + other.value)
×
267

268

269
# note: this is not a NamedTuple as then its json encoding cannot be customized
270
class Fiat(object):
5✔
271
    __slots__ = ('value', 'ccy')
5✔
272

273
    def __new__(cls, value: Optional[Decimal], ccy: str):
5✔
274
        self = super(Fiat, cls).__new__(cls)
×
275
        self.ccy = ccy
×
276
        if not isinstance(value, (Decimal, type(None))):
×
277
            raise TypeError(f"value should be Decimal or None, not {type(value)}")
×
278
        self.value = value
×
279
        return self
×
280

281
    def __repr__(self):
5✔
282
        return 'Fiat(%s)'% self.__str__()
×
283

284
    def __str__(self):
5✔
285
        if self.value is None or self.value.is_nan():
×
286
            return _('No Data')
×
287
        else:
288
            return "{:.2f}".format(self.value)
×
289

290
    def to_ui_string(self):
5✔
291
        if self.value is None or self.value.is_nan():
×
292
            return _('No Data')
×
293
        else:
294
            return "{:.2f}".format(self.value) + ' ' + self.ccy
×
295

296
    def __eq__(self, other):
5✔
297
        if not isinstance(other, Fiat):
×
298
            return False
×
299
        if self.ccy != other.ccy:
×
300
            return False
×
301
        if isinstance(self.value, Decimal) and isinstance(other.value, Decimal) \
×
302
                and self.value.is_nan() and other.value.is_nan():
303
            return True
×
304
        return self.value == other.value
×
305

306
    def __ne__(self, other):
5✔
307
        return not (self == other)
×
308

309
    def __add__(self, other):
5✔
310
        assert self.ccy == other.ccy
×
311
        return Fiat(self.value + other.value, self.ccy)
×
312

313

314
class MyEncoder(json.JSONEncoder):
5✔
315
    def default(self, obj):
5✔
316
        # note: this does not get called for namedtuples :(  https://bugs.python.org/issue30343
317
        from .transaction import Transaction, TxOutput
5✔
318
        if isinstance(obj, Transaction):
5✔
319
            return obj.serialize()
5✔
320
        if isinstance(obj, TxOutput):
5✔
321
            return obj.to_legacy_tuple()
5✔
322
        if isinstance(obj, Satoshis):
5✔
323
            return str(obj)
×
324
        if isinstance(obj, Fiat):
5✔
325
            return str(obj)
×
326
        if isinstance(obj, Decimal):
5✔
327
            return str(obj)
×
328
        if isinstance(obj, datetime):
5✔
329
            return obj.isoformat(' ')[:-3]
×
330
        if isinstance(obj, set):
5✔
331
            return list(obj)
×
332
        if isinstance(obj, bytes): # for nametuples in lnchannel
5✔
333
            return obj.hex()
5✔
334
        if hasattr(obj, 'to_json') and callable(obj.to_json):
5✔
335
            return obj.to_json()
5✔
336
        return super(MyEncoder, self).default(obj)
×
337

338

339
class ThreadJob(Logger):
5✔
340
    """A job that is run periodically from a thread's main loop.  run() is
341
    called from that thread's context.
342
    """
343

344
    def __init__(self):
5✔
345
        Logger.__init__(self)
5✔
346

347
    def run(self):
5✔
348
        """Called periodically from the thread"""
349
        pass
×
350

351
class DebugMem(ThreadJob):
5✔
352
    '''A handy class for debugging GC memory leaks'''
353
    def __init__(self, classes, interval=30):
5✔
354
        ThreadJob.__init__(self)
×
355
        self.next_time = 0
×
356
        self.classes = classes
×
357
        self.interval = interval
×
358

359
    def mem_stats(self):
5✔
360
        import gc
×
361
        self.logger.info("Start memscan")
×
362
        gc.collect()
×
363
        objmap = defaultdict(list)
×
364
        for obj in gc.get_objects():
×
365
            for class_ in self.classes:
×
366
                if isinstance(obj, class_):
×
367
                    objmap[class_].append(obj)
×
368
        for class_, objs in objmap.items():
×
369
            self.logger.info(f"{class_.__name__}: {len(objs)}")
×
370
        self.logger.info("Finish memscan")
×
371

372
    def run(self):
5✔
373
        if time.time() > self.next_time:
×
374
            self.mem_stats()
×
375
            self.next_time = time.time() + self.interval
×
376

377
class DaemonThread(threading.Thread, Logger):
5✔
378
    """ daemon thread that terminates cleanly """
379

380
    LOGGING_SHORTCUT = 'd'
5✔
381

382
    def __init__(self):
5✔
383
        threading.Thread.__init__(self)
5✔
384
        Logger.__init__(self)
5✔
385
        self.parent_thread = threading.current_thread()
5✔
386
        self.running = False
5✔
387
        self.running_lock = threading.Lock()
5✔
388
        self.job_lock = threading.Lock()
5✔
389
        self.jobs = []
5✔
390
        self.stopped_event = threading.Event()        # set when fully stopped
5✔
391
        self.stopped_event_async = asyncio.Event()    # set when fully stopped
5✔
392
        self.wake_up_event = threading.Event()  # for perf optimisation of polling in run()
5✔
393

394
    def add_jobs(self, jobs):
5✔
395
        with self.job_lock:
5✔
396
            self.jobs.extend(jobs)
5✔
397

398
    def run_jobs(self):
5✔
399
        # Don't let a throwing job disrupt the thread, future runs of
400
        # itself, or other jobs.  This is useful protection against
401
        # malformed or malicious server responses
402
        with self.job_lock:
5✔
403
            for job in self.jobs:
5✔
404
                try:
5✔
405
                    job.run()
5✔
406
                except Exception as e:
×
407
                    self.logger.exception('')
×
408

409
    def remove_jobs(self, jobs):
5✔
410
        with self.job_lock:
×
411
            for job in jobs:
×
412
                self.jobs.remove(job)
×
413

414
    def start(self):
5✔
415
        with self.running_lock:
5✔
416
            self.running = True
5✔
417
        return threading.Thread.start(self)
5✔
418

419
    def is_running(self):
5✔
420
        with self.running_lock:
5✔
421
            return self.running and self.parent_thread.is_alive()
5✔
422

423
    def stop(self):
5✔
424
        with self.running_lock:
5✔
425
            self.running = False
5✔
426
            self.wake_up_event.set()
5✔
427
            self.wake_up_event.clear()
5✔
428

429
    def on_stop(self):
5✔
430
        if 'ANDROID_DATA' in os.environ:
5✔
431
            import jnius
×
432
            jnius.detach()
×
433
            self.logger.info("jnius detach")
×
434
        self.logger.info("stopped")
5✔
435
        self.stopped_event.set()
5✔
436
        loop = get_asyncio_loop()
5✔
437
        loop.call_soon_threadsafe(self.stopped_event_async.set)
5✔
438

439

440
def print_stderr(*args):
5✔
441
    args = [str(item) for item in args]
×
442
    sys.stderr.write(" ".join(args) + "\n")
×
443
    sys.stderr.flush()
×
444

445
def print_msg(*args):
5✔
446
    # Stringify args
447
    args = [str(item) for item in args]
×
448
    sys.stdout.write(" ".join(args) + "\n")
×
449
    sys.stdout.flush()
×
450

451
def json_encode(obj):
5✔
452
    try:
×
453
        s = json.dumps(obj, sort_keys = True, indent = 4, cls=MyEncoder)
×
454
    except TypeError:
×
455
        s = repr(obj)
×
456
    return s
×
457

458
def json_decode(x):
5✔
459
    try:
5✔
460
        return json.loads(x, parse_float=Decimal)
5✔
461
    except Exception:
5✔
462
        return x
5✔
463

464
def json_normalize(x):
5✔
465
    # note: The return value of commands, when going through the JSON-RPC interface,
466
    #       is json-encoded. The encoder used there cannot handle some types, e.g. electrum.util.Satoshis.
467
    # note: We should not simply do "json_encode(x)" here, as then later x would get doubly json-encoded.
468
    # see #5868
469
    return json_decode(json_encode(x))
×
470

471

472
# taken from Django Source Code
473
def constant_time_compare(val1, val2):
5✔
474
    """Return True if the two strings are equal, False otherwise."""
475
    return hmac.compare_digest(to_bytes(val1, 'utf8'), to_bytes(val2, 'utf8'))
×
476

477

478
_profiler_logger = _logger.getChild('profiler')
5✔
479
def profiler(func=None, *, min_threshold: Union[int, float, None] = None):
5✔
480
    """Function decorator that logs execution time.
481

482
    min_threshold: if set, only log if time taken is higher than threshold
483
    NOTE: does not work with async methods.
484
    """
485
    if func is None:  # to make "@profiler(...)" work. (in addition to bare "@profiler")
5✔
486
        return partial(profiler, min_threshold=min_threshold)
5✔
487
    def do_profile(*args, **kw_args):
5✔
488
        name = func.__qualname__
5✔
489
        t0 = time.time()
5✔
490
        o = func(*args, **kw_args)
5✔
491
        t = time.time() - t0
5✔
492
        if min_threshold is None or t > min_threshold:
5✔
493
            _profiler_logger.debug(f"{name} {t:,.4f} sec")
5✔
494
        return o
5✔
495
    return do_profile
5✔
496

497

498
class AsyncHangDetector:
5✔
499
    """Context manager that logs every `n` seconds if encapsulated context still has not exited."""
500

501
    def __init__(
5✔
502
        self,
503
        *,
504
        period_sec: int = 15,
505
        message: str,
506
        logger: logging.Logger = None,
507
    ):
508
        self.period_sec = period_sec
5✔
509
        self.message = message
5✔
510
        self.logger = logger or _logger
5✔
511

512
    async def _monitor(self):
5✔
513
        # note: this assumes that the event loop itself is not blocked
514
        t0 = time.monotonic()
5✔
515
        while True:
5✔
516
            await asyncio.sleep(self.period_sec)
5✔
517
            t1 = time.monotonic()
×
518
            self.logger.info(f"{self.message} (after {t1 - t0:.2f} sec)")
×
519

520
    async def __aenter__(self):
5✔
521
        self.mtask = asyncio.create_task(self._monitor())
5✔
522

523
    async def __aexit__(self, exc_type, exc, tb):
5✔
524
        self.mtask.cancel()
5✔
525

526

527
def android_ext_dir():
5✔
528
    from android.storage import primary_external_storage_path
×
529
    return primary_external_storage_path()
×
530

531
def android_backup_dir():
5✔
532
    pkgname = get_android_package_name()
×
533
    d = os.path.join(android_ext_dir(), pkgname)
×
534
    if not os.path.exists(d):
×
535
        os.mkdir(d)
×
536
    return d
×
537

538
def android_data_dir():
5✔
539
    import jnius
×
540
    PythonActivity = jnius.autoclass('org.kivy.android.PythonActivity')
×
541
    return PythonActivity.mActivity.getFilesDir().getPath() + '/data'
×
542

543
def ensure_sparse_file(filename):
5✔
544
    # On modern Linux, no need to do anything.
545
    # On Windows, need to explicitly mark file.
546
    if os.name == "nt":
×
547
        try:
×
548
            os.system('fsutil sparse setflag "{}" 1'.format(filename))
×
549
        except Exception as e:
×
550
            _logger.info(f'error marking file {filename} as sparse: {e}')
×
551

552

553
def get_headers_dir(config):
5✔
554
    return config.path
5✔
555

556

557
def assert_datadir_available(config_path):
5✔
558
    path = config_path
5✔
559
    if os.path.exists(path):
5✔
560
        return
5✔
561
    else:
562
        raise FileNotFoundError(
×
563
            'Electrum datadir does not exist. Was it deleted while running?' + '\n' +
564
            'Should be at {}'.format(path))
565

566

567
def assert_file_in_datadir_available(path, config_path):
5✔
568
    if os.path.exists(path):
×
569
        return
×
570
    else:
571
        assert_datadir_available(config_path)
×
572
        raise FileNotFoundError(
×
573
            'Cannot find file but datadir is there.' + '\n' +
574
            'Should be at {}'.format(path))
575

576

577
def standardize_path(path):
5✔
578
    # note: os.path.realpath() is not used, as on Windows it can return non-working paths (see #8495).
579
    #       This means that we don't resolve symlinks!
580
    return os.path.normcase(
5✔
581
                os.path.abspath(
582
                    os.path.expanduser(
583
                        path
584
    )))
585

586

587
def get_new_wallet_name(wallet_folder: str) -> str:
5✔
588
    """Returns a file basename for a new wallet to be used.
589
    Can raise OSError.
590
    """
591
    i = 1
5✔
592
    while True:
5✔
593
        filename = "wallet_%d" % i
5✔
594
        if filename in os.listdir(wallet_folder):
5✔
595
            i += 1
5✔
596
        else:
597
            break
5✔
598
    return filename
5✔
599

600

601
def is_android_debug_apk() -> bool:
5✔
602
    is_android = 'ANDROID_DATA' in os.environ
×
603
    if not is_android:
×
604
        return False
×
605
    from jnius import autoclass
×
606
    pkgname = get_android_package_name()
×
607
    build_config = autoclass(f"{pkgname}.BuildConfig")
×
608
    return bool(build_config.DEBUG)
×
609

610

611
def get_android_package_name() -> str:
5✔
612
    is_android = 'ANDROID_DATA' in os.environ
×
613
    assert is_android
×
614
    from jnius import autoclass
×
615
    from android.config import ACTIVITY_CLASS_NAME
×
616
    activity = autoclass(ACTIVITY_CLASS_NAME).mActivity
×
617
    pkgname = str(activity.getPackageName())
×
618
    return pkgname
×
619

620

621
def assert_bytes(*args):
5✔
622
    """
623
    porting helper, assert args type
624
    """
625
    try:
5✔
626
        for x in args:
5✔
627
            assert isinstance(x, (bytes, bytearray))
5✔
628
    except Exception:
×
629
        print('assert bytes failed', list(map(type, args)))
×
630
        raise
×
631

632

633
def assert_str(*args):
5✔
634
    """
635
    porting helper, assert args type
636
    """
637
    for x in args:
×
638
        assert isinstance(x, str)
×
639

640

641
def to_string(x, enc) -> str:
5✔
642
    if isinstance(x, (bytes, bytearray)):
5✔
643
        return x.decode(enc)
5✔
644
    if isinstance(x, str):
×
645
        return x
×
646
    else:
647
        raise TypeError("Not a string or bytes like object")
×
648

649

650
def to_bytes(something, encoding='utf8') -> bytes:
5✔
651
    """
652
    cast string to bytes() like object, but for python2 support it's bytearray copy
653
    """
654
    if isinstance(something, bytes):
5✔
655
        return something
5✔
656
    if isinstance(something, str):
5✔
657
        return something.encode(encoding)
5✔
658
    elif isinstance(something, bytearray):
5✔
659
        return bytes(something)
5✔
660
    else:
661
        raise TypeError("Not a string or bytes like object")
5✔
662

663

664
bfh = bytes.fromhex
5✔
665

666

667
def xor_bytes(a: bytes, b: bytes) -> bytes:
5✔
668
    size = min(len(a), len(b))
5✔
669
    return ((int.from_bytes(a[:size], "big") ^ int.from_bytes(b[:size], "big"))
5✔
670
            .to_bytes(size, "big"))
671

672

673
def user_dir():
5✔
674
    if "ELECTRUMDIR" in os.environ:
×
675
        return os.environ["ELECTRUMDIR"]
×
676
    elif 'ANDROID_DATA' in os.environ:
×
677
        return android_data_dir()
×
678
    elif os.name == 'posix':
×
679
        return os.path.join(os.environ["HOME"], ".electrum")
×
680
    elif "APPDATA" in os.environ:
×
681
        return os.path.join(os.environ["APPDATA"], "Electrum")
×
682
    elif "LOCALAPPDATA" in os.environ:
×
683
        return os.path.join(os.environ["LOCALAPPDATA"], "Electrum")
×
684
    else:
685
        #raise Exception("No home directory found in environment variables.")
686
        return
×
687

688

689
def resource_path(*parts):
5✔
690
    return os.path.join(pkg_dir, *parts)
5✔
691

692

693
# absolute path to python package folder of electrum ("lib")
694
pkg_dir = os.path.split(os.path.realpath(__file__))[0]
5✔
695

696

697
def is_valid_email(s):
5✔
698
    regexp = r"[^@]+@[^@]+\.[^@]+"
×
699
    return re.match(regexp, s) is not None
×
700

701

702
def is_hash256_str(text: Any) -> bool:
5✔
703
    if not isinstance(text, str): return False
5✔
704
    if len(text) != 64: return False
5✔
705
    return is_hex_str(text)
5✔
706

707

708
def is_hex_str(text: Any) -> bool:
5✔
709
    if not isinstance(text, str): return False
5✔
710
    try:
5✔
711
        b = bytes.fromhex(text)
5✔
712
    except Exception:
5✔
713
        return False
5✔
714
    # forbid whitespaces in text:
715
    if len(text) != 2 * len(b):
5✔
716
        return False
5✔
717
    return True
5✔
718

719

720
def is_integer(val: Any) -> bool:
5✔
721
    return isinstance(val, int)
5✔
722

723

724
def is_non_negative_integer(val: Any) -> bool:
5✔
725
    if is_integer(val):
5✔
726
        return val >= 0
5✔
727
    return False
5✔
728

729

730
def is_int_or_float(val: Any) -> bool:
5✔
731
    return isinstance(val, (int, float))
5✔
732

733

734
def is_non_negative_int_or_float(val: Any) -> bool:
5✔
735
    if is_int_or_float(val):
5✔
736
        return val >= 0
5✔
737
    return False
5✔
738

739

740
def chunks(items, size: int):
5✔
741
    """Break up items, an iterable, into chunks of length size."""
742
    if size < 1:
5✔
743
        raise ValueError(f"size must be positive, not {repr(size)}")
5✔
744
    for i in range(0, len(items), size):
5✔
745
        yield items[i: i + size]
5✔
746

747

748
def format_satoshis_plain(
5✔
749
        x: Union[int, float, Decimal, str],  # amount in satoshis,
750
        *,
751
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
752
) -> str:
753
    """Display a satoshi amount scaled.  Always uses a '.' as a decimal
754
    point and has no thousands separator"""
755
    if parse_max_spend(x):
5✔
756
        return f'max({x})'
×
757
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
5✔
758
    scale_factor = pow(10, decimal_point)
5✔
759
    return "{:.8f}".format(Decimal(x) / scale_factor).rstrip('0').rstrip('.')
5✔
760

761

762
# Check that Decimal precision is sufficient.
763
# We need at the very least ~20, as we deal with msat amounts, and
764
# log10(21_000_000 * 10**8 * 1000) ~= 18.3
765
# decimal.DefaultContext.prec == 28 by default, but it is mutable.
766
# We enforce that we have at least that available.
767
assert decimal.getcontext().prec >= 28, f"PyDecimal precision too low: {decimal.getcontext().prec}"
5✔
768

769
# DECIMAL_POINT = locale.localeconv()['decimal_point']  # type: str
770
DECIMAL_POINT = "."
5✔
771
THOUSANDS_SEP = " "
5✔
772
assert len(DECIMAL_POINT) == 1, f"DECIMAL_POINT has unexpected len. {DECIMAL_POINT!r}"
5✔
773
assert len(THOUSANDS_SEP) == 1, f"THOUSANDS_SEP has unexpected len. {THOUSANDS_SEP!r}"
5✔
774

775

776
def format_satoshis(
5✔
777
        x: Union[int, float, Decimal, str, None],  # amount in satoshis
778
        *,
779
        num_zeros: int = 0,
780
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
781
        precision: int = 0,  # extra digits after satoshi precision
782
        is_diff: bool = False,  # if True, enforce a leading sign (+/-)
783
        whitespaces: bool = False,  # if True, add whitespaces, to align numbers in a column
784
        add_thousands_sep: bool = False,  # if True, add whitespaces, for better readability of the numbers
785
) -> str:
786
    if x is None:
5✔
787
        return 'unknown'
×
788
    if parse_max_spend(x):
5✔
789
        return f'max({x})'
×
790
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
5✔
791
    # lose redundant precision
792
    x = Decimal(x).quantize(Decimal(10) ** (-precision))
5✔
793
    # format string
794
    overall_precision = decimal_point + precision  # max digits after final decimal point
5✔
795
    decimal_format = "." + str(overall_precision) if overall_precision > 0 else ""
5✔
796
    if is_diff:
5✔
797
        decimal_format = '+' + decimal_format
5✔
798
    # initial result
799
    scale_factor = pow(10, decimal_point)
5✔
800
    result = ("{:" + decimal_format + "f}").format(x / scale_factor)
5✔
801
    if "." not in result: result += "."
5✔
802
    result = result.rstrip('0')
5✔
803
    # add extra decimal places (zeros)
804
    integer_part, fract_part = result.split(".")
5✔
805
    if len(fract_part) < num_zeros:
5✔
806
        fract_part += "0" * (num_zeros - len(fract_part))
5✔
807
    # add whitespaces as thousands' separator for better readability of numbers
808
    if add_thousands_sep:
5✔
809
        sign = integer_part[0] if integer_part[0] in ("+", "-") else ""
5✔
810
        if sign == "-":
5✔
811
            integer_part = integer_part[1:]
5✔
812
        integer_part = "{:,}".format(int(integer_part)).replace(',', THOUSANDS_SEP)
5✔
813
        integer_part = sign + integer_part
5✔
814
        fract_part = THOUSANDS_SEP.join(fract_part[i:i+3] for i in range(0, len(fract_part), 3))
5✔
815
    result = integer_part + DECIMAL_POINT + fract_part
5✔
816
    # add leading/trailing whitespaces so that numbers can be aligned in a column
817
    if whitespaces:
5✔
818
        target_fract_len = overall_precision
5✔
819
        target_integer_len = 14 - decimal_point  # should be enough for up to unsigned 999999 BTC
5✔
820
        if add_thousands_sep:
5✔
821
            target_fract_len += max(0, (target_fract_len - 1) // 3)
5✔
822
            target_integer_len += max(0, (target_integer_len - 1) // 3)
5✔
823
        # add trailing whitespaces
824
        result += " " * (target_fract_len - len(fract_part))
5✔
825
        # add leading whitespaces
826
        target_total_len = target_integer_len + 1 + target_fract_len
5✔
827
        result = " " * (target_total_len - len(result)) + result
5✔
828
    return result
5✔
829

830

831
FEERATE_PRECISION = 1  # num fractional decimal places for sat/byte fee rates
5✔
832
_feerate_quanta = Decimal(10) ** (-FEERATE_PRECISION)
5✔
833
UI_UNIT_NAME_FEERATE_SAT_PER_VBYTE = "sat/vbyte"
5✔
834
UI_UNIT_NAME_FEERATE_SAT_PER_VB = "sat/vB"
5✔
835
UI_UNIT_NAME_TXSIZE_VBYTES = "vbytes"
5✔
836
UI_UNIT_NAME_MEMPOOL_MB = "vMB"
5✔
837

838

839
def format_fee_satoshis(fee, *, num_zeros=0, precision=None):
5✔
840
    if precision is None:
5✔
841
        precision = FEERATE_PRECISION
5✔
842
    num_zeros = min(num_zeros, FEERATE_PRECISION)  # no more zeroes than available prec
5✔
843
    return format_satoshis(fee, num_zeros=num_zeros, decimal_point=0, precision=precision)
5✔
844

845

846
def quantize_feerate(fee) -> Union[None, Decimal, int]:
5✔
847
    """Strip sat/byte fee rate of excess precision."""
848
    if fee is None:
5✔
849
        return None
×
850
    return Decimal(fee).quantize(_feerate_quanta, rounding=decimal.ROUND_HALF_DOWN)
5✔
851

852

853
def timestamp_to_datetime(timestamp: Union[int, float, None], *, utc: bool = False) -> Optional[datetime]:
5✔
854
    if timestamp is None:
5✔
855
        return None
×
856
    tz = None
5✔
857
    if utc:
5✔
858
        tz = timezone.utc
×
859
    return datetime.fromtimestamp(timestamp, tz=tz)
5✔
860

861

862
def format_time(timestamp: Union[int, float, None]) -> str:
5✔
863
    date = timestamp_to_datetime(timestamp)
×
864
    return date.isoformat(' ', timespec="minutes") if date else _("Unknown")
×
865

866

867
def age(
5✔
868
    from_date: Union[int, float, None],  # POSIX timestamp
869
    *,
870
    since_date: datetime = None,
871
    target_tz=None,
872
    include_seconds: bool = False,
873
) -> str:
874
    """Takes a timestamp and returns a string with the approximation of the age"""
875
    if from_date is None:
5✔
876
        return _("Unknown")
5✔
877

878
    from_date = datetime.fromtimestamp(from_date)
5✔
879
    if since_date is None:
5✔
880
        since_date = datetime.now(target_tz)
×
881

882
    distance_in_time = from_date - since_date
5✔
883
    is_in_past = from_date < since_date
5✔
884
    distance_in_seconds = int(round(abs(distance_in_time.days * 86400 + distance_in_time.seconds)))
5✔
885
    distance_in_minutes = int(round(distance_in_seconds / 60))
5✔
886

887
    if distance_in_minutes == 0:
5✔
888
        if include_seconds:
5✔
889
            if is_in_past:
5✔
890
                return _("{} seconds ago").format(distance_in_seconds)
5✔
891
            else:
892
                return _("in {} seconds").format(distance_in_seconds)
5✔
893
        else:
894
            if is_in_past:
5✔
895
                return _("less than a minute ago")
5✔
896
            else:
897
                return _("in less than a minute")
5✔
898
    elif distance_in_minutes < 45:
5✔
899
        if is_in_past:
5✔
900
            return _("about {} minutes ago").format(distance_in_minutes)
5✔
901
        else:
902
            return _("in about {} minutes").format(distance_in_minutes)
5✔
903
    elif distance_in_minutes < 90:
5✔
904
        if is_in_past:
5✔
905
            return _("about 1 hour ago")
5✔
906
        else:
907
            return _("in about 1 hour")
5✔
908
    elif distance_in_minutes < 1440:
5✔
909
        if is_in_past:
5✔
910
            return _("about {} hours ago").format(round(distance_in_minutes / 60.0))
5✔
911
        else:
912
            return _("in about {} hours").format(round(distance_in_minutes / 60.0))
5✔
913
    elif distance_in_minutes < 2880:
5✔
914
        if is_in_past:
5✔
915
            return _("about 1 day ago")
5✔
916
        else:
917
            return _("in about 1 day")
5✔
918
    elif distance_in_minutes < 43220:
5✔
919
        if is_in_past:
5✔
920
            return _("about {} days ago").format(round(distance_in_minutes / 1440))
5✔
921
        else:
922
            return _("in about {} days").format(round(distance_in_minutes / 1440))
5✔
923
    elif distance_in_minutes < 86400:
5✔
924
        if is_in_past:
5✔
925
            return _("about 1 month ago")
5✔
926
        else:
927
            return _("in about 1 month")
5✔
928
    elif distance_in_minutes < 525600:
5✔
929
        if is_in_past:
5✔
930
            return _("about {} months ago").format(round(distance_in_minutes / 43200))
5✔
931
        else:
932
            return _("in about {} months").format(round(distance_in_minutes / 43200))
5✔
933
    elif distance_in_minutes < 1051200:
5✔
934
        if is_in_past:
5✔
935
            return _("about 1 year ago")
5✔
936
        else:
937
            return _("in about 1 year")
5✔
938
    else:
939
        if is_in_past:
5✔
940
            return _("over {} years ago").format(round(distance_in_minutes / 525600))
5✔
941
        else:
942
            return _("in over {} years").format(round(distance_in_minutes / 525600))
5✔
943

944
mainnet_block_explorers = {
5✔
945
    '3xpl.com': ('https://3xpl.com/bitcoin/',
946
                        {'tx': 'transaction/', 'addr': 'address/'}),
947
    'Bitflyer.jp': ('https://chainflyer.bitflyer.jp/',
948
                        {'tx': 'Transaction/', 'addr': 'Address/'}),
949
    'Blockchain.info': ('https://blockchain.com/btc/',
950
                        {'tx': 'tx/', 'addr': 'address/'}),
951
    'Blockstream.info': ('https://blockstream.info/',
952
                        {'tx': 'tx/', 'addr': 'address/'}),
953
    'Bitaps.com': ('https://btc.bitaps.com/',
954
                        {'tx': '', 'addr': ''}),
955
    'BTC.com': ('https://btc.com/',
956
                        {'tx': '', 'addr': ''}),
957
    'Chain.so': ('https://www.chain.so/',
958
                        {'tx': 'tx/BTC/', 'addr': 'address/BTC/'}),
959
    'Insight.is': ('https://insight.bitpay.com/',
960
                        {'tx': 'tx/', 'addr': 'address/'}),
961
    'BlockCypher.com': ('https://live.blockcypher.com/btc/',
962
                        {'tx': 'tx/', 'addr': 'address/'}),
963
    'Blockchair.com': ('https://blockchair.com/bitcoin/',
964
                        {'tx': 'transaction/', 'addr': 'address/'}),
965
    'blockonomics.co': ('https://www.blockonomics.co/',
966
                        {'tx': 'api/tx?txid=', 'addr': '#/search?q='}),
967
    'mempool.space': ('https://mempool.space/',
968
                        {'tx': 'tx/', 'addr': 'address/'}),
969
    'mempool.emzy.de': ('https://mempool.emzy.de/',
970
                        {'tx': 'tx/', 'addr': 'address/'}),
971
    'OXT.me': ('https://oxt.me/',
972
                        {'tx': 'transaction/', 'addr': 'address/'}),
973
    'mynode.local': ('http://mynode.local:3002/',
974
                        {'tx': 'tx/', 'addr': 'address/'}),
975
    'system default': ('blockchain:/',
976
                        {'tx': 'tx/', 'addr': 'address/'}),
977
}
978

979
testnet_block_explorers = {
5✔
980
    'Bitaps.com': ('https://tbtc.bitaps.com/',
981
                       {'tx': '', 'addr': ''}),
982
    'BlockCypher.com': ('https://live.blockcypher.com/btc-testnet/',
983
                       {'tx': 'tx/', 'addr': 'address/'}),
984
    'Blockchain.info': ('https://www.blockchain.com/btc-testnet/',
985
                       {'tx': 'tx/', 'addr': 'address/'}),
986
    'Blockstream.info': ('https://blockstream.info/testnet/',
987
                        {'tx': 'tx/', 'addr': 'address/'}),
988
    'mempool.space': ('https://mempool.space/testnet/',
989
                        {'tx': 'tx/', 'addr': 'address/'}),
990
    'smartbit.com.au': ('https://testnet.smartbit.com.au/',
991
                       {'tx': 'tx/', 'addr': 'address/'}),
992
    'system default': ('blockchain://000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943/',
993
                       {'tx': 'tx/', 'addr': 'address/'}),
994
}
995

996
testnet4_block_explorers = {
5✔
997
    'mempool.space': ('https://mempool.space/testnet4/',
998
                        {'tx': 'tx/', 'addr': 'address/'}),
999
    'wakiyamap.dev': ('https://testnet4-explorer.wakiyamap.dev/',
1000
                       {'tx': 'tx/', 'addr': 'address/'}),
1001
}
1002

1003
signet_block_explorers = {
5✔
1004
    'bc-2.jp': ('https://explorer.bc-2.jp/',
1005
                        {'tx': 'tx/', 'addr': 'address/'}),
1006
    'mempool.space': ('https://mempool.space/signet/',
1007
                        {'tx': 'tx/', 'addr': 'address/'}),
1008
    'bitcoinexplorer.org': ('https://signet.bitcoinexplorer.org/',
1009
                       {'tx': 'tx/', 'addr': 'address/'}),
1010
    'wakiyamap.dev': ('https://signet-explorer.wakiyamap.dev/',
1011
                       {'tx': 'tx/', 'addr': 'address/'}),
1012
    'ex.signet.bublina.eu.org': ('https://ex.signet.bublina.eu.org/',
1013
                       {'tx': 'tx/', 'addr': 'address/'}),
1014
    'system default': ('blockchain:/',
1015
                       {'tx': 'tx/', 'addr': 'address/'}),
1016
}
1017

1018
_block_explorer_default_api_loc = {'tx': 'tx/', 'addr': 'address/'}
5✔
1019

1020

1021
def block_explorer_info():
5✔
1022
    from . import constants
×
1023
    if constants.net.NET_NAME == "testnet":
×
1024
        return testnet_block_explorers
×
1025
    elif constants.net.NET_NAME == "testnet4":
×
1026
        return testnet4_block_explorers
×
1027
    elif constants.net.NET_NAME == "signet":
×
1028
        return signet_block_explorers
×
1029
    return mainnet_block_explorers
×
1030

1031

1032
def block_explorer(config: 'SimpleConfig') -> Optional[str]:
5✔
1033
    """Returns name of selected block explorer,
1034
    or None if a custom one (not among hardcoded ones) is configured.
1035
    """
1036
    if config.BLOCK_EXPLORER_CUSTOM is not None:
×
1037
        return None
×
1038
    be_key = config.BLOCK_EXPLORER
×
1039
    be_tuple = block_explorer_info().get(be_key)
×
1040
    if be_tuple is None:
×
1041
        be_key = config.cv.BLOCK_EXPLORER.get_default_value()
×
1042
    assert isinstance(be_key, str), f"{be_key!r} should be str"
×
1043
    return be_key
×
1044

1045

1046
def block_explorer_tuple(config: 'SimpleConfig') -> Optional[Tuple[str, dict]]:
5✔
1047
    custom_be = config.BLOCK_EXPLORER_CUSTOM
×
1048
    if custom_be:
×
1049
        if isinstance(custom_be, str):
×
1050
            return custom_be, _block_explorer_default_api_loc
×
1051
        if isinstance(custom_be, (tuple, list)) and len(custom_be) == 2:
×
1052
            return tuple(custom_be)
×
1053
        _logger.warning(f"not using {config.cv.BLOCK_EXPLORER_CUSTOM.key()!r} from config. "
×
1054
                        f"expected a str or a pair but got {custom_be!r}")
1055
        return None
×
1056
    else:
1057
        # using one of the hardcoded block explorers
1058
        return block_explorer_info().get(block_explorer(config))
×
1059

1060

1061
def block_explorer_URL(config: 'SimpleConfig', kind: str, item: str) -> Optional[str]:
5✔
1062
    be_tuple = block_explorer_tuple(config)
×
1063
    if not be_tuple:
×
1064
        return
×
1065
    explorer_url, explorer_dict = be_tuple
×
1066
    kind_str = explorer_dict.get(kind)
×
1067
    if kind_str is None:
×
1068
        return
×
1069
    if explorer_url[-1] != "/":
×
1070
        explorer_url += "/"
×
1071
    url_parts = [explorer_url, kind_str, item]
×
1072
    return ''.join(url_parts)
×
1073

1074

1075

1076

1077

1078
# Python bug (http://bugs.python.org/issue1927) causes raw_input
1079
# to be redirected improperly between stdin/stderr on Unix systems
1080
#TODO: py3
1081
def raw_input(prompt=None):
5✔
1082
    if prompt:
×
1083
        sys.stdout.write(prompt)
×
1084
    return builtin_raw_input()
×
1085

1086
builtin_raw_input = builtins.input
5✔
1087
builtins.input = raw_input
5✔
1088

1089

1090
def parse_json(message):
5✔
1091
    # TODO: check \r\n pattern
1092
    n = message.find(b'\n')
×
1093
    if n==-1:
×
1094
        return None, message
×
1095
    try:
×
1096
        j = json.loads(message[0:n].decode('utf8'))
×
1097
    except Exception:
×
1098
        j = None
×
1099
    return j, message[n+1:]
×
1100

1101

1102
def setup_thread_excepthook():
5✔
1103
    """
1104
    Workaround for `sys.excepthook` thread bug from:
1105
    http://bugs.python.org/issue1230540
1106

1107
    Call once from the main thread before creating any threads.
1108
    """
1109

1110
    init_original = threading.Thread.__init__
×
1111

1112
    def init(self, *args, **kwargs):
×
1113

1114
        init_original(self, *args, **kwargs)
×
1115
        run_original = self.run
×
1116

1117
        def run_with_except_hook(*args2, **kwargs2):
×
1118
            try:
×
1119
                run_original(*args2, **kwargs2)
×
1120
            except Exception:
×
1121
                sys.excepthook(*sys.exc_info())
×
1122

1123
        self.run = run_with_except_hook
×
1124

1125
    threading.Thread.__init__ = init
×
1126

1127

1128
def send_exception_to_crash_reporter(e: BaseException):
5✔
1129
    from .base_crash_reporter import send_exception_to_crash_reporter
×
1130
    send_exception_to_crash_reporter(e)
×
1131

1132

1133
def versiontuple(v):
5✔
1134
    return tuple(map(int, (v.split("."))))
5✔
1135

1136

1137
def read_json_file(path):
5✔
1138
    try:
×
1139
        with open(path, 'r', encoding='utf-8') as f:
×
1140
            data = json.loads(f.read())
×
1141
    except json.JSONDecodeError:
×
1142
        _logger.exception('')
×
1143
        raise FileImportFailed(_("Invalid JSON code."))
×
1144
    except BaseException as e:
×
1145
        _logger.exception('')
×
1146
        raise FileImportFailed(e)
×
1147
    return data
×
1148

1149

1150
def write_json_file(path, data):
5✔
1151
    try:
×
1152
        with open(path, 'w+', encoding='utf-8') as f:
×
1153
            json.dump(data, f, indent=4, sort_keys=True, cls=MyEncoder)
×
1154
    except (IOError, os.error) as e:
×
1155
        _logger.exception('')
×
1156
        raise FileExportFailed(e)
×
1157

1158

1159
def os_chmod(path, mode):
5✔
1160
    """os.chmod aware of tmpfs"""
1161
    try:
5✔
1162
        os.chmod(path, mode)
5✔
1163
    except OSError as e:
×
1164
        xdg_runtime_dir = os.environ.get("XDG_RUNTIME_DIR", None)
×
1165
        if xdg_runtime_dir and is_subpath(path, xdg_runtime_dir):
×
1166
            _logger.info(f"Tried to chmod in tmpfs. Skipping... {e!r}")
×
1167
        else:
1168
            raise
×
1169

1170

1171
def make_dir(path, allow_symlink=True):
5✔
1172
    """Make directory if it does not yet exist."""
1173
    if not os.path.exists(path):
5✔
1174
        if not allow_symlink and os.path.islink(path):
5✔
1175
            raise Exception('Dangling link: ' + path)
×
1176
        os.mkdir(path)
5✔
1177
        os_chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
5✔
1178

1179

1180
def is_subpath(long_path: str, short_path: str) -> bool:
5✔
1181
    """Returns whether long_path is a sub-path of short_path."""
1182
    try:
5✔
1183
        common = os.path.commonpath([long_path, short_path])
5✔
1184
    except ValueError:
5✔
1185
        return False
5✔
1186
    short_path = standardize_path(short_path)
5✔
1187
    common     = standardize_path(common)
5✔
1188
    return short_path == common
5✔
1189

1190

1191
def log_exceptions(func):
5✔
1192
    """Decorator to log AND re-raise exceptions."""
1193
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'
5✔
1194
    @functools.wraps(func)
5✔
1195
    async def wrapper(*args, **kwargs):
5✔
1196
        self = args[0] if len(args) > 0 else None
5✔
1197
        try:
5✔
1198
            return await func(*args, **kwargs)
5✔
1199
        except asyncio.CancelledError as e:
5✔
1200
            raise
5✔
1201
        except BaseException as e:
5✔
1202
            mylogger = self.logger if hasattr(self, 'logger') else _logger
5✔
1203
            try:
5✔
1204
                mylogger.exception(f"Exception in {func.__name__}: {repr(e)}")
5✔
1205
            except BaseException as e2:
×
1206
                print(f"logging exception raised: {repr(e2)}... orig exc: {repr(e)} in {func.__name__}")
×
1207
            raise
5✔
1208
    return wrapper
5✔
1209

1210

1211
def ignore_exceptions(func):
5✔
1212
    """Decorator to silently swallow all exceptions."""
1213
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'
5✔
1214
    @functools.wraps(func)
5✔
1215
    async def wrapper(*args, **kwargs):
5✔
1216
        try:
×
1217
            return await func(*args, **kwargs)
×
1218
        except Exception as e:
×
1219
            pass
×
1220
    return wrapper
5✔
1221

1222

1223
def with_lock(func):
5✔
1224
    """Decorator to enforce a lock on a function call."""
1225
    def func_wrapper(self, *args, **kwargs):
5✔
1226
        with self.lock:
5✔
1227
            return func(self, *args, **kwargs)
5✔
1228
    return func_wrapper
5✔
1229

1230

1231
class TxMinedInfo(NamedTuple):
5✔
1232
    height: int                        # height of block that mined tx
5✔
1233
    conf: Optional[int] = None         # number of confirmations, SPV verified. >=0, or None (None means unknown)
5✔
1234
    timestamp: Optional[int] = None    # timestamp of block that mined tx
5✔
1235
    txpos: Optional[int] = None        # position of tx in serialized block
5✔
1236
    header_hash: Optional[str] = None  # hash of block that mined tx
5✔
1237
    wanted_height: Optional[int] = None  # in case of timelock, min abs block height
5✔
1238

1239
    def short_id(self) -> Optional[str]:
5✔
1240
        if self.txpos is not None and self.txpos >= 0:
×
1241
            assert self.height > 0
×
1242
            return f"{self.height}x{self.txpos}"
×
1243
        return None
×
1244

1245
    def is_local_like(self) -> bool:
5✔
1246
        """Returns whether the tx is local-like (LOCAL/FUTURE)."""
1247
        from .address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
×
1248
        if self.height > 0:
×
1249
            return False
×
1250
        if self.height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT):
×
1251
            return False
×
1252
        return True
×
1253

1254

1255
class ShortID(bytes):
5✔
1256

1257
    def __repr__(self):
5✔
1258
        return f"<ShortID: {format_short_id(self)}>"
5✔
1259

1260
    def __str__(self):
5✔
1261
        return format_short_id(self)
5✔
1262

1263
    @classmethod
5✔
1264
    def from_components(cls, block_height: int, tx_pos_in_block: int, output_index: int) -> 'ShortID':
5✔
1265
        bh = block_height.to_bytes(3, byteorder='big')
5✔
1266
        tpos = tx_pos_in_block.to_bytes(3, byteorder='big')
5✔
1267
        oi = output_index.to_bytes(2, byteorder='big')
5✔
1268
        return ShortID(bh + tpos + oi)
5✔
1269

1270
    @classmethod
5✔
1271
    def from_str(cls, scid: str) -> 'ShortID':
5✔
1272
        """Parses a formatted scid str, e.g. '643920x356x0'."""
1273
        components = scid.split("x")
×
1274
        if len(components) != 3:
×
1275
            raise ValueError(f"failed to parse ShortID: {scid!r}")
×
1276
        try:
×
1277
            components = [int(x) for x in components]
×
1278
        except ValueError:
×
1279
            raise ValueError(f"failed to parse ShortID: {scid!r}") from None
×
1280
        return ShortID.from_components(*components)
×
1281

1282
    @classmethod
5✔
1283
    def normalize(cls, data: Union[None, str, bytes, 'ShortID']) -> Optional['ShortID']:
5✔
1284
        if isinstance(data, ShortID) or data is None:
5✔
1285
            return data
5✔
1286
        if isinstance(data, str):
5✔
1287
            assert len(data) == 16
5✔
1288
            return ShortID.fromhex(data)
5✔
1289
        if isinstance(data, (bytes, bytearray)):
5✔
1290
            assert len(data) == 8
5✔
1291
            return ShortID(data)
5✔
1292

1293
    @property
5✔
1294
    def block_height(self) -> int:
5✔
1295
        return int.from_bytes(self[:3], byteorder='big')
×
1296

1297
    @property
5✔
1298
    def txpos(self) -> int:
5✔
1299
        return int.from_bytes(self[3:6], byteorder='big')
×
1300

1301
    @property
5✔
1302
    def output_index(self) -> int:
5✔
1303
        return int.from_bytes(self[6:8], byteorder='big')
×
1304

1305

1306
def format_short_id(short_channel_id: Optional[bytes]):
5✔
1307
    if not short_channel_id:
5✔
1308
        return _('Not yet available')
×
1309
    return str(int.from_bytes(short_channel_id[:3], 'big')) \
5✔
1310
        + 'x' + str(int.from_bytes(short_channel_id[3:6], 'big')) \
1311
        + 'x' + str(int.from_bytes(short_channel_id[6:], 'big'))
1312

1313

1314
def make_aiohttp_session(proxy: Optional[dict], headers=None, timeout=None):
5✔
1315
    if headers is None:
×
1316
        headers = {'User-Agent': 'Electrum'}
×
1317
    if timeout is None:
×
1318
        # The default timeout is high intentionally.
1319
        # DNS on some systems can be really slow, see e.g. #5337
1320
        timeout = aiohttp.ClientTimeout(total=45)
×
1321
    elif isinstance(timeout, (int, float)):
×
1322
        timeout = aiohttp.ClientTimeout(total=timeout)
×
1323
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
×
1324

1325
    if proxy:
×
1326
        connector = ProxyConnector(
×
1327
            proxy_type=ProxyType.SOCKS5 if proxy['mode'] == 'socks5' else ProxyType.SOCKS4,
1328
            host=proxy['host'],
1329
            port=int(proxy['port']),
1330
            username=proxy.get('user', None),
1331
            password=proxy.get('password', None),
1332
            rdns=True,  # needed to prevent DNS leaks over proxy
1333
            ssl=ssl_context,
1334
        )
1335
    else:
1336
        connector = aiohttp.TCPConnector(ssl=ssl_context)
×
1337

1338
    return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector)
×
1339

1340

1341
class OldTaskGroup(aiorpcx.TaskGroup):
5✔
1342
    """Automatically raises exceptions on join; as in aiorpcx prior to version 0.20.
1343
    That is, when using TaskGroup as a context manager, if any task encounters an exception,
1344
    we would like that exception to be re-raised (propagated out). For the wait=all case,
1345
    the OldTaskGroup class is emulating the following code-snippet:
1346
    ```
1347
    async with TaskGroup() as group:
1348
        await group.spawn(task1())
1349
        await group.spawn(task2())
1350

1351
        async for task in group:
1352
            if not task.cancelled():
1353
                task.result()
1354
    ```
1355
    So instead of the above, one can just write:
1356
    ```
1357
    async with OldTaskGroup() as group:
1358
        await group.spawn(task1())
1359
        await group.spawn(task2())
1360
    ```
1361
    # TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
1362
    """
1363
    async def join(self):
5✔
1364
        if self._wait is all:
5✔
1365
            exc = False
5✔
1366
            try:
5✔
1367
                async for task in self:
5✔
1368
                    if not task.cancelled():
5✔
1369
                        task.result()
5✔
1370
            except BaseException:  # including asyncio.CancelledError
5✔
1371
                exc = True
5✔
1372
                raise
5✔
1373
            finally:
1374
                if exc:
5✔
1375
                    await self.cancel_remaining()
5✔
1376
                await super().join()
5✔
1377
        else:
1378
            await super().join()
5✔
1379
            if self.completed:
5✔
1380
                self.completed.result()
5✔
1381

1382
# We monkey-patch aiorpcx TimeoutAfter (used by timeout_after and ignore_after API),
1383
# to fix a timing issue present in asyncio as a whole re timing out tasks.
1384
# To see the issue we are trying to fix, consider example:
1385
#     async def outer_task():
1386
#         async with timeout_after(0.1):
1387
#             await inner_task()
1388
# When the 0.1 sec timeout expires, inner_task will get cancelled by timeout_after (=internal cancellation).
1389
# If around the same time (in terms of event loop iterations) another coroutine
1390
# cancels outer_task (=external cancellation), there will be a race.
1391
# Both cancellations work by propagating a CancelledError out to timeout_after, which then
1392
# needs to decide (in TimeoutAfter.__aexit__) whether it's due to an internal or external cancellation.
1393
# AFAICT asyncio provides no reliable way of distinguishing between the two.
1394
# This patch tries to always give priority to external cancellations.
1395
# see https://github.com/kyuupichan/aiorpcX/issues/44
1396
# see https://github.com/aio-libs/async-timeout/issues/229
1397
# see https://bugs.python.org/issue42130 and https://bugs.python.org/issue45098
1398
# TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
1399
def _aiorpcx_monkeypatched_set_new_deadline(task, deadline):
5✔
1400
    def timeout_task():
5✔
1401
        task._orig_cancel()
5✔
1402
        task._timed_out = None if getattr(task, "_externally_cancelled", False) else deadline
5✔
1403
    def mycancel(*args, **kwargs):
5✔
1404
        task._orig_cancel(*args, **kwargs)
5✔
1405
        task._externally_cancelled = True
5✔
1406
        task._timed_out = None
5✔
1407
    if not hasattr(task, "_orig_cancel"):
5✔
1408
        task._orig_cancel = task.cancel
5✔
1409
        task.cancel = mycancel
5✔
1410
    task._deadline_handle = task._loop.call_at(deadline, timeout_task)
5✔
1411

1412

1413
def _aiorpcx_monkeypatched_set_task_deadline(task, deadline):
5✔
1414
    ret = _aiorpcx_orig_set_task_deadline(task, deadline)
5✔
1415
    task._externally_cancelled = None
5✔
1416
    return ret
5✔
1417

1418

1419
def _aiorpcx_monkeypatched_unset_task_deadline(task):
5✔
1420
    if hasattr(task, "_orig_cancel"):
5✔
1421
        task.cancel = task._orig_cancel
5✔
1422
        del task._orig_cancel
5✔
1423
    return _aiorpcx_orig_unset_task_deadline(task)
5✔
1424

1425

1426
_aiorpcx_orig_set_task_deadline    = aiorpcx.curio._set_task_deadline
5✔
1427
_aiorpcx_orig_unset_task_deadline  = aiorpcx.curio._unset_task_deadline
5✔
1428

1429
aiorpcx.curio._set_new_deadline    = _aiorpcx_monkeypatched_set_new_deadline
5✔
1430
aiorpcx.curio._set_task_deadline   = _aiorpcx_monkeypatched_set_task_deadline
5✔
1431
aiorpcx.curio._unset_task_deadline = _aiorpcx_monkeypatched_unset_task_deadline
5✔
1432

1433

1434
async def wait_for2(fut: Awaitable, timeout: Union[int, float, None]):
5✔
1435
    """Replacement for asyncio.wait_for,
1436
     due to bugs: https://bugs.python.org/issue42130 and https://github.com/python/cpython/issues/86296 ,
1437
     which are only fixed in python 3.12+.
1438
     """
1439
    if sys.version_info[:3] >= (3, 12):
5✔
1440
        return await asyncio.wait_for(fut, timeout)
2✔
1441
    else:
1442
        async with async_timeout(timeout):
3✔
1443
            return await asyncio.ensure_future(fut, loop=get_running_loop())
3✔
1444

1445

1446
if hasattr(asyncio, 'timeout'):  # python 3.11+
5✔
1447
    async_timeout = asyncio.timeout
4✔
1448
else:
1449
    class TimeoutAfterAsynciolike(aiorpcx.curio.TimeoutAfter):
1✔
1450
        async def __aexit__(self, exc_type, exc_value, traceback):
1✔
1451
            try:
1✔
1452
                await super().__aexit__(exc_type, exc_value, traceback)
1✔
1453
            except (aiorpcx.TaskTimeout, aiorpcx.UncaughtTimeoutError):
×
1454
                raise asyncio.TimeoutError from None
×
1455
            except aiorpcx.TimeoutCancellationError:
×
1456
                raise asyncio.CancelledError from None
×
1457

1458
    def async_timeout(delay: Union[int, float, None]):
1✔
1459
        if delay is None:
1✔
1460
            return nullcontext()
×
1461
        return TimeoutAfterAsynciolike(delay)
1✔
1462

1463

1464
class NetworkJobOnDefaultServer(Logger, ABC):
5✔
1465
    """An abstract base class for a job that runs on the main network
1466
    interface. Every time the main interface changes, the job is
1467
    restarted, and some of its internals are reset.
1468
    """
1469
    def __init__(self, network: 'Network'):
5✔
1470
        Logger.__init__(self)
5✔
1471
        self.network = network
5✔
1472
        self.interface = None  # type: Interface
5✔
1473
        self._restart_lock = asyncio.Lock()
5✔
1474
        # Ensure fairness between NetworkJobs. e.g. if multiple wallets
1475
        # are open, a large wallet's Synchronizer should not starve the small wallets:
1476
        self._network_request_semaphore = asyncio.Semaphore(100)
5✔
1477

1478
        self._reset()
5✔
1479
        # every time the main interface changes, restart:
1480
        register_callback(self._restart, ['default_server_changed'])
5✔
1481
        # also schedule a one-off restart now, as there might already be a main interface:
1482
        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)
5✔
1483

1484
    def _reset(self):
5✔
1485
        """Initialise fields. Called every time the underlying
1486
        server connection changes.
1487
        """
1488
        self.taskgroup = OldTaskGroup()
5✔
1489
        self.reset_request_counters()
5✔
1490

1491
    async def _start(self, interface: 'Interface'):
5✔
1492
        self.logger.debug(f"starting. interface.server={repr(str(interface.server))}")
×
1493
        self.interface = interface
×
1494

1495
        taskgroup = self.taskgroup
×
1496
        async def run_tasks_wrapper():
×
1497
            self.logger.debug(f"starting taskgroup ({hex(id(taskgroup))}).")
×
1498
            try:
×
1499
                await self._run_tasks(taskgroup=taskgroup)
×
1500
            except Exception as e:
×
1501
                self.logger.error(f"taskgroup died ({hex(id(taskgroup))}). exc={e!r}")
×
1502
                raise
×
1503
            finally:
1504
                self.logger.debug(f"taskgroup stopped ({hex(id(taskgroup))}).")
×
1505
        await interface.taskgroup.spawn(run_tasks_wrapper)
×
1506

1507
    @abstractmethod
5✔
1508
    async def _run_tasks(self, *, taskgroup: OldTaskGroup) -> None:
5✔
1509
        """Start tasks in taskgroup. Called every time the underlying
1510
        server connection changes.
1511
        """
1512
        # If self.taskgroup changed, don't start tasks. This can happen if we have
1513
        # been restarted *just now*, i.e. after the _run_tasks coroutine object was created.
1514
        if taskgroup != self.taskgroup:
×
1515
            raise asyncio.CancelledError()
×
1516

1517
    async def stop(self, *, full_shutdown: bool = True):
5✔
1518
        self.logger.debug(f"stopping. {full_shutdown=}")
×
1519
        if full_shutdown:
×
1520
            unregister_callback(self._restart)
×
1521
        await self.taskgroup.cancel_remaining()
×
1522

1523
    @log_exceptions
5✔
1524
    async def _restart(self, *args):
5✔
1525
        interface = self.network.interface
5✔
1526
        if interface is None:
5✔
1527
            return  # we should get called again soon
5✔
1528

1529
        async with self._restart_lock:
×
1530
            await self.stop(full_shutdown=False)
×
1531
            self._reset()
×
1532
            await self._start(interface)
×
1533

1534
    def reset_request_counters(self):
5✔
1535
        self._requests_sent = 0
5✔
1536
        self._requests_answered = 0
5✔
1537

1538
    def num_requests_sent_and_answered(self) -> Tuple[int, int]:
5✔
1539
        return self._requests_sent, self._requests_answered
×
1540

1541
    @property
5✔
1542
    def session(self):
5✔
1543
        s = self.interface.session
×
1544
        assert s is not None
×
1545
        return s
×
1546

1547

1548
def detect_tor_socks_proxy() -> Optional[Tuple[str, int]]:
5✔
1549
    # Probable ports for Tor to listen at
1550
    candidates = [
×
1551
        ("127.0.0.1", 9050),
1552
        ("127.0.0.1", 9150),
1553
    ]
1554
    for net_addr in candidates:
×
1555
        if is_tor_socks_port(*net_addr):
×
1556
            return net_addr
×
1557
    return None
×
1558

1559

1560
def is_tor_socks_port(host: str, port: int) -> bool:
5✔
1561
    try:
×
1562
        with socket.create_connection((host, port), timeout=10) as s:
×
1563
            # mimic "tor-resolve 0.0.0.0".
1564
            # see https://github.com/spesmilo/electrum/issues/7317#issuecomment-1369281075
1565
            # > this is a socks5 handshake, followed by a socks RESOLVE request as defined in
1566
            # > [tor's socks extension spec](https://github.com/torproject/torspec/blob/7116c9cdaba248aae07a3f1d0e15d9dd102f62c5/socks-extensions.txt#L63),
1567
            # > resolving 0.0.0.0, which being an IP, tor resolves itself without needing to ask a relay.
1568
            s.send(b'\x05\x01\x00\x05\xf0\x00\x03\x070.0.0.0\x00\x00')
×
1569
            if s.recv(1024) == b'\x05\x00\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00':
×
1570
                return True
×
1571
    except socket.error:
×
1572
        pass
×
1573
    return False
×
1574

1575

1576
AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP = False  # used by unit tests
5✔
1577

1578
_asyncio_event_loop = None  # type: Optional[asyncio.AbstractEventLoop]
5✔
1579
def get_asyncio_loop() -> asyncio.AbstractEventLoop:
5✔
1580
    """Returns the global asyncio event loop we use."""
1581
    if loop := _asyncio_event_loop:
5✔
1582
        return loop
5✔
1583
    if AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP:
5✔
1584
        if loop := get_running_loop():
5✔
1585
            return loop
5✔
1586
    raise Exception("event loop not created yet")
×
1587

1588

1589
def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
5✔
1590
                                           asyncio.Future,
1591
                                           threading.Thread]:
1592
    global _asyncio_event_loop
1593
    if _asyncio_event_loop is not None:
×
1594
        raise Exception("there is already a running event loop")
×
1595

1596
    # asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710)
1597
    # We set a custom event loop policy purely to be compatible with code that
1598
    # relies on asyncio.get_event_loop().
1599
    # - in python 3.8-3.9, asyncio.Event.__init__, asyncio.Lock.__init__,
1600
    #   and similar, calls get_event_loop. see https://github.com/python/cpython/pull/23420
1601
    class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
×
1602
        def get_event_loop(self):
×
1603
            # In case electrum is being used as a library, there might be other
1604
            # event loops in use besides ours. To minimise interfering with those,
1605
            # if there is a loop running in the current thread, return that:
1606
            running_loop = get_running_loop()
×
1607
            if running_loop is not None:
×
1608
                return running_loop
×
1609
            # Otherwise, return our global loop:
1610
            return get_asyncio_loop()
×
1611
    asyncio.set_event_loop_policy(MyEventLoopPolicy())
×
1612

1613
    loop = asyncio.new_event_loop()
×
1614
    _asyncio_event_loop = loop
×
1615

1616
    def on_exception(loop, context):
×
1617
        """Suppress spurious messages it appears we cannot control."""
1618
        SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
×
1619
                                            'SSL error in data received')
1620
        message = context.get('message')
×
1621
        if message and SUPPRESS_MESSAGE_REGEX.match(message):
×
1622
            return
×
1623
        loop.default_exception_handler(context)
×
1624

1625
    def run_event_loop():
×
1626
        try:
×
1627
            loop.run_until_complete(stopping_fut)
×
1628
        finally:
1629
            # clean-up
1630
            global _asyncio_event_loop
1631
            _asyncio_event_loop = None
×
1632

1633
    loop.set_exception_handler(on_exception)
×
1634
    # loop.set_debug(True)
1635
    stopping_fut = loop.create_future()
×
1636
    loop_thread = threading.Thread(
×
1637
        target=run_event_loop,
1638
        name='EventLoop',
1639
    )
1640
    loop_thread.start()
×
1641
    # Wait until the loop actually starts.
1642
    # On a slow PC, or with a debugger attached, this can take a few dozens of ms,
1643
    # and if we returned without a running loop, weird things can happen...
1644
    t0 = time.monotonic()
×
1645
    while not loop.is_running():
×
1646
        time.sleep(0.01)
×
1647
        if time.monotonic() - t0 > 5:
×
1648
            raise Exception("been waiting for 5 seconds but asyncio loop would not start!")
×
1649
    return loop, stopping_fut, loop_thread
×
1650

1651

1652
class OrderedDictWithIndex(OrderedDict):
5✔
1653
    """An OrderedDict that keeps track of the positions of keys.
1654

1655
    Note: very inefficient to modify contents, except to add new items.
1656
    """
1657

1658
    def __init__(self):
5✔
1659
        super().__init__()
×
1660
        self._key_to_pos = {}
×
1661
        self._pos_to_key = {}
×
1662

1663
    def _recalc_index(self):
5✔
1664
        self._key_to_pos = {key: pos for (pos, key) in enumerate(self.keys())}
×
1665
        self._pos_to_key = {pos: key for (pos, key) in enumerate(self.keys())}
×
1666

1667
    def pos_from_key(self, key):
5✔
1668
        return self._key_to_pos[key]
×
1669

1670
    def value_from_pos(self, pos):
5✔
1671
        key = self._pos_to_key[pos]
×
1672
        return self[key]
×
1673

1674
    def popitem(self, *args, **kwargs):
5✔
1675
        ret = super().popitem(*args, **kwargs)
×
1676
        self._recalc_index()
×
1677
        return ret
×
1678

1679
    def move_to_end(self, *args, **kwargs):
5✔
1680
        ret = super().move_to_end(*args, **kwargs)
×
1681
        self._recalc_index()
×
1682
        return ret
×
1683

1684
    def clear(self):
5✔
1685
        ret = super().clear()
×
1686
        self._recalc_index()
×
1687
        return ret
×
1688

1689
    def pop(self, *args, **kwargs):
5✔
1690
        ret = super().pop(*args, **kwargs)
×
1691
        self._recalc_index()
×
1692
        return ret
×
1693

1694
    def update(self, *args, **kwargs):
5✔
1695
        ret = super().update(*args, **kwargs)
×
1696
        self._recalc_index()
×
1697
        return ret
×
1698

1699
    def __delitem__(self, *args, **kwargs):
5✔
1700
        ret = super().__delitem__(*args, **kwargs)
×
1701
        self._recalc_index()
×
1702
        return ret
×
1703

1704
    def __setitem__(self, key, *args, **kwargs):
5✔
1705
        is_new_key = key not in self
×
1706
        ret = super().__setitem__(key, *args, **kwargs)
×
1707
        if is_new_key:
×
1708
            pos = len(self) - 1
×
1709
            self._key_to_pos[key] = pos
×
1710
            self._pos_to_key[pos] = key
×
1711
        return ret
×
1712

1713

1714
def multisig_type(wallet_type):
5✔
1715
    '''If wallet_type is mofn multi-sig, return [m, n],
1716
    otherwise return None.'''
1717
    if not wallet_type:
5✔
1718
        return None
×
1719
    match = re.match(r'(\d+)of(\d+)', wallet_type)
5✔
1720
    if match:
5✔
1721
        match = [int(x) for x in match.group(1, 2)]
5✔
1722
    return match
5✔
1723

1724

1725
def is_ip_address(x: Union[str, bytes]) -> bool:
5✔
1726
    if isinstance(x, bytes):
5✔
1727
        x = x.decode("utf-8")
×
1728
    try:
5✔
1729
        ipaddress.ip_address(x)
5✔
1730
        return True
5✔
1731
    except ValueError:
5✔
1732
        return False
5✔
1733

1734

1735
def is_localhost(host: str) -> bool:
5✔
1736
    if str(host) in ('localhost', 'localhost.',):
5✔
1737
        return True
5✔
1738
    if host[0] == '[' and host[-1] == ']':  # IPv6
5✔
1739
        host = host[1:-1]
5✔
1740
    try:
5✔
1741
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
5✔
1742
        return ip_addr.is_loopback
5✔
1743
    except ValueError:
5✔
1744
        pass  # not an IP
5✔
1745
    return False
5✔
1746

1747

1748
def is_private_netaddress(host: str) -> bool:
5✔
1749
    if is_localhost(host):
5✔
1750
        return True
5✔
1751
    if host[0] == '[' and host[-1] == ']':  # IPv6
5✔
1752
        host = host[1:-1]
5✔
1753
    try:
5✔
1754
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
5✔
1755
        return ip_addr.is_private
5✔
1756
    except ValueError:
5✔
1757
        pass  # not an IP
5✔
1758
    return False
5✔
1759

1760

1761
def list_enabled_bits(x: int) -> Sequence[int]:
5✔
1762
    """e.g. 77 (0b1001101) --> (0, 2, 3, 6)"""
1763
    binary = bin(x)[2:]
5✔
1764
    rev_bin = reversed(binary)
5✔
1765
    return tuple(i for i, b in enumerate(rev_bin) if b == '1')
5✔
1766

1767

1768
def resolve_dns_srv(host: str):
5✔
1769
    # FIXME this method is not using the network proxy. (although the proxy might not support UDP?)
1770
    srv_records = dns.resolver.resolve(host, 'SRV')
×
1771
    # priority: prefer lower
1772
    # weight: tie breaker; prefer higher
1773
    srv_records = sorted(srv_records, key=lambda x: (x.priority, -x.weight))
×
1774

1775
    def dict_from_srv_record(srv):
×
1776
        return {
×
1777
            'host': str(srv.target),
1778
            'port': srv.port,
1779
        }
1780
    return [dict_from_srv_record(srv) for srv in srv_records]
×
1781

1782

1783
def randrange(bound: int) -> int:
5✔
1784
    """Return a random integer k such that 1 <= k < bound, uniformly
1785
    distributed across that range.
1786
    This is guaranteed to be cryptographically strong.
1787
    """
1788
    # secrets.randbelow(bound) returns a random int: 0 <= r < bound,
1789
    # hence transformations:
1790
    return secrets.randbelow(bound - 1) + 1
5✔
1791

1792

1793
class CallbackManager(Logger):
5✔
1794
    # callbacks set by the GUI or any thread
1795
    # guarantee: the callbacks will always get triggered from the asyncio thread.
1796

1797
    def __init__(self):
5✔
1798
        Logger.__init__(self)
5✔
1799
        self.callback_lock = threading.Lock()
5✔
1800
        self.callbacks = defaultdict(list)      # note: needs self.callback_lock
5✔
1801
        self._running_cb_futs = set()
5✔
1802

1803
    def register_callback(self, func, events):
5✔
1804
        with self.callback_lock:
5✔
1805
            for event in events:
5✔
1806
                self.callbacks[event].append(func)
5✔
1807

1808
    def unregister_callback(self, callback):
5✔
1809
        with self.callback_lock:
5✔
1810
            for callbacks in self.callbacks.values():
5✔
1811
                if callback in callbacks:
5✔
1812
                    callbacks.remove(callback)
5✔
1813

1814
    def trigger_callback(self, event, *args):
5✔
1815
        """Trigger a callback with given arguments.
1816
        Can be called from any thread. The callback itself will get scheduled
1817
        on the event loop.
1818
        """
1819
        loop = get_asyncio_loop()
5✔
1820
        assert loop.is_running(), "event loop not running"
5✔
1821
        with self.callback_lock:
5✔
1822
            callbacks = self.callbacks[event][:]
5✔
1823
        for callback in callbacks:
5✔
1824
            if asyncio.iscoroutinefunction(callback):  # async cb
5✔
1825
                fut = asyncio.run_coroutine_threadsafe(callback(*args), loop)
5✔
1826
                # keep strong references around to avoid GC issues:
1827
                self._running_cb_futs.add(fut)
5✔
1828
                def on_done(fut_: concurrent.futures.Future):
5✔
1829
                    assert fut_.done()
5✔
1830
                    self._running_cb_futs.remove(fut_)
5✔
1831
                    if fut_.cancelled():
5✔
1832
                        self.logger.debug(f"cb cancelled. {event=}.")
4✔
1833
                    elif exc := fut_.exception():
5✔
1834
                        self.logger.error(f"cb errored. {event=}. {exc=}", exc_info=exc)
×
1835
                fut.add_done_callback(on_done)
5✔
1836
            else:  # non-async cb
1837
                # note: the cb needs to run in the asyncio thread
1838
                if get_running_loop() == loop:
5✔
1839
                    # run callback immediately, so that it is guaranteed
1840
                    # to have been executed when this method returns
1841
                    callback(*args)
5✔
1842
                else:
1843
                    # note: if cb raises, asyncio will log the exception
1844
                    loop.call_soon_threadsafe(callback, *args)
×
1845

1846

1847
callback_mgr = CallbackManager()
5✔
1848
trigger_callback = callback_mgr.trigger_callback
5✔
1849
register_callback = callback_mgr.register_callback
5✔
1850
unregister_callback = callback_mgr.unregister_callback
5✔
1851
_event_listeners = defaultdict(set)  # type: Dict[str, Set[str]]
5✔
1852

1853

1854
class EventListener:
5✔
1855
    """Use as a mixin for a class that has methods to be triggered on events.
1856
    - Methods that receive the callbacks should be named "on_event_*" and decorated with @event_listener.
1857
    - register_callbacks() should be called exactly once per instance of EventListener, e.g. in __init__
1858
    - unregister_callbacks() should be called at least once, e.g. when the instance is destroyed
1859
    """
1860

1861
    def _list_callbacks(self):
5✔
1862
        for c in self.__class__.__mro__:
5✔
1863
            classpath = f"{c.__module__}.{c.__name__}"
5✔
1864
            for method_name in _event_listeners[classpath]:
5✔
1865
                method = getattr(self, method_name)
5✔
1866
                assert callable(method)
5✔
1867
                assert method_name.startswith('on_event_')
5✔
1868
                yield method_name[len('on_event_'):], method
5✔
1869

1870
    def register_callbacks(self):
5✔
1871
        for name, method in self._list_callbacks():
5✔
1872
            #_logger.debug(f'registering callback {method}')
1873
            register_callback(method, [name])
5✔
1874

1875
    def unregister_callbacks(self):
5✔
1876
        for name, method in self._list_callbacks():
5✔
1877
            #_logger.debug(f'unregistering callback {method}')
1878
            unregister_callback(method)
5✔
1879

1880

1881
def event_listener(func):
5✔
1882
    """To be used in subclasses of EventListener only. (how to enforce this programmatically?)"""
1883
    classname, method_name = func.__qualname__.split('.')
5✔
1884
    assert method_name.startswith('on_event_')
5✔
1885
    classpath = f"{func.__module__}.{classname}"
5✔
1886
    _event_listeners[classpath].add(method_name)
5✔
1887
    return func
5✔
1888

1889

1890
_NetAddrType = TypeVar("_NetAddrType")
5✔
1891
# requirements for _NetAddrType:
1892
# - reasonable __hash__() implementation (e.g. based on host/port of remote endpoint)
1893

1894
class NetworkRetryManager(Generic[_NetAddrType]):
5✔
1895
    """Truncated Exponential Backoff for network connections."""
1896

1897
    def __init__(
5✔
1898
            self, *,
1899
            max_retry_delay_normal: float,
1900
            init_retry_delay_normal: float,
1901
            max_retry_delay_urgent: float = None,
1902
            init_retry_delay_urgent: float = None,
1903
    ):
1904
        self._last_tried_addr = {}  # type: Dict[_NetAddrType, Tuple[float, int]]  # (unix ts, num_attempts)
5✔
1905

1906
        # note: these all use "seconds" as unit
1907
        if max_retry_delay_urgent is None:
5✔
1908
            max_retry_delay_urgent = max_retry_delay_normal
5✔
1909
        if init_retry_delay_urgent is None:
5✔
1910
            init_retry_delay_urgent = init_retry_delay_normal
5✔
1911
        self._max_retry_delay_normal = max_retry_delay_normal
5✔
1912
        self._init_retry_delay_normal = init_retry_delay_normal
5✔
1913
        self._max_retry_delay_urgent = max_retry_delay_urgent
5✔
1914
        self._init_retry_delay_urgent = init_retry_delay_urgent
5✔
1915

1916
    def _trying_addr_now(self, addr: _NetAddrType) -> None:
5✔
1917
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
×
1918
        # we add up to 1 second of noise to the time, so that clients are less likely
1919
        # to get synchronised and bombard the remote in connection waves:
1920
        cur_time = time.time() + random.random()
×
1921
        self._last_tried_addr[addr] = cur_time, num_attempts + 1
×
1922

1923
    def _on_connection_successfully_established(self, addr: _NetAddrType) -> None:
5✔
1924
        self._last_tried_addr[addr] = time.time(), 0
×
1925

1926
    def _can_retry_addr(self, addr: _NetAddrType, *,
5✔
1927
                        now: float = None, urgent: bool = False) -> bool:
1928
        if now is None:
×
1929
            now = time.time()
×
1930
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
×
1931
        if urgent:
×
1932
            max_delay = self._max_retry_delay_urgent
×
1933
            init_delay = self._init_retry_delay_urgent
×
1934
        else:
1935
            max_delay = self._max_retry_delay_normal
×
1936
            init_delay = self._init_retry_delay_normal
×
1937
        delay = self.__calc_delay(multiplier=init_delay, max_delay=max_delay, num_attempts=num_attempts)
×
1938
        next_time = last_time + delay
×
1939
        return next_time < now
×
1940

1941
    @classmethod
5✔
1942
    def __calc_delay(cls, *, multiplier: float, max_delay: float,
5✔
1943
                     num_attempts: int) -> float:
1944
        num_attempts = min(num_attempts, 100_000)
×
1945
        try:
×
1946
            res = multiplier * 2 ** num_attempts
×
1947
        except OverflowError:
×
1948
            return max_delay
×
1949
        return max(0, min(max_delay, res))
×
1950

1951
    def _clear_addr_retry_times(self) -> None:
5✔
1952
        self._last_tried_addr.clear()
5✔
1953

1954

1955
class ESocksProxy(aiorpcx.SOCKSProxy):
5✔
1956
    # note: proxy will not leak DNS as create_connection()
1957
    # sets (local DNS) resolve=False by default
1958

1959
    async def open_connection(self, host=None, port=None, **kwargs):
5✔
1960
        loop = asyncio.get_running_loop()
×
1961
        reader = asyncio.StreamReader(loop=loop)
×
1962
        protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
×
1963
        transport, _ = await self.create_connection(
×
1964
            lambda: protocol, host, port, **kwargs)
1965
        writer = asyncio.StreamWriter(transport, protocol, reader, loop)
×
1966
        return reader, writer
×
1967

1968
    @classmethod
5✔
1969
    def from_network_settings(cls, network: Optional['Network']) -> Optional['ESocksProxy']:
5✔
1970
        if not network or not network.proxy:
5✔
1971
            return None
5✔
1972
        proxy = network.proxy
×
1973
        username, pw = proxy.get('user'), proxy.get('password')
×
1974
        if not username or not pw:
×
1975
            # is_proxy_tor is tri-state; None indicates it is still probing the proxy to test for TOR
1976
            if network.is_proxy_tor:
×
1977
                auth = aiorpcx.socks.SOCKSRandomAuth()
×
1978
            else:
1979
                auth = None
×
1980
        else:
1981
            auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
×
1982
        addr = aiorpcx.NetAddress(proxy['host'], proxy['port'])
×
1983
        if proxy['mode'] == "socks4":
×
1984
            ret = cls(addr, aiorpcx.socks.SOCKS4a, auth)
×
1985
        elif proxy['mode'] == "socks5":
×
1986
            ret = cls(addr, aiorpcx.socks.SOCKS5, auth)
×
1987
        else:
1988
            raise NotImplementedError  # http proxy not available with aiorpcx
×
1989
        return ret
×
1990

1991

1992
class JsonRPCError(Exception):
5✔
1993

1994
    class Codes(enum.IntEnum):
5✔
1995
        # application-specific error codes
1996
        USERFACING = 1
5✔
1997
        INTERNAL = 2
5✔
1998

1999
    def __init__(self, *, code: int, message: str, data: Optional[dict] = None):
5✔
2000
        Exception.__init__(self)
×
2001
        self.code = code
×
2002
        self.message = message
×
2003
        self.data = data
×
2004

2005

2006
class JsonRPCClient:
5✔
2007

2008
    def __init__(self, session: aiohttp.ClientSession, url: str):
5✔
2009
        self.session = session
×
2010
        self.url = url
×
2011
        self._id = 0
×
2012

2013
    async def request(self, endpoint, *args):
5✔
2014
        """Send request to server, parse and return result.
2015
        note: parsing code is naive, the server is assumed to be well-behaved.
2016
              Up to the caller to handle exceptions, including those arising from parsing errors.
2017
        """
2018
        self._id += 1
×
2019
        data = ('{"jsonrpc": "2.0", "id":"%d", "method": "%s", "params": %s }'
×
2020
                % (self._id, endpoint, json.dumps(args)))
2021
        async with self.session.post(self.url, data=data) as resp:
×
2022
            if resp.status == 200:
×
2023
                r = await resp.json()
×
2024
                result = r.get('result')
×
2025
                error = r.get('error')
×
2026
                if error:
×
2027
                    raise JsonRPCError(code=error["code"], message=error["message"], data=error.get("data"))
×
2028
                else:
2029
                    return result
×
2030
            else:
2031
                text = await resp.text()
×
2032
                return 'Error: ' + str(text)
×
2033

2034
    def add_method(self, endpoint):
5✔
2035
        async def coro(*args):
×
2036
            return await self.request(endpoint, *args)
×
2037
        setattr(self, endpoint, coro)
×
2038

2039

2040
T = TypeVar('T')
5✔
2041

2042
def random_shuffled_copy(x: Iterable[T]) -> List[T]:
5✔
2043
    """Returns a shuffled copy of the input."""
2044
    x_copy = list(x)  # copy
5✔
2045
    random.shuffle(x_copy)  # shuffle in-place
5✔
2046
    return x_copy
5✔
2047

2048

2049
def test_read_write_permissions(path) -> None:
5✔
2050
    # note: There might already be a file at 'path'.
2051
    #       Make sure we do NOT overwrite/corrupt that!
2052
    temp_path = "%s.tmptest.%s" % (path, os.getpid())
5✔
2053
    echo = "fs r/w test"
5✔
2054
    try:
5✔
2055
        # test READ permissions for actual path
2056
        if os.path.exists(path):
5✔
2057
            with open(path, "rb") as f:
5✔
2058
                f.read(1)  # read 1 byte
5✔
2059
        # test R/W sanity for "similar" path
2060
        with open(temp_path, "w", encoding='utf-8') as f:
5✔
2061
            f.write(echo)
5✔
2062
        with open(temp_path, "r", encoding='utf-8') as f:
5✔
2063
            echo2 = f.read()
5✔
2064
        os.remove(temp_path)
5✔
2065
    except Exception as e:
×
2066
        raise IOError(e) from e
×
2067
    if echo != echo2:
5✔
2068
        raise IOError('echo sanity-check failed')
×
2069

2070

2071
class classproperty(property):
5✔
2072
    """~read-only class-level @property
2073
    from https://stackoverflow.com/a/13624858 by denis-ryzhkov
2074
    """
2075
    def __get__(self, owner_self, owner_cls):
5✔
2076
        return self.fget(owner_cls)
5✔
2077

2078

2079
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
5✔
2080
    """Returns the asyncio event loop that is *running in this thread*, if any."""
2081
    try:
5✔
2082
        return asyncio.get_running_loop()
5✔
2083
    except RuntimeError:
×
2084
        return None
×
2085

2086

2087
def error_text_str_to_safe_str(err: str, *, max_len: Optional[int] = 500) -> str:
5✔
2088
    """Converts an untrusted error string to a sane printable ascii str.
2089
    Never raises.
2090
    """
2091
    text = error_text_bytes_to_safe_str(
5✔
2092
        err.encode("ascii", errors='backslashreplace'),
2093
        max_len=None)
2094
    return truncate_text(text, max_len=max_len)
5✔
2095

2096

2097
def error_text_bytes_to_safe_str(err: bytes, *, max_len: Optional[int] = 500) -> str:
5✔
2098
    """Converts an untrusted error bytes text to a sane printable ascii str.
2099
    Never raises.
2100

2101
    Note that naive ascii conversion would be insufficient. Fun stuff:
2102
    >>> b = b"my_long_prefix_blabla" + 21 * b"\x08" + b"malicious_stuff"
2103
    >>> s = b.decode("ascii")
2104
    >>> print(s)
2105
    malicious_stuffblabla
2106
    """
2107
    # convert to ascii, to get rid of unicode stuff
2108
    ascii_text = err.decode("ascii", errors='backslashreplace')
5✔
2109
    # do repr to handle ascii special chars (especially when printing/logging the str)
2110
    text = repr(ascii_text)
5✔
2111
    return truncate_text(text, max_len=max_len)
5✔
2112

2113

2114
def truncate_text(text: str, *, max_len: Optional[int]) -> str:
5✔
2115
    if max_len is None or len(text) <= max_len:
5✔
2116
        return text
5✔
2117
    else:
2118
        return text[:max_len] + f"... (truncated. orig_len={len(text)})"
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc