• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5735552722403328

16 May 2025 10:28AM UTC coverage: 59.722% (+0.002%) from 59.72%
5735552722403328

Pull #9833

CirrusCI

f321x
make lightning dns seed fetching async
Pull Request #9833: dns: use async dnspython interface

22 of 50 new or added lines in 7 files covered. (44.0%)

1107 existing lines in 11 files now uncovered.

21549 of 36082 relevant lines covered (59.72%)

2.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.0
/electrum/synchronizer.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2014 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25
import asyncio
4✔
26
import hashlib
4✔
27
from typing import Dict, List, TYPE_CHECKING, Tuple, Set
4✔
28
from collections import defaultdict
4✔
29
import logging
4✔
30

31
from aiorpcx import run_in_thread, RPCError
4✔
32

33
from . import util
4✔
34
from .transaction import Transaction, PartialTransaction
4✔
35
from .util import make_aiohttp_session, NetworkJobOnDefaultServer, random_shuffled_copy, OldTaskGroup
4✔
36
from .bitcoin import address_to_scripthash, is_address
4✔
37
from .logging import Logger
4✔
38
from .interface import GracefulDisconnect, NetworkTimeout
4✔
39

40
if TYPE_CHECKING:
4✔
41
    from .network import Network
×
42
    from .address_synchronizer import AddressSynchronizer
×
43

44

45
class SynchronizerFailure(Exception):
    """Raised by the synchronizer on protocol-level failures, e.g. a received
    tx whose txid does not match, or a history that stays stale past the
    network timeout."""
46

47

48
def history_status(h):
    """Return the status hash of an address history, as used by the Electrum
    protocol (``blockchain.scripthash.subscribe``).

    *h* is a sequence of ``(tx_hash, height)`` pairs.  The status is the
    sha256 of the concatenation of ``"tx_hash:height:"`` for each entry in
    order, as a hex string.  An empty (or None) history maps to None, which
    is what the server announces for an address with no history.
    """
    if not h:
        return None
    # ''.join over a generator is linear; the previous repeated '+=' string
    # concatenation in a loop was quadratic in the history length.
    status = ''.join(f'{tx_hash}:{height:d}:' for tx_hash, height in h)
    return hashlib.sha256(status.encode('ascii')).digest().hex()
55

56

57
class SynchronizerBase(NetworkJobOnDefaultServer):
    """Subscribe over the network to a set of addresses, and monitor their statuses.
    Every time a status changes, run a coroutine provided by the subclass.
    """
    def __init__(self, network: 'Network'):
        self.asyncio_loop = network.asyncio_loop

        # NetworkJobOnDefaultServer.__init__ presumably calls _reset() --
        # TODO confirm in util; all mutable state lives there, not here.
        NetworkJobOnDefaultServer.__init__(self, network)

    def _reset(self):
        """Clear all per-connection state (invoked via the base class)."""
        super()._reset()
        # addresses handed to add() but not yet processed by _add_address()
        self._adding_addrs = set()  # type: Set[str]
        # addresses whose subscription request is currently in flight
        self.requested_addrs = set()  # type: Set[str]
        # addresses whose status notification is currently being handled
        self._handling_addr_statuses = set()  # type: Set[str]
        # reverse map: the server identifies addresses by scripthash
        self.scripthash_to_address = {}  # type: Dict[str, str]
        self._processed_some_notifications = False  # so that we don't miss them
        # Queues
        # (scripthash, status) notifications pushed by the server session
        self.status_queue = asyncio.Queue()

    async def _run_tasks(self, *, taskgroup):
        """Spawn the status dispatcher and the subclass main loop into *taskgroup*."""
        await super()._run_tasks(taskgroup=taskgroup)
        try:
            async with taskgroup as group:
                await group.spawn(self.handle_status())
                await group.spawn(self.main())
        finally:
            # we are being cancelled now
            self.session.unsubscribe(self.status_queue)

    def add(self, addr):
        """Schedule *addr* for subscription.  Synchronous and non-blocking; the
        network subscription happens later, driven by the subclass's main().
        Raises ValueError for an invalid bitcoin address.
        """
        if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}")
        self._adding_addrs.add(addr)  # this lets is_up_to_date already know about addr

    async def _add_address(self, addr: str):
        """Spawn a subscription task for *addr*, deduplicating in-flight requests."""
        try:
            if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}")
            if addr in self.requested_addrs: return
            self.requested_addrs.add(addr)
            await self.taskgroup.spawn(self._subscribe_to_address, addr)
        finally:
            self._adding_addrs.discard(addr)  # ok for addr not to be present

    async def _on_address_status(self, addr, status):
        """Handle the change of the status of an address.
        Should remove addr from self._handling_addr_statuses when done.
        """
        raise NotImplementedError()  # implemented by subclasses

    async def _subscribe_to_address(self, addr):
        """Subscribe to the scripthash of *addr*; the server will push status
        notifications for it into self.status_queue.
        """
        h = address_to_scripthash(addr)
        self.scripthash_to_address[h] = addr
        self._requests_sent += 1
        try:
            async with self._network_request_semaphore:
                await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
        except RPCError as e:
            if e.message == 'history too large':  # no unique error code
                # not recoverable on this server: disconnect loudly
                raise GracefulDisconnect(e, log_level=logging.ERROR) from e
            raise
        self._requests_answered += 1

    async def handle_status(self):
        """Forever dispatch server status notifications to _on_address_status."""
        while True:
            h, status = await self.status_queue.get()
            addr = self.scripthash_to_address[h]
            # mark as in-progress before clearing requested_addrs, so that
            # is_up_to_date never observes the address as "done" in between
            self._handling_addr_statuses.add(addr)
            self.requested_addrs.discard(addr)  # ok for addr not to be present
            await self.taskgroup.spawn(self._on_address_status, addr, status)
            self._processed_some_notifications = True

    async def main(self):
        raise NotImplementedError()  # implemented by subclasses
129

130

131
class Synchronizer(SynchronizerBase):
    '''The synchronizer keeps the wallet up-to-date with its set of
    addresses and their transactions.  It subscribes over the network
    to wallet addresses, gets the wallet to generate new addresses
    when necessary, requests the transaction history of any addresses
    we don't have the full history of, and requests binary transaction
    data of any transactions the wallet doesn't have.
    '''
    def __init__(self, adb: 'AddressSynchronizer'):
        self.adb = adb
        SynchronizerBase.__init__(self, adb.network)

    def _reset(self):
        """Clear all request-tracking state (see SynchronizerBase._reset)."""
        super()._reset()
        self._init_done = False
        # tx_hash -> tx_height, for txs requested but not yet received
        self.requested_tx = {}  # type: Dict[str, int]
        # (addr, status) pairs for which a history request is in flight
        self.requested_histories = set()  # type: Set[Tuple[str, str]]
        # addr -> watchdog task that disconnects if the history stays stale
        self._stale_histories = dict()  # type: Dict[str, asyncio.Task]

    def diagnostic_name(self):
        return self.adb.diagnostic_name()

    def is_up_to_date(self):
        # True only once bootstrapping finished and no request of any kind
        # (address add/subscribe, status handling, history, tx) is pending.
        return (self._init_done
                and not self._adding_addrs
                and not self.requested_addrs
                and not self._handling_addr_statuses
                and not self.requested_histories
                and not self.requested_tx
                and not self._stale_histories
                and self.status_queue.empty())

    async def _on_address_status(self, addr, status):
        """React to a status notification for *addr*: if our local status
        differs, fetch the history from the server, verify it against the
        announced status, store it, and request any missing transactions.
        """
        try:
            history = self.adb.db.get_addr_history(addr)
            if history_status(history) == status:
                return  # local history already matches the announced status
            # No point in requesting history twice for the same announced status.
            # However if we got announced a new status, we should request history again:
            if (addr, status) in self.requested_histories:
                return
            # request address history
            self.requested_histories.add((addr, status))
            # cancel any pending stale-history watchdog for this addr;
            # the dummy Future makes cancel() a no-op when none exists
            self._stale_histories.pop(addr, asyncio.Future()).cancel()
        finally:
            self._handling_addr_statuses.discard(addr)
        h = address_to_scripthash(addr)
        self._requests_sent += 1
        async with self._network_request_semaphore:
            result = await self.interface.get_history_for_scripthash(h)
        self._requests_answered += 1
        self.logger.info(f"receiving history {addr} {len(result)}")
        hist = list(map(lambda item: (item['tx_hash'], item['height']), result))
        # tx_fees: only mempool entries carry a 'fee' field; drop the rest
        tx_fees = [(item['tx_hash'], item.get('fee')) for item in result]
        tx_fees = dict(filter(lambda x:x[1] is not None, tx_fees))
        # Check that the status corresponds to what was announced
        if history_status(hist) != status:
            # could happen naturally if history changed between getting status and history (race)
            self.logger.info(f"error: status mismatch: {addr}. we'll wait a bit for status update.")
            # The server is supposed to send a new status notification, which will trigger a new
            # get_history. We shall wait a bit for this to happen, otherwise we disconnect.
            async def disconnect_if_still_stale():
                timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Generic)
                await asyncio.sleep(timeout)
                # raising inside the taskgroup tears down the connection
                raise SynchronizerFailure(f"timeout reached waiting for addr {addr}: history still stale")
            self._stale_histories[addr] = await self.taskgroup.spawn(disconnect_if_still_stale)
        else:
            # history is consistent: disarm the watchdog, if any
            self._stale_histories.pop(addr, asyncio.Future()).cancel()
            # Store received history
            self.adb.receive_history_callback(addr, hist, tx_fees)
            # Request transactions we don't have
            await self._request_missing_txs(hist)

        # Remove request; this allows up_to_date to be True
        self.requested_histories.discard((addr, status))

    async def _request_missing_txs(self, hist, *, allow_server_not_finding_tx=False):
        """Fetch, concurrently, every tx in *hist* that the wallet does not
        already have in complete (non-partial) form.
        """
        # "hist" is a list of [tx_hash, tx_height] lists
        transaction_hashes = []
        for tx_hash, tx_height in hist:
            if tx_hash in self.requested_tx:
                continue  # request already in flight
            tx = self.adb.db.get_transaction(tx_hash)
            if tx and not isinstance(tx, PartialTransaction):
                continue  # already have complete tx
            transaction_hashes.append(tx_hash)
            self.requested_tx[tx_hash] = tx_height

        if not transaction_hashes: return
        async with OldTaskGroup() as group:
            for tx_hash in transaction_hashes:
                await group.spawn(self._get_transaction(tx_hash, allow_server_not_finding_tx=allow_server_not_finding_tx))

    async def _get_transaction(self, tx_hash, *, allow_server_not_finding_tx=False):
        """Fetch a single raw tx, verify its txid, and hand it to the wallet."""
        self._requests_sent += 1
        try:
            async with self._network_request_semaphore:
                raw_tx = await self.interface.get_transaction(tx_hash)
        except RPCError as e:
            # most likely, "No such mempool or blockchain transaction"
            if allow_server_not_finding_tx:
                self.requested_tx.pop(tx_hash)
                return
            else:
                raise
        finally:
            self._requests_answered += 1
        tx = Transaction(raw_tx)
        if tx_hash != tx.txid():
            # do not trust the server's data: reject a tx that hashes wrong
            raise SynchronizerFailure(f"received tx does not match expected txid ({tx_hash} != {tx.txid()})")
        tx_height = self.requested_tx.pop(tx_hash)
        self.adb.receive_tx_callback(tx, tx_height)
        self.logger.info(f"received tx {tx_hash} height: {tx_height} bytes: {len(raw_tx)}")

    async def main(self):
        """Bootstrap (missing txs, address subscriptions), then loop forever
        picking up newly add()-ed addresses and publishing up-to-date changes.
        """
        self.adb.up_to_date_changed()
        # request missing txns, if any
        for addr in random_shuffled_copy(self.adb.db.get_history()):
            history = self.adb.db.get_addr_history(addr)
            # Old electrum servers returned ['*'] when all history for the address
            # was pruned. This no longer happens but may remain in old wallets.
            if history == ['*']: continue
            await self._request_missing_txs(history, allow_server_not_finding_tx=True)
        # add addresses to bootstrap
        for addr in random_shuffled_copy(self.adb.get_addresses()):
            await self._add_address(addr)
        # main loop
        self._init_done = True
        prev_uptodate = False
        while True:
            await asyncio.sleep(0.1)
            for addr in self._adding_addrs.copy(): # copy set to ensure iterator stability
                await self._add_address(addr)
            up_to_date = self.adb.is_up_to_date()
            # see if status changed
            if (up_to_date != prev_uptodate
                    or up_to_date and self._processed_some_notifications):
                self._processed_some_notifications = False
                self.adb.up_to_date_changed()
            prev_uptodate = up_to_date
272

273

274
class Notifier(SynchronizerBase):
    """Watch addresses. Every time the status of an address changes,
    an HTTP POST is sent to the corresponding URL.
    """
    def __init__(self, network):
        SynchronizerBase.__init__(self, network)
        # address -> list of URLs to POST to when its status changes
        self.watched_addresses = defaultdict(list)  # type: Dict[str, List[str]]
        # (address, url) registrations waiting to be picked up by main()
        self._start_watching_queue = asyncio.Queue()  # type: asyncio.Queue[Tuple[str, str]]

    async def main(self):
        """Re-subscribe existing watches, then register new ones as they arrive."""
        # resend existing subscriptions if we were restarted
        for watched_addr in self.watched_addresses:
            await self._add_address(watched_addr)
        # main loop: drain the registration queue forever
        while True:
            new_addr, callback_url = await self._start_watching_queue.get()
            self.watched_addresses[new_addr].append(callback_url)
            await self._add_address(new_addr)

    async def start_watching_addr(self, addr: str, url: str):
        """Queue *addr* for watching; status changes will be POSTed to *url*."""
        await self._start_watching_queue.put((addr, url))

    async def stop_watching_addr(self, addr: str):
        """Forget every URL registered for *addr* (no-op if unknown)."""
        self.watched_addresses.pop(addr, None)
        # TODO blockchain.scripthash.unsubscribe

    async def _on_address_status(self, addr, status):
        """POST the new status of *addr* to each registered URL, best-effort."""
        if addr not in self.watched_addresses:
            return
        self.logger.info(f'new status for addr {addr}')
        headers = {'content-type': 'application/json'}
        payload = {'address': addr, 'status': status}
        for url in self.watched_addresses[addr]:
            try:
                async with make_aiohttp_session(proxy=self.network.proxy, headers=headers) as session:
                    async with session.post(url, json=payload, headers=headers) as resp:
                        await resp.text()
            except Exception as e:
                # best-effort delivery: log the failure, keep notifying the rest
                self.logger.info(repr(e))
            else:
                self.logger.info(f'Got Response for {addr}')
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc