• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5735552722403328

16 May 2025 10:28AM UTC coverage: 59.722% (+0.002%) from 59.72%
5735552722403328

Pull #9833

CirrusCI

f321x
make lightning dns seed fetching async
Pull Request #9833: dns: use async dnspython interface

22 of 50 new or added lines in 7 files covered. (44.0%)

1107 existing lines in 11 files now uncovered.

21549 of 36082 relevant lines covered (59.72%)

2.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.57
/electrum/lnwatcher.py
1
# Copyright (C) 2018 The Electrum developers
2
# Distributed under the MIT software license, see the accompanying
3
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
4

5
from typing import TYPE_CHECKING
4✔
6

7
from .util import TxMinedInfo, BelowDustLimit
4✔
8
from .util import EventListener, event_listener, log_exceptions, ignore_exceptions
4✔
9
from .transaction import Transaction, TxOutpoint
4✔
10
from .logging import Logger
4✔
11
from .address_synchronizer import TX_HEIGHT_LOCAL
4✔
12

13

14
if TYPE_CHECKING:
4✔
15
    from .network import Network
×
16
    from .lnsweep import SweepInfo
×
17
    from .lnworker import LNWallet
×
18
    from .lnchannel import AbstractChannel
×
19

20

21
class LNWatcher(Logger, EventListener):
    """Watch the funding addresses of lightning channels and react on-chain.

    For every watched channel a callback is registered under the channel's
    funding address.  The callbacks are re-run whenever one of the events
    handled below fires (new block, wallet update, newly verified tx, adb
    becoming up to date).  Each callback inspects the funding outpoint, and
    if it has been spent, asks the channel for sweep information and hands
    the resulting inputs to the wallet's tx batcher.
    """

    LOGGING_SHORTCUT = 'W'

    def __init__(self, lnworker: 'LNWallet'):
        """Bind the watcher to *lnworker* and register the event listeners."""
        # NOTE(review): lnworker is set before Logger.__init__, presumably
        # because the logger asks for diagnostic_name() (which reads
        # self.lnworker) -- confirm before reordering.
        self.lnworker = lnworker
        Logger.__init__(self)
        self.adb = lnworker.wallet.adb
        self.config = lnworker.config
        # address -> zero-argument callable returning an awaitable
        self.callbacks = {}
        # set by start_network()
        self.network = None
        # register_callbacks / unregister_callbacks are not defined in this
        # class; presumably inherited from EventListener.
        self.register_callbacks()
        # funding outpoint -> status; read by get_channel_status().
        # Nothing in this class writes to it.
        self.channel_status = {}
        # channels added by maybe_add_pending_forceclose(), removed again in
        # update_channel_state() once the closing tx has confirmations
        self._pending_force_closes = set()

    def start_network(self, network: 'Network'):
        """Remember the network object."""
        self.network = network

    def stop(self):
        """Unregister the event listeners registered in __init__."""
        self.unregister_callbacks()

    def get_channel_status(self, outpoint):
        """Return the recorded status for *outpoint*, or 'unknown'."""
        return self.channel_status.get(outpoint, 'unknown')

    def remove_callback(self, address):
        """Forget the callback registered for *address* (no-op if absent)."""
        self.callbacks.pop(address, None)

    def add_callback(self, address, callback):
        """Add *address* to the adb and register *callback* for it.

        An existing callback for the same address is replaced.
        """
        self.adb.add_address(address)
        self.callbacks[address] = callback

    async def trigger_callbacks(self, *, requires_synchronizer=True):
        """Await every registered callback, one after the other.

        Does nothing (apart from logging) when *requires_synchronizer* is
        true and the adb has no synchronizer yet.
        """
        if requires_synchronizer and not self.adb.synchronizer:
            self.logger.info("synchronizer not set yet")
            return
        # Iterate over a snapshot: a callback can end up in unwatch_channel(),
        # which removes an entry from self.callbacks while we are looping.
        for callback in list(self.callbacks.values()):
            await callback()

    @event_listener
    async def on_event_blockchain_updated(self, *args):
        """Drop cached sweep info of all channels, then re-run the callbacks."""
        # we invalidate the cache on each new block because
        # some processes affect the list of sweep transactions
        # (hold invoice preimage revealed, MPP completed, etc)
        for chan in self.lnworker.channels.values():
            chan._sweep_info.clear()
        await self.trigger_callbacks()

    @event_listener
    async def on_event_wallet_updated(self, wallet):
        """Re-run the callbacks when our own wallet was updated."""
        # called if we add local tx
        if wallet.adb != self.adb:
            return
        await self.trigger_callbacks()

    @event_listener
    async def on_event_adb_added_verified_tx(self, adb, tx_hash):
        """Re-run the callbacks when our adb reports a verified tx."""
        if adb != self.adb:
            return
        await self.trigger_callbacks()

    @event_listener
    async def on_event_adb_set_up_to_date(self, adb):
        """Re-run the callbacks when our adb reports being up to date."""
        if adb != self.adb:
            return
        await self.trigger_callbacks()

    def add_channel(self, chan: 'AbstractChannel') -> None:
        """Start watching *chan*'s funding address, if the channel needs it."""
        outpoint = chan.funding_outpoint.to_str()
        address = chan.get_funding_address()
        if chan.need_to_subscribe():
            self.add_callback(
                address,
                lambda: self.check_onchain_situation(address, outpoint))

    def unwatch_channel(self, address, funding_outpoint):
        """Stop watching *address*; *funding_outpoint* is only used for logging."""
        self.logger.info(f'unwatching {funding_outpoint}')
        self.remove_callback(address)

    @ignore_exceptions
    @log_exceptions
    async def check_onchain_situation(self, address, funding_outpoint):
        """Inspect the on-chain state of one channel and act on it.

        Looks up the funding tx and a possible spender of the funding
        outpoint, sweeps the closing tx if it is available, forwards the
        result to update_channel_state(), and unwatches the channel once
        nothing is left to monitor.
        """
        # early return if address has not been added yet
        if not self.adb.is_mine(address):
            return
        funding_txid = funding_outpoint.split(':')[0]
        funding_height = self.adb.get_tx_height(funding_txid)
        closing_txid = self.adb.get_spender(funding_outpoint)
        # NOTE(review): this is called even when no spender was found, i.e.
        # it relies on get_tx_height() accepting a falsy txid -- confirm
        # against the adb implementation.
        closing_height = self.adb.get_tx_height(closing_txid)
        # keep watching unless the closing tx is known and says otherwise
        keep_watching = True
        if closing_txid:
            closing_tx = self.adb.get_transaction(closing_txid)
            if closing_tx:
                keep_watching = await self.sweep_commitment_transaction(funding_outpoint, closing_tx)
            else:
                self.logger.info(f"channel {funding_outpoint} closed by {closing_txid}. still waiting for tx itself...")
        await self.update_channel_state(
            funding_outpoint=funding_outpoint,
            funding_txid=funding_txid,
            funding_height=funding_height,
            closing_txid=closing_txid,
            closing_height=closing_height,
            keep_watching=keep_watching)
        if not keep_watching:
            self.unwatch_channel(address, funding_outpoint)

    def diagnostic_name(self):
        """Return the wallet's diagnostic name with an '-LNW' suffix."""
        return f"{self.lnworker.wallet.diagnostic_name()}-LNW"

    async def update_channel_state(
            self, *, funding_outpoint: str, funding_txid: str,
            funding_height: TxMinedInfo, closing_txid: str,
            closing_height: TxMinedInfo, keep_watching: bool) -> None:
        """Push the observed on-chain state into the channel and the lnworker.

        Does nothing if the lnworker knows no channel for *funding_outpoint*.
        """
        chan = self.lnworker.channel_by_txo(funding_outpoint)
        if not chan:
            return
        chan.update_onchain_state(
            funding_txid=funding_txid,
            funding_height=funding_height,
            closing_txid=closing_txid,
            closing_height=closing_height,
            keep_watching=keep_watching)
        # the closing tx has confirmations: no longer a pending force close
        if closing_height.conf > 0:
            self._pending_force_closes.discard(chan)
        await self.lnworker.handle_onchain_state(chan)

    async def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bool:
        """This function is called when a channel was closed. In this case
        we need to check for redeemable outputs of the commitment transaction
        or spenders down the line (HTLC-timeout/success transactions).

        Returns whether we should continue to monitor.

        Side effects:
          - sets default labels
          - populates wallet._accounting_addresses
        """
        chan = self.lnworker.channel_by_txo(funding_outpoint)
        if not chan:
            return False
        # detect who closed and get information about how to claim outputs
        is_local_ctx, sweep_info_dict = chan.get_ctx_sweep_info(closing_tx)
        if sweep_info_dict:
            # the per-output checks below decide whether to keep watching
            keep_watching = False
        else:
            # nothing to sweep: watch only until the closing tx is deeply mined
            keep_watching = not self.adb.is_deeply_mined(closing_tx.txid())
        # create and broadcast transactions
        for prevout, sweep_info in sweep_info_dict.items():
            prev_txid, _ = prevout.split(':')
            name = sweep_info.name + ' ' + chan.get_id_for_log()
            self.lnworker.wallet.set_default_label(prevout, name)
            if not self.adb.get_transaction(prev_txid):
                # do not keep watching if prevout does not exist
                self.logger.info(f'prevout does not exist for {name}: {prevout}')
                continue
            was_added = self.maybe_redeem(sweep_info)
            spender_txid = self.adb.get_spender(prevout)
            spender_tx = self.adb.get_transaction(spender_txid) if spender_txid else None
            if spender_tx:
                # the spender might be the remote, revoked or not
                htlc_sweepinfo = chan.maybe_sweep_htlcs(closing_tx, spender_tx)
                for prevout2, htlc_sweep_info in htlc_sweepinfo.items():
                    htlc_was_added = self.maybe_redeem(htlc_sweep_info)
                    htlc_tx_spender = self.adb.get_spender(prevout2)
                    self.lnworker.wallet.set_default_label(prevout2, htlc_sweep_info.name)
                    if htlc_tx_spender:
                        keep_watching |= not self.adb.is_deeply_mined(htlc_tx_spender)
                        self.maybe_add_accounting_address(htlc_tx_spender, htlc_sweep_info)
                    else:
                        keep_watching |= htlc_was_added
                keep_watching |= not self.adb.is_deeply_mined(spender_txid)
                self.maybe_extract_preimage(chan, spender_tx, prevout)
                self.maybe_add_accounting_address(spender_txid, sweep_info)
            else:
                keep_watching |= was_added
            # NOTE(review): spender_txid may be None here; see the same
            # caveat about get_tx_height() in check_onchain_situation().
            self.maybe_add_pending_forceclose(chan, spender_txid, is_local_ctx, sweep_info, was_added)
        return keep_watching

    def get_pending_force_closes(self):
        """Return the (live, mutable) set of pending force-closed channels."""
        return self._pending_force_closes

    def maybe_redeem(self, sweep_info: 'SweepInfo') -> bool:
        """Hand *sweep_info* to the wallet's tx batcher.

        Returns False if it was dust (BelowDustLimit was raised), else True.
        """
        try:
            self.lnworker.wallet.txbatcher.add_sweep_input('lnwatcher', sweep_info, self.config.FEE_POLICY_LIGHTNING)
        except BelowDustLimit:
            return False
        return True

    def maybe_extract_preimage(self, chan: 'AbstractChannel', spender_tx: Transaction, prevout: str):
        """Let *chan* look for a preimage in the input of *spender_tx* that spends *prevout*.

        Skipped (with a log line) while *spender_tx* is not complete.
        """
        if not spender_tx.is_complete():
            self.logger.info('spender tx is unsigned')
            return
        txin_idx = spender_tx.get_input_idx_that_spent_prevout(TxOutpoint.from_str(prevout))
        # invariant: the caller obtained spender_tx as the spender of prevout
        assert txin_idx is not None
        spender_txin = spender_tx.inputs()[txin_idx]
        chan.extract_preimage_from_htlc_txin(
            spender_txin,
            is_deeply_mined=self.adb.is_deeply_mined(spender_tx.txid()),
        )

    def maybe_add_accounting_address(self, spender_txid: str, sweep_info: 'SweepInfo'):
        """Record the address of a swept output in wallet._accounting_addresses.

        The address is only recorded if the spending tx is known, one of its
        inputs spends the prevout of *sweep_info*, and -- except for
        'offered-htlc' / 'received-htlc' inputs -- the input's witness can be
        reproduced from our sweep txin with one of the witness elements.
        """
        spender_tx = self.adb.get_transaction(spender_txid) if spender_txid else None
        if not spender_tx:
            return
        # find the input that spends the output we wanted to sweep
        for txin in spender_tx.inputs():
            if txin.prevout == sweep_info.txin.prevout:
                break
        else:
            return
        # htlc inputs are always considered ours; for anything else compare witnesses
        if sweep_info.name not in ('offered-htlc', 'received-htlc'):
            for sig in txin.witness_elements():
                # fixme: verify sig is ours
                if txin.witness == sweep_info.txin.make_witness(sig):
                    break
            else:
                self.logger.info(f"signature not found {sweep_info.name}, {txin.prevout.to_str()}")
                return
        self.logger.info(f'adding txin address {sweep_info.name}, {txin.prevout.to_str()}')
        prev_txid, prev_index = txin.prevout.to_str().split(':')
        prev_tx = self.adb.get_transaction(prev_txid)
        if not prev_tx:
            # without the funding tx of this input we cannot look up its address
            self.logger.info(f'prev tx not found for {sweep_info.name}, {txin.prevout.to_str()}')
            return
        txout = prev_tx.outputs()[int(prev_index)]
        self.lnworker.wallet._accounting_addresses.add(txout.address)

    def maybe_add_pending_forceclose(self, chan, spender_txid, is_local_ctx, sweep_info, was_added):
        """Mark *chan* as a pending force close when we still wait on our ctx.

        we are waiting for ctx to be confirmed and there are received htlcs
        """
        if was_added and is_local_ctx and sweep_info.name == 'received-htlc' and chan.has_anchors():
            tx_mined_status = self.adb.get_tx_height(spender_txid)
            if tx_mined_status.height == TX_HEIGHT_LOCAL:
                self._pending_force_closes.add(chan)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc