• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6601523842514944

26 Jun 2025 02:22PM UTC coverage: 59.829% (-0.001%) from 59.83%
6601523842514944

Pull #9983

CirrusCI

f321x
android: build BarcodeScannerView from src

Adds a script `make_barcode_scanner.sh` which builds the
`BarcodeScannerView` library and its dependencies, `zxing-cpp` and
`CameraView` from source. Builds `zxing-cpp` architecture dependent
reducing the final apk size.
Pull Request #9983: android: replace qr code scanning library

21943 of 36676 relevant lines covered (59.83%)

2.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.2
/electrum/util.py
1
# Electrum - lightweight Bitcoin client
2
# Copyright (C) 2011 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import concurrent.futures
5✔
24
from dataclasses import dataclass
5✔
25
import logging
5✔
26
import os
5✔
27
import sys
5✔
28
import re
5✔
29
from collections import defaultdict, OrderedDict
5✔
30
from concurrent.futures.process import ProcessPoolExecutor
5✔
31
from typing import (
5✔
32
    NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any, Sequence, Dict, Generic, TypeVar, List, Iterable,
33
    Set, Awaitable
34
)
35
from datetime import datetime, timezone, timedelta
5✔
36
import decimal
5✔
37
from decimal import Decimal
5✔
38
import threading
5✔
39
import hmac
5✔
40
import hashlib
5✔
41
import stat
5✔
42
import asyncio
5✔
43
import builtins
5✔
44
import json
5✔
45
import time
5✔
46
import ssl
5✔
47
import ipaddress
5✔
48
from ipaddress import IPv4Address, IPv6Address
5✔
49
import random
5✔
50
import secrets
5✔
51
import functools
5✔
52
from functools import partial
5✔
53
from abc import abstractmethod, ABC
5✔
54
import enum
5✔
55
from contextlib import nullcontext
5✔
56
import traceback
5✔
57

58
import aiohttp
5✔
59
from aiohttp_socks import ProxyConnector, ProxyType
5✔
60
import aiorpcx
5✔
61
import certifi
5✔
62
import dns.asyncresolver
5✔
63

64
from .i18n import _
5✔
65
from .logging import get_logger, Logger
5✔
66

67
if TYPE_CHECKING:
5✔
68
    from .network import Network, ProxySettings
×
69
    from .interface import Interface
×
70
    from .simple_config import SimpleConfig
×
71

72

73
_logger = get_logger(__name__)
5✔
74

75

76
def inv_dict(d):
    """Return a new dict with the keys and values of *d* swapped."""
    inverted = {}
    for key, value in d.items():
        inverted[value] = key
    return inverted
78

79

80
def all_subclasses(cls) -> Set:
    """Return all (transitive) subclasses of cls."""
    direct = set(cls.__subclasses__())
    result = set(direct)
    # recurse into each direct subclass to pick up the whole tree
    for child in direct:
        result |= all_subclasses(child)
    return result
86

87

88
ca_path = certifi.where()  # filesystem path to certifi's bundled CA certificates
89

90

91
# unit name -> number of decimal places the satoshi point is shifted by
base_units = {'BTC':8, 'mBTC':5, 'bits':2, 'sat':0}
# decimal places -> unit name (reverse lookup)
base_units_inverse = inv_dict(base_units)
base_units_list = ['BTC', 'mBTC', 'bits', 'sat']  # list(dict) does not guarantee order

DECIMAL_POINT_DEFAULT = 5  # mBTC
96

97

98
# raised when a unit name / decimal-point value is not in base_units
class UnknownBaseUnit(Exception): pass
99

100

101
def decimal_point_to_base_unit_name(dp: int) -> str:
    """Map a decimal-point shift to its unit name, e.g. 8 -> "BTC".

    Raises UnknownBaseUnit for values not in base_units.
    """
    if dp not in base_units_inverse:
        raise UnknownBaseUnit(dp)
    return base_units_inverse[dp]
107

108

109
def base_unit_name_to_decimal_point(unit_name: str) -> int:
    """Returns the max number of digits allowed after the decimal point.

    e.g. "BTC" -> 8.  Raises UnknownBaseUnit for unrecognized names.
    """
    dp = base_units.get(unit_name)
    if dp is None:
        raise UnknownBaseUnit(unit_name)
    return dp
116

117
def parse_max_spend(amt: Any) -> Optional[int]:
    """Checks if given amount is "spend-max"-like.
    Returns None or the positive integer weight for "max". Never raises.

    When creating invoices and on-chain txs, the user can specify to send "max".
    This is done by setting the amount to '!'. Splitting max between multiple
    tx outputs is also possible, and custom weights (positive ints) can also be used.
    For example, to send 40% of all coins to address1, and 60% to address2:
    ```
    address1, 2!
    address2, 3!
    ```
    """
    # must be a non-empty string ending in '!' (endswith is False for "")
    if not isinstance(amt, str) or not amt.endswith('!'):
        return None
    if amt == '!':
        return 1
    try:
        weight = int(amt[:-1])
    except ValueError:
        return None
    # only positive weights are meaningful
    return weight if weight > 0 else None
142

143
class NotEnoughFunds(Exception):
    """Wallet does not have enough coins to fund the requested operation."""
    def __str__(self):
        return _("Insufficient funds")


class UneconomicFee(Exception):
    """The fee would exceed the value gained by the transaction."""
    def __str__(self):
        return _("The fee for the transaction is higher than the funds gained from it.")


class NoDynamicFeeEstimates(Exception):
    def __str__(self):
        return _('Dynamic fee estimates not available')


class BelowDustLimit(Exception):
    pass


class InvalidPassword(Exception):
    def __init__(self, message: Optional[str] = None):
        # optional custom message; falls back to a generic translated string
        self.message = message

    def __str__(self):
        if self.message is None:
            return _("Incorrect password")
        else:
            return str(self.message)


class AddTransactionException(Exception):
    pass


class UnrelatedTransactionException(AddTransactionException):
    def __str__(self):
        return _("Transaction is unrelated to this wallet.")


class FileImportFailed(Exception):
    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        return _("Failed to import from file.") + "\n" + self.message


class FileExportFailed(Exception):
    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        return _("Failed to export to file.") + "\n" + self.message


class WalletFileException(Exception):
    def __init__(self, message='', *, should_report_crash: bool = False):
        Exception.__init__(self, message)
        # when True, callers may surface this as a crash report to the user
        self.should_report_crash = should_report_crash


class BitcoinException(Exception): pass


class UserFacingException(Exception):
    """Exception that contains information intended to be shown to the user."""


class InvoiceError(UserFacingException): pass


class NetworkOfflineException(UserFacingException):
    """Can be raised if we are running in offline mode (--offline flag)
    and the user requests an operation that requires the network.
    """
    def __str__(self):
        return _("You are offline.")


# Throw this exception to unwind the stack like when an error occurs.
# However unlike other exceptions the user won't be informed.
class UserCancelled(Exception):
    '''An exception that is suppressed from the user'''
    pass
227

228

229
def to_decimal(x: Union[str, float, int, Decimal]) -> Decimal:
    """Convert to Decimal, going through str for floats.

    Converting a float directly picks up binary representation noise:
        >>> Decimal(41754.681)
        Decimal('41754.680999999996856786310672760009765625')
        >>> Decimal("41754.681")
        Decimal('41754.681')
    """
    return x if isinstance(x, Decimal) else Decimal(str(x))
238

239

240
# note: this is not a NamedTuple as then its json encoding cannot be customized
241
# note: this is not a NamedTuple as then its json encoding cannot be customized
class Satoshis(object):
    """Wrapper for an amount denominated in satoshis (sometimes msat precision)."""
    __slots__ = ('value',)

    def __new__(cls, value):
        self = super(Satoshis, cls).__new__(cls)
        # note: 'value' sometimes has msat precision
        assert isinstance(value, (int, Decimal)), f"unexpected type for {value=!r}"
        self.value = value
        return self

    def __repr__(self):
        return f'Satoshis({self.value})'

    def __str__(self):
        # note: precision is truncated to satoshis here
        return format_satoshis(self.value)

    def __eq__(self, other):
        # fix: comparing against a non-Satoshis object used to raise
        # AttributeError ('other.value'); defer to the other operand instead
        if not isinstance(other, Satoshis):
            return NotImplemented
        return self.value == other.value

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        return Satoshis(self.value + other.value)
266

267

268
# note: this is not a NamedTuple as then its json encoding cannot be customized
269
# note: this is not a NamedTuple as then its json encoding cannot be customized
class Fiat(object):
    """A fiat amount (Decimal or None for "no data") tagged with a currency code."""
    __slots__ = ('value', 'ccy')

    def __new__(cls, value: Optional[Decimal], ccy: str):
        self = super().__new__(cls)
        self.ccy = ccy
        if not isinstance(value, (Decimal, type(None))):
            raise TypeError(f"value should be Decimal or None, not {type(value)}")
        self.value = value
        return self

    def __repr__(self):
        return f'Fiat({self})'

    def __str__(self):
        amount = self.value
        if amount is None or amount.is_nan():
            return _('No Data')
        return "{:.2f}".format(amount)

    def to_ui_string(self):
        amount = self.value
        if amount is None or amount.is_nan():
            return _('No Data')
        return "{:.2f}".format(amount) + ' ' + self.ccy

    def __eq__(self, other):
        # different type or different currency: never equal
        if not isinstance(other, Fiat) or self.ccy != other.ccy:
            return False
        mine, theirs = self.value, other.value
        # NaN != NaN under Decimal semantics, but we treat two NaNs as equal
        if isinstance(mine, Decimal) and isinstance(theirs, Decimal) \
                and mine.is_nan() and theirs.is_nan():
            return True
        return mine == theirs

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        assert self.ccy == other.ccy
        return Fiat(self.value + other.value, self.ccy)
311

312

313
class MyEncoder(json.JSONEncoder):
    """JSON encoder that serializes electrum's domain objects (txs, amounts, etc.)."""

    def default(self, obj):
        # note: this does not get called for namedtuples :(  https://bugs.python.org/issue30343
        from .transaction import Transaction, TxOutput
        if isinstance(obj, Transaction):
            return obj.serialize()
        if isinstance(obj, TxOutput):
            return obj.to_legacy_tuple()
        if isinstance(obj, Satoshis):
            return str(obj)
        if isinstance(obj, Fiat):
            return str(obj)
        if isinstance(obj, Decimal):
            # stringify to avoid float precision loss
            return str(obj)
        if isinstance(obj, datetime):
            # NOTE(review): when microsecond == 0 isoformat omits the fractional
            # part, so [:-3] trims the seconds digits instead of microseconds —
            # confirm this is intended.
            return obj.isoformat(' ')[:-3]
        if isinstance(obj, set):
            return list(obj)
        if isinstance(obj, bytes): # for nametuples in lnchannel
            return obj.hex()
        if hasattr(obj, 'to_json') and callable(obj.to_json):
            return obj.to_json()
        return super(MyEncoder, self).default(obj)
336

337

338
class ThreadJob(Logger):
    """A job that is run periodically from a thread's main loop.  run() is
    called from that thread's context.
    """

    def __init__(self):
        Logger.__init__(self)

    def run(self):
        """Called periodically from the thread"""
        # base implementation is a no-op; subclasses override (e.g. DebugMem)
        pass
349

350
class DebugMem(ThreadJob):
    '''A handy class for debugging GC memory leaks'''
    def __init__(self, classes, interval=30):
        ThreadJob.__init__(self)
        self.next_time = 0        # POSIX time when the next scan is due
        self.classes = classes    # class objects whose live instances get counted
        self.interval = interval  # seconds between scans

    def mem_stats(self):
        # force a GC pass, then count live instances of each watched class
        import gc
        self.logger.info("Start memscan")
        gc.collect()
        objmap = defaultdict(list)
        for obj in gc.get_objects():
            for class_ in self.classes:
                if isinstance(obj, class_):
                    objmap[class_].append(obj)
        for class_, objs in objmap.items():
            self.logger.info(f"{class_.__name__}: {len(objs)}")
        self.logger.info("Finish memscan")

    def run(self):
        # called repeatedly by the owning thread (see ThreadJob.run);
        # rate-limited to once per self.interval seconds
        if time.time() > self.next_time:
            self.mem_stats()
            self.next_time = time.time() + self.interval
375

376
class DaemonThread(threading.Thread, Logger):
    """ daemon thread that terminates cleanly """

    def __init__(self):
        threading.Thread.__init__(self)
        Logger.__init__(self)
        # remember the spawning thread: is_running() also checks it is alive,
        # so this thread winds down if its parent dies
        self.parent_thread = threading.current_thread()
        self.running = False
        self.running_lock = threading.Lock()
        self.job_lock = threading.Lock()
        self.jobs = []  # list of ThreadJob-like objects, each with a run() method
        self.stopped_event = threading.Event()        # set when fully stopped
        self.stopped_event_async = asyncio.Event()    # set when fully stopped
        self.wake_up_event = threading.Event()  # for perf optimisation of polling in run()

    def add_jobs(self, jobs):
        # register extra jobs to be invoked on every run_jobs() pass
        with self.job_lock:
            self.jobs.extend(jobs)

    def run_jobs(self):
        # Don't let a throwing job disrupt the thread, future runs of
        # itself, or other jobs.  This is useful protection against
        # malformed or malicious server responses
        with self.job_lock:
            for job in self.jobs:
                try:
                    job.run()
                except Exception as e:
                    self.logger.exception('')

    def remove_jobs(self, jobs):
        # raises ValueError if a job was never added
        with self.job_lock:
            for job in jobs:
                self.jobs.remove(job)

    def start(self):
        with self.running_lock:
            self.running = True
        return threading.Thread.start(self)

    def is_running(self):
        with self.running_lock:
            return self.running and self.parent_thread.is_alive()

    def stop(self):
        with self.running_lock:
            self.running = False
            # pulse the event so a poller blocked in wait() gets released once
            self.wake_up_event.set()
            self.wake_up_event.clear()

    def on_stop(self):
        # to be called by the thread itself as it winds down
        if 'ANDROID_DATA' in os.environ:
            import jnius
            # detach this thread from the JVM before it exits
            jnius.detach()
            self.logger.info("jnius detach")
        self.logger.info("stopped")
        self.stopped_event.set()
        # the asyncio event must be set from inside its own loop
        loop = get_asyncio_loop()
        loop.call_soon_threadsafe(self.stopped_event_async.set)
435

436

437
def print_stderr(*args):
    """Write the space-joined stringified args to stderr, newline-terminated, and flush."""
    print(" ".join(str(item) for item in args), file=sys.stderr, flush=True)
441

442

443
def print_msg(*args):
    """Write the space-joined stringified args to stdout, newline-terminated, and flush."""
    print(" ".join(str(item) for item in args), file=sys.stdout, flush=True)
448

449

450
def json_encode(obj):
    """Pretty-print obj as JSON via MyEncoder; falls back to repr() for unencodable objects."""
    try:
        return json.dumps(obj, sort_keys=True, indent=4, cls=MyEncoder)
    except TypeError:
        return repr(obj)
456

457

458
def json_decode(x):
    """Parse x as JSON (floats become Decimal); on any failure return x unchanged."""
    try:
        return json.loads(x, parse_float=Decimal)
    except Exception:
        return x
463

464

465
def json_normalize(x):
    """Round-trip x through our JSON encoder so the result only contains plain json-able types."""
    # note: The return value of commands, when going through the JSON-RPC interface,
    #       is json-encoded. The encoder used there cannot handle some types, e.g. electrum.util.Satoshis.
    # note: We should not simply do "json_encode(x)" here, as then later x would get doubly json-encoded.
    # see #5868
    return json_decode(json_encode(x))
471

472

473
# taken from Django Source Code
474
def constant_time_compare(val1, val2):
    """Return True if the two strings are equal, False otherwise."""
    # hmac.compare_digest does not short-circuit, avoiding timing side-channels
    return hmac.compare_digest(to_bytes(val1, 'utf8'), to_bytes(val2, 'utf8'))
477

478

479
_profiler_logger = _logger.getChild('profiler')  # dedicated child logger for @profiler output
480

481

482
def profiler(func=None, *, min_threshold: Union[int, float, None] = None):
    """Function decorator that logs execution time.

    Works on both sync functions and coroutine functions.
    min_threshold: if set, only log if time taken is higher than threshold
    """
    if func is None:  # to make "@profiler(...)" work. (in addition to bare "@profiler")
        return partial(profiler, min_threshold=min_threshold)
    t0 = None  # type: Optional[float]

    def timer_start():
        nonlocal t0
        # fix: use perf_counter instead of time.time() — wall-clock time can
        # jump (NTP adjustments etc.) and yield negative/garbage durations
        t0 = time.perf_counter()

    def timer_done():
        t = time.perf_counter() - t0
        if min_threshold is None or t > min_threshold:
            _profiler_logger.debug(f"{func.__qualname__} {t:,.4f} sec")

    if asyncio.iscoroutinefunction(func):
        @functools.wraps(func)  # preserve __name__/__doc__ of the wrapped fn
        async def do_profile(*args, **kw_args):
            timer_start()
            o = await func(*args, **kw_args)
            timer_done()
            return o
    else:
        @functools.wraps(func)
        def do_profile(*args, **kw_args):
            timer_start()
            o = func(*args, **kw_args)
            timer_done()
            return o
    return do_profile
513

514

515
class AsyncHangDetector:
    """Context manager that logs every `n` seconds if encapsulated context still has not exited."""

    def __init__(
        self,
        *,
        period_sec: int = 15,  # how often to log while the context is still open
        message: str,          # text logged on each period
        logger: Optional[logging.Logger] = None,  # defaults to the module logger
    ):
        self.period_sec = period_sec
        self.message = message
        self.logger = logger or _logger

    async def _monitor(self):
        # note: this assumes that the event loop itself is not blocked
        t0 = time.monotonic()
        while True:
            await asyncio.sleep(self.period_sec)
            t1 = time.monotonic()
            self.logger.info(f"{self.message} (after {t1 - t0:.2f} sec)")

    async def __aenter__(self):
        # start the background watchdog; cancelled on exit
        self.mtask = asyncio.create_task(self._monitor())

    async def __aexit__(self, exc_type, exc, tb):
        self.mtask.cancel()
542

543

544
def android_ext_dir():
    # root of primary external storage; android-only (import fails elsewhere)
    from android.storage import primary_external_storage_path
    return primary_external_storage_path()


def android_backup_dir():
    # <external storage>/<package name>; created on first use. android-only.
    pkgname = get_android_package_name()
    d = os.path.join(android_ext_dir(), pkgname)
    if not os.path.exists(d):
        os.mkdir(d)
    return d


def android_data_dir():
    # app-private files dir + '/data'; android-only (needs the kivy activity)
    import jnius
    PythonActivity = jnius.autoclass('org.kivy.android.PythonActivity')
    return PythonActivity.mActivity.getFilesDir().getPath() + '/data'
561

562

563
def ensure_sparse_file(filename):
    """Best-effort: mark *filename* as a sparse file where the OS requires it.

    On modern Linux, no need to do anything.
    On Windows, need to explicitly mark file. Failures are logged, never raised.
    """
    if os.name == "nt":
        import subprocess  # local import: only needed on Windows
        try:
            # fix: pass argv as a list (no shell) so filenames containing
            # quotes/special characters cannot break or inject into the command
            # (previously built a shell string via os.system + str.format)
            subprocess.run(["fsutil", "sparse", "setflag", filename, "1"], check=False)
        except Exception as e:
            _logger.info(f'error marking file (unknown) as sparse: {e}')
571

572

573
def get_headers_dir(config):
    """Return the directory used for blockchain headers (the config's base path)."""
    return config.path
575

576

577
def assert_datadir_available(config_path):
    """Raise FileNotFoundError if the datadir at config_path no longer exists."""
    if os.path.exists(config_path):
        return
    raise FileNotFoundError(
        'Electrum datadir does not exist. Was it deleted while running?' + '\n' +
        'Should be at {}'.format(config_path))
585

586

587
def assert_file_in_datadir_available(path, config_path):
    """Raise FileNotFoundError if path is missing; blames the datadir if that is gone too."""
    if os.path.exists(path):
        return
    # distinguish "file missing" from "whole datadir missing"
    assert_datadir_available(config_path)
    raise FileNotFoundError(
        'Cannot find file but datadir is there.' + '\n' +
        'Should be at {}'.format(path))
595

596

597
def standardize_path(path):
    """Expand '~', absolutize, and case-normalize a filesystem path.

    note: os.path.realpath() is not used, as on Windows it can return
    non-working paths (see #8495). This means that we don't resolve symlinks!
    """
    expanded = os.path.expanduser(path)
    absolute = os.path.abspath(expanded)
    return os.path.normcase(absolute)
605

606

607
def get_new_wallet_name(wallet_folder: str) -> str:
    """Returns a file basename for a new wallet to be used.
    Can raise OSError.
    """
    # fix: list the directory once — previously os.listdir() ran on every
    # loop iteration, making this O(n^2) in the number of wallet files
    existing = set(os.listdir(wallet_folder))
    i = 1
    while f"wallet_{i}" in existing:
        i += 1
    return f"wallet_{i}"
619

620

621
def is_android_debug_apk() -> bool:
    """True iff we are on Android and the APK was built with BuildConfig.DEBUG."""
    is_android = 'ANDROID_DATA' in os.environ
    if not is_android:
        return False
    from jnius import autoclass
    pkgname = get_android_package_name()
    build_config = autoclass(f"{pkgname}.BuildConfig")
    return bool(build_config.DEBUG)


def get_android_package_name() -> str:
    """Return the Android application package name. Must only be called on Android."""
    is_android = 'ANDROID_DATA' in os.environ
    assert is_android
    from jnius import autoclass
    from android.config import ACTIVITY_CLASS_NAME
    activity = autoclass(ACTIVITY_CLASS_NAME).mActivity
    pkgname = str(activity.getPackageName())
    return pkgname
639

640

641
def assert_bytes(*args):
    """
    porting helper, assert args type
    """
    try:
        for item in args:
            assert isinstance(item, (bytes, bytearray))
    except AssertionError:
        # log the offending types before propagating, to aid debugging
        print('assert bytes failed', list(map(type, args)))
        raise
651

652

653
def assert_str(*args):
    """
    porting helper, assert args type
    """
    assert all(isinstance(item, str) for item in args)
659

660

661
def to_string(x, enc) -> str:
    """Decode bytes-like x with encoding enc; pass str through; else TypeError."""
    if isinstance(x, str):
        return x
    if isinstance(x, (bytes, bytearray)):
        return x.decode(enc)
    raise TypeError("Not a string or bytes like object")
668

669

670
def to_bytes(something, encoding='utf8') -> bytes:
    """
    cast string to bytes() like object, but for python2 support it's bytearray copy
    """
    if isinstance(something, str):
        return something.encode(encoding)
    if isinstance(something, bytes):
        return something
    if isinstance(something, bytearray):
        return bytes(something)
    raise TypeError("Not a string or bytes like object")
682

683

684
bfh = bytes.fromhex  # short alias: "bytes from hex"
685

686

687
def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR two byte strings; the result is truncated to the shorter length."""
    # zip stops at the shorter operand, matching min(len(a), len(b))
    return bytes(x ^ y for x, y in zip(a, b))
691

692

693
def user_dir():
    """Return the per-user electrum data directory, or None if undeterminable.

    Resolution order: ELECTRUMDIR override, Android data dir, ~/.electrum on
    posix, then the Windows APPDATA/LOCALAPPDATA locations.
    """
    if "ELECTRUMDIR" in os.environ:
        return os.environ["ELECTRUMDIR"]
    if 'ANDROID_DATA' in os.environ:
        return android_data_dir()
    if os.name == 'posix':
        return os.path.join(os.environ["HOME"], ".electrum")
    for envvar in ("APPDATA", "LOCALAPPDATA"):
        if envvar in os.environ:
            return os.path.join(os.environ[envvar], "Electrum")
    # no home directory found in environment variables
    return None
707

708

709
def resource_path(*parts):
    """Join path components onto the electrum package directory."""
    return os.path.join(pkg_dir, *parts)


# absolute path to python package folder of electrum ("lib")
pkg_dir = os.path.split(os.path.realpath(__file__))[0]
715

716

717
def is_valid_email(s):
    """Loose sanity check that s looks like an email address (something@domain.tld)."""
    return bool(re.match(r"[^@]+@[^@]+\.[^@]+", s))
720

721

722
def is_valid_websocket_url(url: str) -> bool:
    """
    uses this django url validation regex:
    https://github.com/django/django/blob/2c6906a0c4673a7685817156576724aba13ad893/django/core/validators.py#L45C1-L52C43
    Note: this is not perfect, urls and their parsing can get very complex (see recent django code).
    however its sufficient for catching weird user input in the gui dialog
    """
    # stores the compiled regex in the function object itself to avoid recompiling it every call
    if not hasattr(is_valid_websocket_url, "regex"):
        is_valid_websocket_url.regex = re.compile(
            r'^(?:ws|wss)://'  # ws:// or wss://
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
            r'localhost|'  # localhost...
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
            r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
            r'(?::\d+)?'  # optional port
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
    try:
        return re.match(is_valid_websocket_url.regex, url) is not None
    except Exception:
        # e.g. url is not a str at all; treat any failure as "not valid"
        return False
743

744

745
def is_hash256_str(text: Any) -> bool:
    """True iff text is a 64-character hex string (a sha256-sized digest)."""
    return isinstance(text, str) and len(text) == 64 and is_hex_str(text)
749

750

751
def is_hex_str(text: Any) -> bool:
    """True iff text is a string of pure hex digits (even length, no whitespace)."""
    if not isinstance(text, str):
        return False
    try:
        raw = bytes.fromhex(text)
    except Exception:
        return False
    # bytes.fromhex tolerates whitespace; reject it by checking the length ratio
    return len(text) == 2 * len(raw)
761

762

763
def is_integer(val: Any) -> bool:
    """True iff val is an int (note: bool is a subclass of int in Python)."""
    return isinstance(val, int)
765

766

767
def is_non_negative_integer(val: Any) -> bool:
    """True iff val is an int and >= 0."""
    return isinstance(val, int) and val >= 0
771

772

773
def is_int_or_float(val: Any) -> bool:
    """True iff val is an int or float (bools count, as bool subclasses int)."""
    return isinstance(val, (int, float))
775

776

777
def is_non_negative_int_or_float(val: Any) -> bool:
    """True iff val is an int or float and >= 0."""
    return isinstance(val, (int, float)) and val >= 0
781

782

783
def chunks(items, size: int):
    """Break up items, an iterable, into chunks of length size.

    Sequences (list/str/bytes/...) yield slices of the same type, as before.
    Generalized: plain iterators/generators (no len()) are now also accepted,
    yielding lists of up to `size` elements each.

    Raises ValueError if size < 1.
    """
    if size < 1:
        raise ValueError(f"size must be positive, not {repr(size)}")
    try:
        length = len(items)
    except TypeError:
        # not a sized sequence: consume lazily, yielding lists
        from itertools import islice
        iterator = iter(items)
        while chunk := list(islice(iterator, size)):
            yield chunk
        return
    for i in range(0, length, size):
        yield items[i: i + size]
789

790

791
def format_satoshis_plain(
        x: Union[int, float, Decimal, str],  # amount in satoshis,
        *,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
) -> str:
    """Display a satoshi amount scaled.  Always uses a '.' as a decimal
    point and has no thousands separator"""
    if parse_max_spend(x):
        # "spend max" markers like '!' or '2!' are echoed back, not formatted
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    scale_factor = pow(10, decimal_point)
    # trailing zeros and a bare trailing '.' are stripped for compactness
    return "{:.8f}".format(Decimal(x) / scale_factor).rstrip('0').rstrip('.')
803

804

805
# Check that Decimal precision is sufficient.
# We need at the very least ~20, as we deal with msat amounts, and
# log10(21_000_000 * 10**8 * 1000) ~= 18.3
# decimal.DefaultContext.prec == 28 by default, but it is mutable.
# We enforce that we have at least that available.
assert decimal.getcontext().prec >= 28, f"PyDecimal precision too low: {decimal.getcontext().prec}"

# separators used by format_satoshis; deliberately locale-independent
# DECIMAL_POINT = locale.localeconv()['decimal_point']  # type: str
DECIMAL_POINT = "."
THOUSANDS_SEP = " "
assert len(DECIMAL_POINT) == 1, f"DECIMAL_POINT has unexpected len. {DECIMAL_POINT!r}"
assert len(THOUSANDS_SEP) == 1, f"THOUSANDS_SEP has unexpected len. {THOUSANDS_SEP!r}"
817

818

819
def format_satoshis(
        x: Union[int, float, Decimal, str, None],  # amount in satoshis
        *,
        num_zeros: int = 0,
        decimal_point: int = 8,  # how much to shift decimal point to left (default: sat->BTC)
        precision: int = 0,  # extra digits after satoshi precision
        is_diff: bool = False,  # if True, enforce a leading sign (+/-)
        whitespaces: bool = False,  # if True, add whitespaces, to align numbers in a column
        add_thousands_sep: bool = False,  # if True, add whitespaces, for better readability of the numbers
) -> str:
    """Format a satoshi amount for display, scaled by decimal_point.

    Returns 'unknown' for None and 'max(...)' for spend-max markers.
    """
    if x is None:
        return 'unknown'
    if parse_max_spend(x):
        return f'max({x})'
    assert isinstance(x, (int, float, Decimal)), f"{x!r} should be a number"
    # lose redundant precision
    x = Decimal(x).quantize(Decimal(10) ** (-precision))
    # format string
    overall_precision = decimal_point + precision  # max digits after final decimal point
    decimal_format = "." + str(overall_precision) if overall_precision > 0 else ""
    if is_diff:
        decimal_format = '+' + decimal_format
    # initial result
    scale_factor = pow(10, decimal_point)
    result = ("{:" + decimal_format + "f}").format(x / scale_factor)
    if "." not in result: result += "."
    result = result.rstrip('0')
    # add extra decimal places (zeros)
    integer_part, fract_part = result.split(".")
    if len(fract_part) < num_zeros:
        fract_part += "0" * (num_zeros - len(fract_part))
    # add whitespaces as thousands' separator for better readability of numbers
    if add_thousands_sep:
        sign = integer_part[0] if integer_part[0] in ("+", "-") else ""
        if sign == "-":
            integer_part = integer_part[1:]
        # group the integer part with THOUSANDS_SEP; the sign is re-attached after
        integer_part = "{:,}".format(int(integer_part)).replace(',', THOUSANDS_SEP)
        integer_part = sign + integer_part
        # fractional digits are grouped left-to-right in blocks of 3
        fract_part = THOUSANDS_SEP.join(fract_part[i:i+3] for i in range(0, len(fract_part), 3))
    result = integer_part + DECIMAL_POINT + fract_part
    # add leading/trailing whitespaces so that numbers can be aligned in a column
    if whitespaces:
        target_fract_len = overall_precision
        target_integer_len = 14 - decimal_point  # should be enough for up to unsigned 999999 BTC
        if add_thousands_sep:
            # account for the separator characters inserted above
            target_fract_len += max(0, (target_fract_len - 1) // 3)
            target_integer_len += max(0, (target_integer_len - 1) // 3)
        # add trailing whitespaces
        result += " " * (target_fract_len - len(fract_part))
        # add leading whitespaces
        target_total_len = target_integer_len + 1 + target_fract_len
        result = " " * (target_total_len - len(result)) + result
    return result
872

873

874
FEERATE_PRECISION = 1  # num fractional decimal places for sat/byte fee rates
_feerate_quanta = Decimal(10) ** (-FEERATE_PRECISION)  # quantum used by quantize_feerate
# unit label strings shown in the UI
UI_UNIT_NAME_FEERATE_SAT_PER_VBYTE = "sat/vbyte"
UI_UNIT_NAME_FEERATE_SAT_PER_VB = "sat/vB"
UI_UNIT_NAME_TXSIZE_VBYTES = "vbytes"
UI_UNIT_NAME_MEMPOOL_MB = "vMB"
880

881

882
def format_fee_satoshis(fee, *, num_zeros=0, precision=None):
    """Format a fee rate (sat/byte) as a string, via format_satoshis."""
    # fall back to the global feerate precision if the caller did not choose one
    effective_precision = FEERATE_PRECISION if precision is None else precision
    # never pad with more zeroes than the available precision allows
    capped_zeros = min(num_zeros, FEERATE_PRECISION)
    return format_satoshis(fee, num_zeros=capped_zeros, decimal_point=0, precision=effective_precision)
887

888

889
def quantize_feerate(fee) -> Union[None, Decimal, int]:
    """Strip sat/byte fee rate of excess precision."""
    if fee is None:
        return None
    # round half-down to the smallest feerate step (_feerate_quanta)
    quantized = Decimal(fee).quantize(_feerate_quanta, rounding=decimal.ROUND_HALF_DOWN)
    return quantized
894

895

896
def timestamp_to_datetime(timestamp: Union[int, float, None], *, utc: bool = False) -> Optional[datetime]:
    """Convert a POSIX timestamp to a datetime.

    Returns a naive local-time datetime by default, a UTC-aware one if utc=True,
    and None if the timestamp is None.
    """
    if timestamp is None:
        return None
    return datetime.fromtimestamp(timestamp, tz=timezone.utc if utc else None)
903

904

905
def format_time(timestamp: Union[int, float, None]) -> str:
    """Render a POSIX timestamp as 'YYYY-MM-DD HH:MM', or a translated 'Unknown'."""
    dt = timestamp_to_datetime(timestamp)
    if not dt:
        return _("Unknown")
    return dt.isoformat(' ', timespec="minutes")
908

909

910
def age(
    from_date: Union[int, float, None],  # POSIX timestamp
    *,
    since_date: datetime = None,
    target_tz=None,
    include_seconds: bool = False,
) -> str:
    """Takes a timestamp and returns a string with the approximation of the age"""
    if from_date is None:
        return _("Unknown")
    # NOTE(review): fromtimestamp() without tz gives a naive local datetime; if a
    # tz-aware since_date (or target_tz) is passed, the comparison below would
    # raise -- assumed callers pass matching naive/aware kinds; confirm.
    moment = datetime.fromtimestamp(from_date)
    reference = since_date if since_date is not None else datetime.now(target_tz)
    delta_str = delta_time_str(moment - reference, include_seconds=include_seconds)
    if moment < reference:
        return _("{} ago").format(delta_str)
    return _("in {}").format(delta_str)
927

928

929
def delta_time_str(distance_in_time: timedelta, *, include_seconds: bool = False) -> str:
    """Return a human-readable approximation of a time delta, e.g. "about 2 hours".

    The sign of *distance_in_time* is ignored; the caller (see `age`) decides
    whether to render "... ago" or "in ...".
    """
    # note: .microseconds is deliberately ignored; second granularity is plenty here
    distance_in_seconds = int(round(abs(distance_in_time.days * 86400 + distance_in_time.seconds)))
    distance_in_minutes = int(round(distance_in_seconds / 60))
    if distance_in_minutes == 0:
        if include_seconds:
            return _("{} seconds").format(distance_in_seconds)
        else:
            return _("less than a minute")
    elif distance_in_minutes < 45:
        return _("about {} minutes").format(distance_in_minutes)
    elif distance_in_minutes < 90:
        return _("about 1 hour")
    elif distance_in_minutes < 1440:  # < 1 day
        return _("about {} hours").format(round(distance_in_minutes / 60.0))
    elif distance_in_minutes < 2880:  # < 2 days
        return _("about 1 day")
    elif distance_in_minutes < 43200:  # < 30 days (was 43220; fixed to match the 43200 month divisor below)
        return _("about {} days").format(round(distance_in_minutes / 1440))
    elif distance_in_minutes < 86400:  # < 60 days
        return _("about 1 month")
    elif distance_in_minutes < 525600:  # < 365 days
        return _("about {} months").format(round(distance_in_minutes / 43200))
    elif distance_in_minutes < 1051200:  # < 2 years
        return _("about 1 year")
    else:
        return _("over {} years").format(round(distance_in_minutes / 525600))
955

956

957
# Hardcoded block explorers, one dict per network.
# Maps a UI-facing name -> (base URL, {item kind -> URL path fragment}),
# where the item kinds are 'tx' and 'addr' (see block_explorer_URL below).
mainnet_block_explorers = {
    '3xpl.com': ('https://3xpl.com/bitcoin/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'Bitflyer.jp': ('https://chainflyer.bitflyer.jp/',
                        {'tx': 'Transaction/', 'addr': 'Address/'}),
    'Blockchain.info': ('https://blockchain.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Bitaps.com': ('https://btc.bitaps.com/',
                        {'tx': '', 'addr': ''}),
    'BTC.com': ('https://btc.com/',
                        {'tx': '', 'addr': ''}),
    'Chain.so': ('https://www.chain.so/',
                        {'tx': 'tx/BTC/', 'addr': 'address/BTC/'}),
    'Insight.is': ('https://insight.bitpay.com/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchair.com': ('https://blockchair.com/bitcoin/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'blockonomics.co': ('https://www.blockonomics.co/',
                        {'tx': 'api/tx?txid=', 'addr': '#/search?q='}),
    'mempool.space': ('https://mempool.space/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.emzy.de': ('https://mempool.emzy.de/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'OXT.me': ('https://oxt.me/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'mynode.local': ('http://mynode.local:3002/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/',
                        {'tx': 'tx/', 'addr': 'address/'}),
}

testnet_block_explorers = {
    'Bitaps.com': ('https://tbtc.bitaps.com/',
                       {'tx': '', 'addr': ''}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchain.info': ('https://www.blockchain.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'smartbit.com.au': ('https://testnet.smartbit.com.au/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain://000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

testnet4_block_explorers = {
    'mempool.space': ('https://mempool.space/testnet4/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://testnet4-explorer.wakiyamap.dev/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

signet_block_explorers = {
    'bc-2.jp': ('https://explorer.bc-2.jp/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/signet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'bitcoinexplorer.org': ('https://signet.bitcoinexplorer.org/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'wakiyamap.dev': ('https://signet-explorer.wakiyamap.dev/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'ex.signet.bublina.eu.org': ('https://ex.signet.bublina.eu.org/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}

# Path fragments assumed when the user configures a custom explorer by base URL only.
_block_explorer_default_api_loc = {'tx': 'tx/', 'addr': 'address/'}
1032

1033

1034
def block_explorer_info():
    """Return the hardcoded explorer dict matching the currently active network."""
    from . import constants
    per_net = {
        "testnet": testnet_block_explorers,
        "testnet4": testnet4_block_explorers,
        "signet": signet_block_explorers,
    }
    # anything else (i.e. mainnet, or an unknown name) falls back to mainnet
    return per_net.get(constants.net.NET_NAME, mainnet_block_explorers)
1043

1044

1045
def block_explorer(config: 'SimpleConfig') -> Optional[str]:
    """Returns name of selected block explorer,
    or None if a custom one (not among hardcoded ones) is configured.
    """
    if config.BLOCK_EXPLORER_CUSTOM is not None:
        return None
    name = config.BLOCK_EXPLORER
    if block_explorer_info().get(name) is None:
        # configured name is not among the hardcoded ones -> use the default
        name = config.cv.BLOCK_EXPLORER.get_default_value()
    assert isinstance(name, str), f"{name!r} should be str"
    return name
1057

1058

1059
def block_explorer_tuple(config: 'SimpleConfig') -> Optional[Tuple[str, dict]]:
    """Return (base_url, api_path_dict) for the configured explorer, or None."""
    custom_be = config.BLOCK_EXPLORER_CUSTOM
    if not custom_be:
        # using one of the hardcoded block explorers
        return block_explorer_info().get(block_explorer(config))
    # custom explorer: either a bare base URL, or an explicit (url, paths) pair
    if isinstance(custom_be, str):
        return custom_be, _block_explorer_default_api_loc
    if isinstance(custom_be, (tuple, list)) and len(custom_be) == 2:
        return tuple(custom_be)
    _logger.warning(f"not using {config.cv.BLOCK_EXPLORER_CUSTOM.key()!r} from config. "
                    f"expected a str or a pair but got {custom_be!r}")
    return None
1072

1073

1074
def block_explorer_URL(config: 'SimpleConfig', kind: str, item: str) -> Optional[str]:
    """Build the explorer URL for *item* of the given *kind* ('tx' or 'addr')."""
    be_tuple = block_explorer_tuple(config)
    if not be_tuple:
        return
    base_url, path_map = be_tuple
    path_prefix = path_map.get(kind)
    if path_prefix is None:
        # this explorer does not support the requested item kind
        return
    if base_url[-1] != "/":
        base_url += "/"
    return base_url + path_prefix + item
1086

1087

1088
# Python bug (http://bugs.python.org/issue1927) causes raw_input
1089
# to be redirected improperly between stdin/stderr on Unix systems
1090
#TODO: py3
1091
def raw_input(prompt=None):
    # Write the prompt to stdout ourselves instead of letting the builtin do it,
    # to work around the redirection bug referenced in the comment above.
    if prompt:
        sys.stdout.write(prompt)
    return builtin_raw_input()


# Monkey-patch: keep a reference to the original builtin input(), then replace
# it process-wide with the stdout-prompting wrapper above.
builtin_raw_input = builtins.input
builtins.input = raw_input
1099

1100

1101
def parse_json(message):
    """Split one newline-terminated JSON object off the front of *message* (bytes).

    Returns (parsed_object_or_None, remaining_bytes).  If no complete line is
    buffered yet, returns (None, message) unchanged; an unparsable line yields
    None for the object while still consuming the line.
    """
    # TODO: check \r\n pattern
    newline_pos = message.find(b'\n')
    if newline_pos == -1:
        return None, message
    line, rest = message[:newline_pos], message[newline_pos + 1:]
    try:
        parsed = json.loads(line.decode('utf8'))
    except Exception:
        parsed = None
    return parsed, rest
1111

1112

1113
def setup_thread_excepthook():
    """
    Workaround for `sys.excepthook` thread bug from:
    http://bugs.python.org/issue1230540

    Call once from the main thread before creating any threads.
    """

    # keep a reference to the unpatched constructor so threads still initialise normally
    init_original = threading.Thread.__init__

    def init(self, *args, **kwargs):

        init_original(self, *args, **kwargs)
        run_original = self.run

        # wrap run() so uncaught exceptions in the thread go through sys.excepthook
        def run_with_except_hook(*args2, **kwargs2):
            try:
                run_original(*args2, **kwargs2)
            except Exception:
                sys.excepthook(*sys.exc_info())

        self.run = run_with_except_hook

    # monkey-patch Thread.__init__ so every thread created from now on is wrapped
    threading.Thread.__init__ = init
1137

1138

1139
def send_exception_to_crash_reporter(e: BaseException):
    """Forward *e* to the crash reporter module."""
    # imported locally -- presumably to avoid a circular import; confirm
    from .base_crash_reporter import send_exception_to_crash_reporter
    send_exception_to_crash_reporter(e)
1142

1143

1144
def versiontuple(v):
    """Convert a dotted version string like "4.1.5" into an int tuple (4, 1, 5)."""
    return tuple(int(part) for part in v.split("."))
1146

1147

1148
def read_json_file(path):
    """Parse the JSON file at *path*; wrap any failure in FileImportFailed."""
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except json.JSONDecodeError:
        _logger.exception('')
        raise FileImportFailed(_("Invalid JSON code."))
    except BaseException as e:
        # deliberately broad: any read/decode problem becomes FileImportFailed
        _logger.exception('')
        raise FileImportFailed(e)
    return data
1159

1160

1161
def write_json_file(path, data):
    """Serialize *data* as pretty-printed JSON to *path*; wrap I/O errors in FileExportFailed."""
    try:
        with open(path, 'w+', encoding='utf-8') as f:
            json.dump(data, f, indent=4, sort_keys=True, cls=MyEncoder)
    except OSError as exc:  # IOError and os.error are both aliases of OSError
        _logger.exception('')
        raise FileExportFailed(exc)
1168

1169

1170
def os_chmod(path, mode):
    """os.chmod aware of tmpfs"""
    try:
        os.chmod(path, mode)
    except OSError as e:
        # chmod can fail on some tmpfs mounts (under XDG_RUNTIME_DIR);
        # skip silently there, re-raise anywhere else
        xdg_runtime_dir = os.environ.get("XDG_RUNTIME_DIR", None)
        if not (xdg_runtime_dir and is_subpath(path, xdg_runtime_dir)):
            raise
        _logger.info(f"Tried to chmod in tmpfs. Skipping... {e!r}")
1180

1181

1182
def make_dir(path, *, allow_symlink=True):
    """Makes directory if it does not yet exist.
    Also sets sane 0700 permissions on the dir.
    """
    if os.path.exists(path):
        return
    if not allow_symlink and os.path.islink(path):
        raise Exception('Dangling link: ' + path)
    try:
        os.mkdir(path)
    except FileExistsError:
        # multiprocess race, e.g. an electrum daemon and an electrum cli
        # command launched in rapid fire
        pass
    os_chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
    assert os.path.exists(path)
1197

1198

1199
def is_subpath(long_path: str, short_path: str) -> bool:
    """Returns whether long_path is a sub-path of short_path."""
    try:
        common = os.path.commonpath([long_path, short_path])
    except ValueError:
        # raised e.g. for a mix of absolute and relative paths,
        # or paths on different drives (Windows)
        return False
    return standardize_path(common) == standardize_path(short_path)
1208

1209

1210
def log_exceptions(func):
    """Decorator to log AND re-raise exceptions."""
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError:
            # cancellation is normal control flow -- re-raise without logging
            raise
        except BaseException as e:
            # prefer the instance's own logger when decorating a method
            instance = args[0] if len(args) > 0 else None
            mylogger = instance.logger if hasattr(instance, 'logger') else _logger
            try:
                mylogger.exception(f"Exception in {func.__name__}: {repr(e)}")
            except BaseException as e2:
                print(f"logging exception raised: {repr(e2)}... orig exc: {repr(e)} in {func.__name__}")
            raise
    return wrapper
1229

1230

1231
def ignore_exceptions(func):
    """Decorator to silently swallow all exceptions."""
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception:
            # deliberately best-effort: Exceptions are dropped, but
            # BaseExceptions (e.g. CancelledError) still propagate
            return None
    return wrapper
1242

1243

1244
def with_lock(func):
    """Decorator to enforce a lock on a function call."""
    @functools.wraps(func)
    def locked_call(self, *args, **kwargs):
        # hold self.lock for the duration of the wrapped call
        with self.lock:
            return func(self, *args, **kwargs)
    return locked_call
1251

1252

1253
class TxMinedInfo(NamedTuple):
    """Mining status of a transaction, as seen by the wallet."""
    height: int                        # height of block that mined tx
    conf: Optional[int] = None         # number of confirmations, SPV verified. >=0, or None (None means unknown)
    timestamp: Optional[int] = None    # timestamp of block that mined tx
    txpos: Optional[int] = None        # position of tx in serialized block
    header_hash: Optional[str] = None  # hash of block that mined tx
    wanted_height: Optional[int] = None  # in case of timelock, min abs block height

    def short_id(self) -> Optional[str]:
        """Return a "<height>x<txpos>" identifier, or None if the tx is not mined."""
        if self.txpos is None or self.txpos < 0:
            return None
        assert self.height > 0
        return f"{self.height}x{self.txpos}"

    def is_local_like(self) -> bool:
        """Returns whether the tx is local-like (LOCAL/FUTURE)."""
        from .address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
        return (self.height <= 0
                and self.height not in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT))
1275

1276

1277
class ShortID(bytes):
    """8-byte short channel id: 3 bytes block height || 3 bytes tx pos || 2 bytes output index."""

    def __repr__(self):
        return f"<ShortID: {format_short_id(self)}>"

    def __str__(self):
        return format_short_id(self)

    @classmethod
    def from_components(cls, block_height: int, tx_pos_in_block: int, output_index: int) -> 'ShortID':
        """Pack the three components into the 8-byte big-endian representation."""
        packed = (block_height.to_bytes(3, byteorder='big')
                  + tx_pos_in_block.to_bytes(3, byteorder='big')
                  + output_index.to_bytes(2, byteorder='big'))
        return ShortID(packed)

    @classmethod
    def from_str(cls, scid: str) -> 'ShortID':
        """Parses a formatted scid str, e.g. '643920x356x0'."""
        parts = scid.split("x")
        if len(parts) != 3:
            raise ValueError(f"failed to parse ShortID: {scid!r}")
        try:
            numbers = [int(p) for p in parts]
        except ValueError:
            raise ValueError(f"failed to parse ShortID: {scid!r}") from None
        return ShortID.from_components(*numbers)

    @classmethod
    def normalize(cls, data: Union[None, str, bytes, 'ShortID']) -> Optional['ShortID']:
        """Coerce a 16-char hex str / 8 raw bytes / ShortID / None into Optional[ShortID]."""
        if data is None or isinstance(data, ShortID):
            return data
        if isinstance(data, str):
            assert len(data) == 16
            return ShortID.fromhex(data)
        if isinstance(data, (bytes, bytearray)):
            assert len(data) == 8
            return ShortID(data)

    @property
    def block_height(self) -> int:
        return int.from_bytes(self[:3], byteorder='big')

    @property
    def txpos(self) -> int:
        return int.from_bytes(self[3:6], byteorder='big')

    @property
    def output_index(self) -> int:
        return int.from_bytes(self[6:8], byteorder='big')
1326

1327

1328
def format_short_id(short_channel_id: Optional[bytes]):
    """Render an 8-byte short channel id as '<height>x<txpos>x<output>', e.g. '643920x356x0'."""
    if not short_channel_id:
        return _('Not yet available')
    fields = (short_channel_id[:3], short_channel_id[3:6], short_channel_id[6:])
    return 'x'.join(str(int.from_bytes(b, 'big')) for b in fields)
1334

1335

1336
def make_aiohttp_proxy_connector(proxy: 'ProxySettings', ssl_context: Optional[ssl.SSLContext] = None) -> ProxyConnector:
    """Build an aiohttp connector that routes connections through the given SOCKS proxy."""
    return ProxyConnector(
        proxy_type=ProxyType.SOCKS5 if proxy.mode == 'socks5' else ProxyType.SOCKS4,
        host=proxy.host,
        port=int(proxy.port),
        username=proxy.user,
        password=proxy.password,
        rdns=True,  # needed to prevent DNS leaks over proxy
        ssl=ssl_context,
    )
1346

1347

1348
def make_aiohttp_session(proxy: Optional['ProxySettings'], headers=None, timeout=None):
    """Create an aiohttp ClientSession honouring the given proxy settings.

    *timeout* may be None (use the 45s default), a number of seconds,
    or an aiohttp.ClientTimeout instance.
    """
    if headers is None:
        headers = {'User-Agent': 'Electrum'}
    if timeout is None:
        # The default timeout is high intentionally.
        # DNS on some systems can be really slow, see e.g. #5337
        timeout = aiohttp.ClientTimeout(total=45)
    elif isinstance(timeout, (int, float)):
        timeout = aiohttp.ClientTimeout(total=timeout)
    # validate server certs against our bundled CA file
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)

    if proxy and proxy.enabled:
        connector = make_aiohttp_proxy_connector(proxy, ssl_context)
    else:
        connector = aiohttp.TCPConnector(ssl=ssl_context)

    return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector)
1365

1366

1367
class OldTaskGroup(aiorpcx.TaskGroup):
    """Automatically raises exceptions on join; as in aiorpcx prior to version 0.20.
    That is, when using TaskGroup as a context manager, if any task encounters an exception,
    we would like that exception to be re-raised (propagated out). For the wait=all case,
    the OldTaskGroup class is emulating the following code-snippet:
    ```
    async with TaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())

        async for task in group:
            if not task.cancelled():
                task.result()
    ```
    So instead of the above, one can just write:
    ```
    async with OldTaskGroup() as group:
        await group.spawn(task1())
        await group.spawn(task2())
    ```
    # TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
    """
    async def join(self):
        if self._wait is all:
            exc = False
            try:
                async for task in self:
                    if not task.cancelled():
                        # re-raises the task's exception here, if it had one
                        task.result()
            except BaseException:  # including asyncio.CancelledError
                exc = True
                raise
            finally:
                # on failure, tear down the remaining tasks before re-raising
                if exc:
                    await self.cancel_remaining()
                await super().join()
        else:
            # wait=any/object case: propagate the completed task's exception, if any
            await super().join()
            if self.completed:
                self.completed.result()
1407

1408

1409
# We monkey-patch aiorpcx TimeoutAfter (used by timeout_after and ignore_after API),
1410
# to fix a timing issue present in asyncio as a whole re timing out tasks.
1411
# To see the issue we are trying to fix, consider example:
1412
#     async def outer_task():
1413
#         async with timeout_after(0.1):
1414
#             await inner_task()
1415
# When the 0.1 sec timeout expires, inner_task will get cancelled by timeout_after (=internal cancellation).
1416
# If around the same time (in terms of event loop iterations) another coroutine
1417
# cancels outer_task (=external cancellation), there will be a race.
1418
# Both cancellations work by propagating a CancelledError out to timeout_after, which then
1419
# needs to decide (in TimeoutAfter.__aexit__) whether it's due to an internal or external cancellation.
1420
# AFAICT asyncio provides no reliable way of distinguishing between the two.
1421
# This patch tries to always give priority to external cancellations.
1422
# see https://github.com/kyuupichan/aiorpcX/issues/44
1423
# see https://github.com/aio-libs/async-timeout/issues/229
1424
# see https://bugs.python.org/issue42130 and https://bugs.python.org/issue45098
1425
# TODO see if we can migrate to asyncio.timeout, introduced in python 3.11, and use stdlib instead of aiorpcx.curio...
1426
def _aiorpcx_monkeypatched_set_new_deadline(task, deadline):
    # Fires when the deadline expires: cancel the task, but only mark it as
    # "timed out" if no external cancellation happened first (see comment block above).
    def timeout_task():
        task._orig_cancel()
        task._timed_out = None if getattr(task, "_externally_cancelled", False) else deadline

    # Replacement for task.cancel(): records that the cancellation came from
    # outside, so external cancellation wins over the internal timeout.
    def mycancel(*args, **kwargs):
        task._orig_cancel(*args, **kwargs)
        task._externally_cancelled = True
        task._timed_out = None

    # only patch cancel() once, keeping a handle on the original
    if not hasattr(task, "_orig_cancel"):
        task._orig_cancel = task.cancel
        task.cancel = mycancel
    task._deadline_handle = task._loop.call_at(deadline, timeout_task)


def _aiorpcx_monkeypatched_set_task_deadline(task, deadline):
    # delegate to aiorpcx, then initialise our extra bookkeeping flag
    ret = _aiorpcx_orig_set_task_deadline(task, deadline)
    task._externally_cancelled = None
    return ret


def _aiorpcx_monkeypatched_unset_task_deadline(task):
    # restore the original cancel() before aiorpcx cleans up its own state
    if hasattr(task, "_orig_cancel"):
        task.cancel = task._orig_cancel
        del task._orig_cancel
    return _aiorpcx_orig_unset_task_deadline(task)


# keep references to the originals, then install the patched versions
_aiorpcx_orig_set_task_deadline    = aiorpcx.curio._set_task_deadline
_aiorpcx_orig_unset_task_deadline  = aiorpcx.curio._unset_task_deadline

aiorpcx.curio._set_new_deadline    = _aiorpcx_monkeypatched_set_new_deadline
aiorpcx.curio._set_task_deadline   = _aiorpcx_monkeypatched_set_task_deadline
aiorpcx.curio._unset_task_deadline = _aiorpcx_monkeypatched_unset_task_deadline
1461

1462

1463
async def wait_for2(fut: Awaitable, timeout: Union[int, float, None]):
    """Replacement for asyncio.wait_for,
     due to bugs: https://bugs.python.org/issue42130 and https://github.com/python/cpython/issues/86296 ,
     which are only fixed in python 3.12+.
     """
    if sys.version_info[:3] >= (3, 12):
        # stdlib is fixed on 3.12+, use it directly
        return await asyncio.wait_for(fut, timeout)
    else:
        # emulate wait_for via our async_timeout context manager;
        # ensure_future() so that a bare coroutine gets scheduled as a task
        async with async_timeout(timeout):
            return await asyncio.ensure_future(fut, loop=get_running_loop())
1473

1474

1475
if hasattr(asyncio, 'timeout'):  # python 3.11+
    async_timeout = asyncio.timeout
else:
    # On older pythons, adapt aiorpcx's TimeoutAfter so that it raises the same
    # exception types as asyncio.timeout would.
    class TimeoutAfterAsynciolike(aiorpcx.curio.TimeoutAfter):
        async def __aexit__(self, exc_type, exc_value, tb):
            try:
                await super().__aexit__(exc_type, exc_value, tb)
            except (aiorpcx.TaskTimeout, aiorpcx.UncaughtTimeoutError):
                raise asyncio.TimeoutError from None
            except aiorpcx.TimeoutCancellationError:
                raise asyncio.CancelledError from None

    def async_timeout(delay: Union[int, float, None]):
        # delay=None means "no timeout", mirroring asyncio.timeout(None)
        if delay is None:
            return nullcontext()
        return TimeoutAfterAsynciolike(delay)
1491

1492

1493
class NetworkJobOnDefaultServer(Logger, ABC):
    """An abstract base class for a job that runs on the main network
    interface. Every time the main interface changes, the job is
    restarted, and some of its internals are reset.
    """
    def __init__(self, network: 'Network'):
        Logger.__init__(self)
        self.network = network
        self.interface = None  # type: Interface
        self._restart_lock = asyncio.Lock()
        # Ensure fairness between NetworkJobs. e.g. if multiple wallets
        # are open, a large wallet's Synchronizer should not starve the small wallets:
        self._network_request_semaphore = asyncio.Semaphore(100)

        self._reset()
        # every time the main interface changes, restart:
        register_callback(self._restart, ['default_server_changed'])
        # also schedule a one-off restart now, as there might already be a main interface:
        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)

    def _reset(self):
        """Initialise fields. Called every time the underlying
        server connection changes.
        """
        # a fresh taskgroup per server connection; the old one is abandoned
        self.taskgroup = OldTaskGroup()
        self.reset_request_counters()

    async def _start(self, interface: 'Interface'):
        """Bind to *interface* and launch this job's tasks inside its taskgroup."""
        self.logger.debug(f"starting. interface.server={repr(str(interface.server))}")
        self.interface = interface

        taskgroup = self.taskgroup

        async def run_tasks_wrapper():
            self.logger.debug(f"starting taskgroup ({hex(id(taskgroup))}).")
            try:
                await self._run_tasks(taskgroup=taskgroup)
            except Exception as e:
                self.logger.error(f"taskgroup died ({hex(id(taskgroup))}). exc={e!r}")
                raise
            finally:
                self.logger.debug(f"taskgroup stopped ({hex(id(taskgroup))}).")
        # run inside the interface's own taskgroup, so it dies with the interface
        await interface.taskgroup.spawn(run_tasks_wrapper)

    @abstractmethod
    async def _run_tasks(self, *, taskgroup: OldTaskGroup) -> None:
        """Start tasks in taskgroup. Called every time the underlying
        server connection changes.
        """
        # If self.taskgroup changed, don't start tasks. This can happen if we have
        # been restarted *just now*, i.e. after the _run_tasks coroutine object was created.
        if taskgroup != self.taskgroup:
            raise asyncio.CancelledError()

    async def stop(self, *, full_shutdown: bool = True):
        """Cancel running tasks; with full_shutdown, also stop listening for restarts."""
        self.logger.debug(f"stopping. {full_shutdown=}")
        if full_shutdown:
            unregister_callback(self._restart)
        await self.taskgroup.cancel_remaining()

    @log_exceptions
    async def _restart(self, *args):
        """Stop and re-start the job on the network's current main interface."""
        interface = self.network.interface
        if interface is None:
            return  # we should get called again soon

        # serialize concurrent restart attempts
        async with self._restart_lock:
            await self.stop(full_shutdown=False)
            self._reset()
            await self._start(interface)

    def reset_request_counters(self):
        # counters are diagnostic only; see num_requests_sent_and_answered
        self._requests_sent = 0
        self._requests_answered = 0

    def num_requests_sent_and_answered(self) -> Tuple[int, int]:
        return self._requests_sent, self._requests_answered

    @property
    def session(self):
        # the session of the interface we are currently bound to
        s = self.interface.session
        assert s is not None
        return s
1576

1577

1578
async def detect_tor_socks_proxy() -> Optional[Tuple[str, int]]:
    """Probe localhost ports commonly used by Tor, concurrently.

    Returns the first (host, port) that behaves like a Tor SOCKS port,
    or None if none of the candidates do.
    """
    # Probable ports for Tor to listen at
    candidates = [
        ("127.0.0.1", 9050),
        ("127.0.0.1", 9051),
        ("127.0.0.1", 9150),
    ]

    proxy_addr = None

    async def test_net_addr(net_addr):
        is_tor = await is_tor_socks_port(*net_addr)
        # set result, and cancel remaining probes
        if is_tor:
            nonlocal proxy_addr
            proxy_addr = net_addr
            await group.cancel_remaining()

    # all probes run in parallel; the first hit cancels the rest
    async with OldTaskGroup() as group:
        for net_addr in candidates:
            await group.spawn(test_net_addr(net_addr))
    return proxy_addr
1600

1601

1602
@log_exceptions
async def is_tor_socks_port(host: str, port: int) -> bool:
    """Return whether (host, port) behaves like a Tor SOCKS5 port."""
    # mimic "tor-resolve 0.0.0.0".
    # see https://github.com/spesmilo/electrum/issues/7317#issuecomment-1369281075
    # > this is a socks5 handshake, followed by a socks RESOLVE request as defined in
    # > [tor's socks extension spec](https://github.com/torproject/torspec/blob/7116c9cdaba248aae07a3f1d0e15d9dd102f62c5/socks-extensions.txt#L63),
    # > resolving 0.0.0.0, which being an IP, tor resolves itself without needing to ask a relay.
    writer = None
    try:
        async with async_timeout(10):
            reader, writer = await asyncio.open_connection(host, port)
            # socks5 greeting + RESOLVE request for "0.0.0.0"
            writer.write(b'\x05\x01\x00\x05\xf0\x00\x03\x070.0.0.0\x00\x00')
            await writer.drain()
            data = await reader.read(1024)
            # expected reply: successful method negotiation + successful RESOLVE of 0.0.0.0
            if data == b'\x05\x00\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00':
                return True
            return False
    except (OSError, asyncio.TimeoutError):
        # connection refused / unreachable / timed out -> not a Tor port
        return False
    finally:
        if writer:
            writer.close()
1624

1625

1626
AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP = False  # used by unit tests

# The single global event loop shared by electrum; set by
# create_and_start_event_loop() and cleared again when that loop finishes.
_asyncio_event_loop = None  # type: Optional[asyncio.AbstractEventLoop]
5✔
1629

1630

1631
def get_asyncio_loop() -> asyncio.AbstractEventLoop:
    """Returns the global asyncio event loop we use."""
    global_loop = _asyncio_event_loop
    if global_loop:
        return global_loop
    if AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP:
        # library mode: fall back to whatever loop is running in this thread
        thread_loop = get_running_loop()
        if thread_loop:
            return thread_loop
    raise Exception("event loop not created yet")
×
1639

1640

1641
def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
                                           asyncio.Future,
                                           threading.Thread]:
    """Create the global event loop, start it on a dedicated thread, and
    return (loop, stopping_fut, loop_thread).

    The loop runs until `stopping_fut` is resolved; the caller stops the loop
    by setting a result on that future.
    """
    global _asyncio_event_loop
    if _asyncio_event_loop is not None:
        raise Exception("there is already a running event loop")

    # asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710)
    # We set a custom event loop policy purely to be compatible with code that
    # relies on asyncio.get_event_loop().
    # - in python 3.8-3.9, asyncio.Event.__init__, asyncio.Lock.__init__,
    #   and similar, calls get_event_loop. see https://github.com/python/cpython/pull/23420
    class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
        def get_event_loop(self):
            # In case electrum is being used as a library, there might be other
            # event loops in use besides ours. To minimise interfering with those,
            # if there is a loop running in the current thread, return that:
            running_loop = get_running_loop()
            if running_loop is not None:
                return running_loop
            # Otherwise, return our global loop:
            return get_asyncio_loop()
    asyncio.set_event_loop_policy(MyEventLoopPolicy())

    loop = asyncio.new_event_loop()
    _asyncio_event_loop = loop

    def on_exception(loop, context):
        """Suppress spurious messages it appears we cannot control."""
        SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
                                            'SSL error in data received')
        message = context.get('message')
        if message and SUPPRESS_MESSAGE_REGEX.match(message):
            return
        loop.default_exception_handler(context)

    def run_event_loop():
        # thread target: blocks until stopping_fut resolves
        try:
            loop.run_until_complete(stopping_fut)
        finally:
            # clean-up: allow a new loop to be created later
            global _asyncio_event_loop
            _asyncio_event_loop = None

    loop.set_exception_handler(on_exception)
    _set_custom_task_factory(loop)
    # loop.set_debug(True)
    stopping_fut = loop.create_future()
    loop_thread = threading.Thread(
        target=run_event_loop,
        name='EventLoop',
    )
    loop_thread.start()
    # Wait until the loop actually starts.
    # On a slow PC, or with a debugger attached, this can take a few dozens of ms,
    # and if we returned without a running loop, weird things can happen...
    t0 = time.monotonic()
    while not loop.is_running():
        time.sleep(0.01)
        if time.monotonic() - t0 > 5:
            raise Exception("been waiting for 5 seconds but asyncio loop would not start!")
    return loop, stopping_fut, loop_thread
×
1703

1704

1705
_running_asyncio_tasks = set()  # type: Set[asyncio.Future]
5✔
1706

1707

1708
def _set_custom_task_factory(loop: asyncio.AbstractEventLoop):
    """Wrap task creation to track pending and running tasks.
    When tasks are created, asyncio only maintains a weak reference to them.
    Hence, the garbage collector might destroy the task mid-execution.
    To avoid this, we store a strong reference for the task until it completes.

    Without this, a lot of APIs are basically Heisenbug-generators... e.g.:
    - "asyncio.create_task"
    - "loop.create_task"
    - "asyncio.ensure_future"
    - "asyncio.run_coroutine_threadsafe"

    related:
        - https://bugs.python.org/issue44665
        - https://github.com/python/cpython/issues/88831
        - https://github.com/python/cpython/issues/91887
        - https://textual.textualize.io/blog/2023/02/11/the-heisenbug-lurking-in-your-async-code/
        - https://github.com/python/cpython/issues/91887#issuecomment-1434816045
        - "Task was destroyed but it is pending!"
    """

    # chain any factory already installed (e.g. by the platform/GUI)
    platform_task_factory = loop.get_task_factory()

    def factory(loop_, coro, **kwargs):
        # delegate actual task creation, then pin a strong reference
        if platform_task_factory is not None:
            task = platform_task_factory(loop_, coro, **kwargs)
        else:
            task = asyncio.Task(coro, loop=loop_, **kwargs)
        _running_asyncio_tasks.add(task)
        # drop the strong reference once the task completes
        task.add_done_callback(_running_asyncio_tasks.discard)
        return task

    loop.set_task_factory(factory)
5✔
1741

1742

1743
def run_sync_function_on_asyncio_thread(func: Callable, *, block: bool) -> None:
    """Run a non-async fn on the asyncio thread. Can be called from any thread.

    If the current thread is already the asyncio thread, func is guaranteed
    to have been completed when this method returns.

    For any other thread, we only wait for completion if `block` is True.
    """
    assert not asyncio.iscoroutinefunction(func), "func must be a non-async function"
    asyncio_loop = get_asyncio_loop()
    if get_running_loop() == asyncio_loop:  # we are running on the asyncio thread
        func()
    else:  # non-asyncio thread
        # wrap func so it can be scheduled onto the loop as a coroutine
        async def wrapper():
            return func()
        fut = asyncio.run_coroutine_threadsafe(wrapper(), loop=asyncio_loop)
        if block:
            fut.result()
        else:
            # add explicit logging of exceptions, otherwise they might get lost
            # capture the *caller's* stack now; the asyncio-side traceback alone
            # would not show who scheduled the call
            tb1 = traceback.format_stack()[:-1]
            tb1_str = "".join(tb1)

            def on_done(fut_: concurrent.futures.Future):
                assert fut_.done()
                if fut_.cancelled():
                    _logger.debug(f"func cancelled. {func=}.")
                elif exc := fut_.exception():
                    # note: We explicitly log the first part of the traceback, tb1_str.
                    #       The second part gets logged by setting "exc_info".
                    _logger.error(
                        f"func errored. {func=}. {exc=}"
                        f"\n{tb1_str}", exc_info=exc)
            fut.add_done_callback(on_done)
×
1777

1778

1779
class OrderedDictWithIndex(OrderedDict):
    """An OrderedDict that keeps track of the positions of keys.

    Maintains two auxiliary maps (key->position and position->key) so that
    positional lookups are O(1). Appending a new key updates the index
    incrementally; any other mutation triggers a full O(n) rebuild.

    Note: very inefficient to modify contents, except to add new items.
    """

    def __init__(self):
        super().__init__()
        self._key_to_pos = {}
        self._pos_to_key = {}

    def _recalc_index(self):
        # rebuild both lookup tables in a single pass over the keys
        # (previously two separate enumerate passes)
        self._key_to_pos = {}
        self._pos_to_key = {}
        for pos, key in enumerate(self.keys()):
            self._key_to_pos[key] = pos
            self._pos_to_key[pos] = key

    def pos_from_key(self, key):
        """Return the insertion position of `key`. Raises KeyError if absent."""
        return self._key_to_pos[key]

    def value_from_pos(self, pos):
        """Return the value stored at position `pos`. Raises KeyError if out of range."""
        key = self._pos_to_key[pos]
        return self[key]

    def popitem(self, *args, **kwargs):
        ret = super().popitem(*args, **kwargs)
        self._recalc_index()
        return ret

    def move_to_end(self, *args, **kwargs):
        ret = super().move_to_end(*args, **kwargs)
        self._recalc_index()
        return ret

    def clear(self):
        ret = super().clear()
        self._recalc_index()
        return ret

    def pop(self, *args, **kwargs):
        ret = super().pop(*args, **kwargs)
        self._recalc_index()
        return ret

    def update(self, *args, **kwargs):
        ret = super().update(*args, **kwargs)
        self._recalc_index()
        return ret

    def __delitem__(self, *args, **kwargs):
        ret = super().__delitem__(*args, **kwargs)
        self._recalc_index()
        return ret

    def __setitem__(self, key, *args, **kwargs):
        # fast path: appending a new key only needs an incremental index update
        is_new_key = key not in self
        ret = super().__setitem__(key, *args, **kwargs)
        if is_new_key:
            pos = len(self) - 1
            self._key_to_pos[key] = pos
            self._pos_to_key[pos] = key
        return ret
×
1839

1840

1841
def multisig_type(wallet_type):
    """If wallet_type is mofn multi-sig, return [m, n],
    otherwise return None."""
    if not wallet_type:
        return None
    m = re.match(r'(\d+)of(\d+)', wallet_type)
    if m is None:
        return None
    return [int(m.group(1)), int(m.group(2))]
5✔
1850

1851

1852
def is_ip_address(x: Union[str, bytes]) -> bool:
    """Return whether `x` is a valid IPv4/IPv6 address literal.

    Accepts str or (utf-8) bytes. Never raises: undecodable bytes and
    malformed addresses both return False.
    """
    if isinstance(x, bytes):
        try:
            x = x.decode("utf-8")
        except UnicodeDecodeError:
            # bugfix: previously this propagated; undecodable bytes cannot
            # be an IP literal, so report False instead
            return False
    try:
        ipaddress.ip_address(x)
        return True
    except ValueError:
        return False
5✔
1860

1861

1862
def is_localhost(host: str) -> bool:
    """Return whether `host` names the local machine ("localhost" or a loopback IP).

    Never raises; unparseable or empty hosts return False.
    """
    if str(host) in ('localhost', 'localhost.',):
        return True
    if not host:
        return False  # bugfix: empty string used to raise IndexError below
    if host[0] == '[' and host[-1] == ']':  # IPv6
        host = host[1:-1]
    try:
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
        return ip_addr.is_loopback
    except ValueError:
        pass  # not an IP
    return False
5✔
1873

1874

1875
def is_private_netaddress(host: str) -> bool:
    """Return whether `host` is localhost or an IP in a private range.

    Never raises; unparseable or empty hosts return False.
    """
    if is_localhost(host):
        return True
    if not host:
        return False  # bugfix: empty string used to raise IndexError below
    if host[0] == '[' and host[-1] == ']':  # IPv6
        host = host[1:-1]
    try:
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
        return ip_addr.is_private
    except ValueError:
        pass  # not an IP
    return False
5✔
1886

1887

1888
def list_enabled_bits(x: int) -> Sequence[int]:
    """e.g. 77 (0b1001101) --> (0, 2, 3, 6)"""
    digits = bin(x)[2:]
    # enumerate from the least significant bit upwards
    return tuple(
        pos for pos, digit in enumerate(reversed(digits)) if digit == '1'
    )
5✔
1893

1894

1895
async def resolve_dns_srv(host: str):
    """Resolve SRV records for `host`, best candidates first.

    Returns a list of {'host': ..., 'port': ...} dicts.
    """
    # FIXME this method is not using the network proxy. (although the proxy might not support UDP?)
    records = await dns.asyncresolver.resolve(host, 'SRV')
    # priority: prefer lower
    # weight: tie breaker; prefer higher
    ordered = sorted(records, key=lambda rec: (rec.priority, -rec.weight))
    return [{'host': str(rec.target), 'port': rec.port} for rec in ordered]
×
1908

1909

1910
def randrange(bound: int) -> int:
    """Return a random integer k such that 1 <= k < bound, uniformly
    distributed across that range.
    This is guaranteed to be cryptographically strong.
    """
    # secrets.randbelow(n) yields 0 <= r < n; shift the result into [1, bound)
    return 1 + secrets.randbelow(bound - 1)
5✔
1918

1919

1920
class CallbackManager(Logger):
    # callbacks set by the GUI or any thread
    # guarantee: the callbacks will always get triggered from the asyncio thread.

    # FIXME: There should be a way to prevent circular callbacks.
    # At the very least, we need a distinction between callbacks that
    # are for the GUI and callbacks between wallet components

    def __init__(self):
        Logger.__init__(self)
        self.callback_lock = threading.Lock()
        self.callbacks = defaultdict(list)      # note: needs self.callback_lock

    def register_callback(self, func, events):
        """Subscribe `func` to each event name in `events`."""
        with self.callback_lock:
            for event in events:
                self.callbacks[event].append(func)

    def unregister_callback(self, callback):
        """Remove `callback` from every event it was registered for."""
        with self.callback_lock:
            for callbacks in self.callbacks.values():
                if callback in callbacks:
                    callbacks.remove(callback)

    def trigger_callback(self, event, *args):
        """Trigger a callback with given arguments.
        Can be called from any thread. The callback itself will get scheduled
        on the event loop.
        """
        loop = get_asyncio_loop()
        assert loop.is_running(), "event loop not running"
        # snapshot under the lock so callbacks can (un)register during dispatch
        with self.callback_lock:
            callbacks = self.callbacks[event][:]
        for callback in callbacks:
            if asyncio.iscoroutinefunction(callback):  # async cb
                fut = asyncio.run_coroutine_threadsafe(callback(*args), loop)

                # log failures explicitly; otherwise they could get lost
                def on_done(fut_: concurrent.futures.Future):
                    assert fut_.done()
                    if fut_.cancelled():
                        self.logger.debug(f"cb cancelled. {event=}.")
                    elif exc := fut_.exception():
                        self.logger.error(f"cb errored. {event=}. {exc=}", exc_info=exc)
                fut.add_done_callback(on_done)
            else:  # non-async cb
                run_sync_function_on_asyncio_thread(partial(callback, *args), block=False)
5✔
1966

1967

1968
# Module-level singleton plus convenience aliases for its bound methods.
callback_mgr = CallbackManager()
trigger_callback = callback_mgr.trigger_callback
register_callback = callback_mgr.register_callback
unregister_callback = callback_mgr.unregister_callback
# classpath -> set of "on_event_*" method names; populated by @event_listener,
# consumed by EventListener._list_callbacks
_event_listeners = defaultdict(set)  # type: Dict[str, Set[str]]
5✔
1973

1974

1975
class EventListener:
    """Use as a mixin for a class that has methods to be triggered on events.
    - Methods that receive the callbacks should be named "on_event_*" and decorated with @event_listener.
    - register_callbacks() should be called exactly once per instance of EventListener, e.g. in __init__
    - unregister_callbacks() should be called at least once, e.g. when the instance is destroyed
    """

    def _list_callbacks(self):
        # walk the MRO so listener methods declared on base classes are found too
        prefix = 'on_event_'
        for klass in type(self).__mro__:
            classpath = f"{klass.__module__}.{klass.__name__}"
            for method_name in _event_listeners[classpath]:
                method = getattr(self, method_name)
                assert callable(method)
                assert method_name.startswith(prefix)
                yield method_name[len(prefix):], method

    def register_callbacks(self):
        for event_name, method in self._list_callbacks():
            #_logger.debug(f'registering callback {method}')
            register_callback(method, [event_name])

    def unregister_callbacks(self):
        for _event_name, method in self._list_callbacks():
            #_logger.debug(f'unregistering callback {method}')
            unregister_callback(method)
5✔
2000

2001

2002
def event_listener(func):
    """To be used in subclasses of EventListener only. (how to enforce this programmatically?)"""
    cls_name, meth_name = func.__qualname__.split('.')
    assert meth_name.startswith('on_event_')
    classpath = f"{func.__module__}.{cls_name}"
    _event_listeners[classpath].add(meth_name)
    return func
5✔
2009

2010

2011
# Type of the addresses tracked by NetworkRetryManager.
_NetAddrType = TypeVar("_NetAddrType")
# requirements for _NetAddrType:
# - reasonable __hash__() implementation (e.g. based on host/port of remote endpoint)
2014

2015

2016
class NetworkRetryManager(Generic[_NetAddrType]):
    """Truncated Exponential Backoff for network connections."""

    def __init__(
            self, *,
            max_retry_delay_normal: float,
            init_retry_delay_normal: float,
            max_retry_delay_urgent: Optional[float] = None,
            init_retry_delay_urgent: Optional[float] = None,
    ):
        # per-address state: (unix ts of last attempt, num consecutive attempts)
        self._last_tried_addr = {}  # type: Dict[_NetAddrType, Tuple[float, int]]  # (unix ts, num_attempts)

        # note: these all use "seconds" as unit
        # urgent delays default to the normal delays if not given
        if max_retry_delay_urgent is None:
            max_retry_delay_urgent = max_retry_delay_normal
        if init_retry_delay_urgent is None:
            init_retry_delay_urgent = init_retry_delay_normal
        self._max_retry_delay_normal = max_retry_delay_normal
        self._init_retry_delay_normal = init_retry_delay_normal
        self._max_retry_delay_urgent = max_retry_delay_urgent
        self._init_retry_delay_urgent = init_retry_delay_urgent

    def _trying_addr_now(self, addr: _NetAddrType) -> None:
        """Record a connection attempt to `addr` (bumps the attempt counter)."""
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        # we add up to 1 second of noise to the time, so that clients are less likely
        # to get synchronised and bombard the remote in connection waves:
        cur_time = time.time() + random.random()
        self._last_tried_addr[addr] = cur_time, num_attempts + 1

    def _on_connection_successfully_established(self, addr: _NetAddrType) -> None:
        # success resets the backoff: attempt counter back to 0
        self._last_tried_addr[addr] = time.time(), 0

    def _can_retry_addr(self, addr: _NetAddrType, *,
                        now: Optional[float] = None, urgent: bool = False) -> bool:
        """Return whether enough time has passed to retry connecting to `addr`."""
        if now is None:
            now = time.time()
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        if urgent:
            max_delay = self._max_retry_delay_urgent
            init_delay = self._init_retry_delay_urgent
        else:
            max_delay = self._max_retry_delay_normal
            init_delay = self._init_retry_delay_normal
        delay = self.__calc_delay(multiplier=init_delay, max_delay=max_delay, num_attempts=num_attempts)
        next_time = last_time + delay
        return next_time < now

    @classmethod
    def __calc_delay(cls, *, multiplier: float, max_delay: float,
                     num_attempts: int) -> float:
        # truncated exponential backoff: multiplier * 2**attempts, capped at max_delay
        num_attempts = min(num_attempts, 100_000)
        try:
            res = multiplier * 2 ** num_attempts
        except OverflowError:
            return max_delay
        return max(0, min(max_delay, res))

    def _clear_addr_retry_times(self) -> None:
        """Forget all per-address backoff state."""
        self._last_tried_addr.clear()
5✔
2075

2076

2077
class ESocksProxy(aiorpcx.SOCKSProxy):
    # note: proxy will not leak DNS as create_connection()
    # sets (local DNS) resolve=False by default

    async def open_connection(self, host=None, port=None, **kwargs):
        """asyncio.open_connection equivalent, tunnelled through this proxy."""
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader(loop=loop)
        protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
        transport, _ = await self.create_connection(
            lambda: protocol, host, port, **kwargs)
        writer = asyncio.StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    @classmethod
    def from_network_settings(cls, network: Optional['Network']) -> Optional['ESocksProxy']:
        """Build an ESocksProxy from the Network's proxy settings, or None if disabled."""
        if not network or not network.proxy or not network.proxy.enabled:
            return None
        proxy = network.proxy
        username, pw = proxy.user, proxy.password
        if not username or not pw:
            # is_proxy_tor is tri-state; None indicates it is still probing the proxy to test for TOR
            if network.is_proxy_tor:
                # random credentials give Tor stream isolation per identity
                auth = aiorpcx.socks.SOCKSRandomAuth()
            else:
                auth = None
        else:
            auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
        addr = aiorpcx.NetAddress(proxy.host, proxy.port)
        if proxy.mode == "socks4":
            ret = cls(addr, aiorpcx.socks.SOCKS4a, auth)
        elif proxy.mode == "socks5":
            ret = cls(addr, aiorpcx.socks.SOCKS5, auth)
        else:
            raise NotImplementedError  # http proxy not available with aiorpcx
        return ret
×
2112

2113

2114
class JsonRPCError(Exception):
    """An error returned by a JSON-RPC server (the "error" member of a response)."""

    class Codes(enum.IntEnum):
        # application-specific error codes
        USERFACING = 1
        INTERNAL = 2

    def __init__(self, *, code: int, message: str, data: Optional[dict] = None):
        super().__init__()
        self.code = code
        self.message = message
        self.data = data
×
2126

2127

2128
class JsonRPCClient:
    """Minimal JSON-RPC 2.0 client on top of an aiohttp session."""

    def __init__(self, session: aiohttp.ClientSession, url: str):
        self.session = session
        self.url = url
        self._id = 0  # monotonically increasing request id

    async def request(self, endpoint, *args):
        """Send request to server, parse and return result.
        note: parsing code is naive, the server is assumed to be well-behaved.
              Up to the caller to handle exceptions, including those arising from parsing errors.

        Raises JsonRPCError if the server returns an "error" member.
        """
        self._id += 1
        # Build the payload with json.dumps so that endpoint and args are
        # properly escaped (the previous %-interpolation could produce invalid
        # JSON if the method name contained quotes/backslashes).
        # The id is kept as a *string*, matching the previous wire format.
        data = json.dumps({
            "jsonrpc": "2.0",
            "id": "%d" % self._id,
            "method": endpoint,
            "params": list(args),
        })
        async with self.session.post(self.url, data=data) as resp:
            if resp.status == 200:
                r = await resp.json()
                result = r.get('result')
                error = r.get('error')
                if error:
                    raise JsonRPCError(code=error["code"], message=error["message"], data=error.get("data"))
                else:
                    return result
            else:
                text = await resp.text()
                return 'Error: ' + str(text)

    def add_method(self, endpoint):
        """Expose `endpoint` as an awaitable attribute on this client."""
        async def coro(*args):
            return await self.request(endpoint, *args)
        setattr(self, endpoint, coro)
×
2160

2161

2162
T = TypeVar('T')
5✔
2163

2164

2165
def random_shuffled_copy(x: Iterable[T]) -> List[T]:
    """Return the elements of the input as a new list, in random order.

    The input itself is never mutated.
    """
    shuffled = list(x)
    random.shuffle(shuffled)
    return shuffled
5✔
2170

2171

2172
def test_read_write_permissions(path) -> None:
5✔
2173
    # note: There might already be a file at 'path'.
2174
    #       Make sure we do NOT overwrite/corrupt that!
2175
    temp_path = "%s.tmptest.%s" % (path, os.getpid())
5✔
2176
    echo = "fs r/w test"
5✔
2177
    try:
5✔
2178
        # test READ permissions for actual path
2179
        if os.path.exists(path):
5✔
2180
            with open(path, "rb") as f:
5✔
2181
                f.read(1)  # read 1 byte
5✔
2182
        # test R/W sanity for "similar" path
2183
        with open(temp_path, "w", encoding='utf-8') as f:
5✔
2184
            f.write(echo)
5✔
2185
        with open(temp_path, "r", encoding='utf-8') as f:
5✔
2186
            echo2 = f.read()
5✔
2187
        os.remove(temp_path)
5✔
2188
    except Exception as e:
×
2189
        raise IOError(e) from e
×
2190
    if echo != echo2:
5✔
2191
        raise IOError('echo sanity-check failed')
×
2192

2193

2194
class classproperty(property):
    """~read-only class-level @property
    from https://stackoverflow.com/a/13624858 by denis-ryzhkov
    """
    def __get__(self, obj, objtype):
        # ignore the instance; always evaluate the getter against the class
        return self.fget(objtype)
5✔
2200

2201

2202
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
    """Returns the asyncio event loop that is *running in this thread*, if any."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # asyncio raises RuntimeError when no loop runs in the current thread
        return None
    return loop
×
2208

2209

2210
def error_text_str_to_safe_str(err: str, *, max_len: Optional[int] = 500) -> str:
    """Converts an untrusted error string to a sane printable ascii str.
    Never raises.
    """
    ascii_bytes = err.encode("ascii", errors='backslashreplace')
    # delegate sanitisation (uncapped), then apply our own length cap
    sanitised = error_text_bytes_to_safe_str(ascii_bytes, max_len=None)
    return truncate_text(sanitised, max_len=max_len)
5✔
2218

2219

2220
def error_text_bytes_to_safe_str(err: bytes, *, max_len: Optional[int] = 500) -> str:
    """Converts an untrusted error bytes text to a sane printable ascii str.
    Never raises.

    Note that naive ascii conversion would be insufficient. Fun stuff:
    >>> b = b"my_long_prefix_blabla" + 21 * b"\x08" + b"malicious_stuff"
    >>> s = b.decode("ascii")
    >>> print(s)
    malicious_stuffblabla
    """
    # step 1: convert to ascii, to get rid of unicode stuff
    ascii_text = err.decode("ascii", errors='backslashreplace')
    # step 2: repr() escapes ascii control chars (backspace, CR, ...) that
    # could otherwise rewrite the terminal/log line when printed
    escaped = repr(ascii_text)
    return truncate_text(escaped, max_len=max_len)
5✔
2235

2236

2237
def truncate_text(text: str, *, max_len: Optional[int]) -> str:
    """Cap `text` at `max_len` chars, appending a marker noting the original length.

    max_len=None disables truncation.
    """
    if max_len is not None and len(text) > max_len:
        return text[:max_len] + f"... (truncated. orig_len={len(text)})"
    return text
5✔
2242

2243

2244
def nostr_pow_worker(nonce, nostr_pubk, target_bits, hash_function, hash_len_bits, shutdown):
    """Function to generate PoW for Nostr, to be spawned in a ProcessPoolExecutor.

    Iterates nonces starting at `nonce` until hash(b'electrum-' + pubk + nonce)
    has at least `target_bits` leading zero bits. Returns (digest, nonce) on
    success (and sets `shutdown` so sibling workers stop), or (None, None) if
    another worker won the race.
    """
    hash_preimage = b'electrum-' + nostr_pubk
    target = 1 << (hash_len_bits - target_bits)  # loop-invariant, hoisted
    while True:
        # we cannot check is_set on each iteration as it has a lot of overhead, this way we can check
        # it with low overhead (just the additional range counter)
        for _ in range(1000000):
            digest = hash_function(hash_preimage + nonce.to_bytes(32, 'big')).digest()
            if int.from_bytes(digest, 'big') < target:
                shutdown.set()
                # bugfix: previously returned the *builtin* `hash` function
                # instead of the computed digest
                return digest, nonce
            nonce += 1
        if shutdown.is_set():
            return None, None
×
2258

2259

2260
async def gen_nostr_ann_pow(nostr_pubk: bytes, target_bits: int) -> Tuple[int, int]:
    """Generate a PoW for a Nostr announcement. The PoW is hash[b'electrum-'+pubk+nonce]

    Spawns one worker process per CPU (minus one), each starting from a
    different region of the 32-byte nonce space; the first worker to find a
    solution signals the shared `shutdown` event. Returns (nonce, pow_bits).
    """
    import multiprocessing  # not available on Android, so we import it here
    hash_function = hashlib.sha256
    hash_len_bits = 256
    max_nonce: int = (1 << (32 * 8)) - 1  # 32-byte nonce
    start_nonce = 0

    max_workers = max(multiprocessing.cpu_count() - 1, 1)  # use all but one CPU
    manager = multiprocessing.Manager()
    shutdown = manager.Event()
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        tasks = []
        loop = asyncio.get_running_loop()
        # NOTE(review): loop variable `task` is immediately shadowed by the
        # executor future below; it only serves as a worker counter
        for task in range(0, max_workers):
            task = loop.run_in_executor(
                executor,
                nostr_pow_worker,
                start_nonce,
                nostr_pubk,
                target_bits,
                hash_function,
                hash_len_bits,
                shutdown
            )
            tasks.append(task)
            start_nonce += max_nonce // max_workers  # split the nonce range between the processes
            if start_nonce > max_nonce:  # make sure we don't go over the max_nonce
                start_nonce = random.randint(0, int(max_nonce * 0.75))

        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        # first finished worker wins; its digest is discarded -- only the nonce is used
        hash_res, nonce_res = done.pop().result()
        executor.shutdown(wait=False, cancel_futures=True)

    return nonce_res, get_nostr_ann_pow_amount(nostr_pubk, nonce_res)
×
2295

2296

2297
def get_nostr_ann_pow_amount(nostr_pubk: bytes, nonce: Optional[int]) -> int:
    """Return the amount of leading zero bits for a nostr announcement PoW."""
    if not nonce:
        return 0
    hash_function = hashlib.sha256
    hash_len_bits = 256
    preimage = b'electrum-' + nostr_pubk + nonce.to_bytes(32, 'big')
    digest_int = int.from_bytes(hash_function(preimage).digest(), 'big')
    # leading zeros == total bits minus the position of the highest set bit
    return hash_len_bits - digest_int.bit_length()
×
2308

2309

2310
class OnchainHistoryItem(NamedTuple):
    """A single on-chain tx entry of the wallet history.

    note: a redundant duplicate `group_id` annotation after
    `monotonic_timestamp` was removed; class-level annotations are stored in a
    dict, so the duplicate was silently collapsed onto the first occurrence
    anyway and the field set/order is unchanged.
    """
    txid: str
    amount_sat: int
    fee_sat: int
    balance_sat: int  # running wallet balance after this tx
    tx_mined_status: TxMinedInfo
    group_id: Optional[str]
    label: Optional[str]
    monotonic_timestamp: int

    def to_dict(self):
        """Flatten into the dict shape consumed by history export/GUI code."""
        return {
            'txid': self.txid,
            'amount_sat': self.amount_sat,
            'fee_sat': self.fee_sat,
            'height': self.tx_mined_status.height,
            'confirmations': self.tx_mined_status.conf,
            'timestamp': self.tx_mined_status.timestamp,
            'monotonic_timestamp': self.monotonic_timestamp,
            'incoming': self.amount_sat > 0,  # simplified from `True if ... else False`
            'bc_value': Satoshis(self.amount_sat),
            'bc_balance': Satoshis(self.balance_sat),
            'date': timestamp_to_datetime(self.tx_mined_status.timestamp),
            'txpos_in_block': self.tx_mined_status.txpos,
            'wanted_height': self.tx_mined_status.wanted_height,
            'label': self.label,
            'group_id': self.group_id,
        }
2338

2339

2340
class LightningHistoryItem(NamedTuple):
    """A single lightning entry of the wallet history."""
    payment_hash: Optional[str]
    preimage: Optional[str]
    amount_msat: int
    fee_msat: Optional[int]
    type: str  # entry kind -- presumably e.g. payment/swap; confirm against callers
    group_id: Optional[str]
    timestamp: int
    label: Optional[str]
    direction: Optional[int]

    def to_dict(self):
        """Flatten into the dict shape consumed by history export/GUI code."""
        return {
            'type': self.type,
            'label': self.label,
            'timestamp': self.timestamp or 0,  # normalise falsy timestamps to 0
            'date': timestamp_to_datetime(self.timestamp),
            'amount_msat': self.amount_msat,
            'fee_msat': self.fee_msat,
            'payment_hash': self.payment_hash,
            'preimage': self.preimage,
            'group_id': self.group_id,
            'ln_value': Satoshis(Decimal(self.amount_msat) / 1000),  # msat -> sat
            'direction': self.direction,
        }
2364

2365

2366
@dataclass(kw_only=True, slots=True)
class ChoiceItem:
    # opaque identifier for the choice, compared/stored by callers
    key: Any
    label: str  # user facing string
    extra_data: Any = None  # optional payload attached to the choice
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc