• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6728725317812224

18 Dec 2025 12:54AM UTC coverage: 62.395% (-0.02%) from 62.417%
6728725317812224

push

CirrusCI

web-flow
Merge pull request #10318 from SomberNight/202511_lnsweep_keepwatching

lnsweep: lnwatcher needs to keep_waiting for pending hold_invoice

12 of 37 new or added lines in 3 files covered. (32.43%)

3 existing lines in 3 files now uncovered.

23742 of 38051 relevant lines covered (62.4%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

36.56
/electrum/lnwatcher.py
1
# Copyright (C) 2018 The Electrum developers
2
# Distributed under the MIT software license, see the accompanying
3
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
4

5
from typing import TYPE_CHECKING, Optional, Dict, Callable, Awaitable
1✔
6

7
from . import util
1✔
8
from .util import TxMinedInfo, BelowDustLimit, NoDynamicFeeEstimates
1✔
9
from .util import EventListener, event_listener, log_exceptions, ignore_exceptions
1✔
10
from .transaction import Transaction, TxOutpoint
1✔
11
from .logging import Logger
1✔
12
from .address_synchronizer import TX_HEIGHT_LOCAL
1✔
13
from .lnutil import REDEEM_AFTER_DOUBLE_SPENT_DELAY
1✔
14
from .lnsweep import KeepWatchingTXO, SweepInfo
1✔
15

16
if TYPE_CHECKING:
17
    from .network import Network
18
    from .lnworker import LNWallet
19
    from .lnchannel import AbstractChannel
20

21

22
class LNWatcher(Logger, EventListener):
1✔
23

24
    def __init__(self, lnworker: 'LNWallet'):
1✔
25
        self.lnworker = lnworker
1✔
26
        Logger.__init__(self)
1✔
27
        self.adb = lnworker.wallet.adb
1✔
28
        self.config = lnworker.config
1✔
29
        self.callbacks = {}  # type: Dict[str, Callable[[], Awaitable[None]]]  # address -> lambda function
1✔
30
        self.network = None
1✔
31
        self.register_callbacks()
1✔
32
        self._pending_force_closes = set()
1✔
33

34
    def start_network(self, network: 'Network'):
1✔
35
        self.network = network
1✔
36

37
    def stop(self):
1✔
38
        self.unregister_callbacks()
×
39

40
    def remove_callback(self, address: str) -> None:
1✔
41
        self.callbacks.pop(address, None)
×
42

43
    def add_callback(
1✔
44
        self,
45
        address: str,
46
        callback: Callable[[], Awaitable[None]],
47
        *,
48
        subscribe: bool = True,
49
    ) -> None:
50
        if subscribe:
1✔
51
            self.adb.add_address(address)
1✔
52
        self.callbacks[address] = callback
1✔
53

54
    async def trigger_callbacks(self, *, requires_synchronizer: bool = True):
1✔
55
        if requires_synchronizer and not self.adb.synchronizer:
1✔
56
            self.logger.info("synchronizer not set yet")
1✔
57
            return
1✔
58
        for address, callback in list(self.callbacks.items()):
1✔
59
            try:
×
60
                await callback()
×
61
            except Exception:
×
62
                self.logger.exception(f"LNWatcher callback failed {address=}")
×
63
        # send callback to GUI
64
        util.trigger_callback('wallet_updated', self.lnworker.wallet)
1✔
65

66
    @event_listener
1✔
67
    async def on_event_blockchain_updated(self, *args):
1✔
68
        await self.trigger_callbacks()
1✔
69

70
    @event_listener
1✔
71
    async def on_event_adb_added_tx(self, adb, tx_hash, tx):
1✔
72
        # called if we add local tx
73
        if adb != self.adb:
1✔
74
            return
1✔
75
        await self.trigger_callbacks()
1✔
76

77
    @event_listener
1✔
78
    async def on_event_adb_added_verified_tx(self, adb, tx_hash):
1✔
79
        if adb != self.adb:
1✔
80
            return
1✔
81
        await self.trigger_callbacks()
1✔
82

83
    @event_listener
1✔
84
    async def on_event_adb_set_up_to_date(self, adb):
1✔
85
        if adb != self.adb:
1✔
86
            return
1✔
87
        await self.trigger_callbacks()
1✔
88

89
    def add_channel(self, chan: 'AbstractChannel') -> None:
1✔
90
        outpoint = chan.funding_outpoint.to_str()
1✔
91
        address = chan.get_funding_address()
1✔
92
        callback = lambda: self.check_onchain_situation(address, outpoint)
1✔
93
        self.add_callback(address, callback, subscribe=chan.need_to_subscribe())
1✔
94

95
    @ignore_exceptions
1✔
96
    @log_exceptions
1✔
97
    async def check_onchain_situation(self, address: str, funding_outpoint: str) -> None:
1✔
98
        # early return if address has not been added yet
99
        if not self.adb.is_mine(address):
×
100
            return
×
101
        # inspect_tx_candidate might have added new addresses, in which case we return early
102
        # note: maybe we should wait until adb.is_up_to_date... (?)
103
        funding_txid = funding_outpoint.split(':')[0]
×
104
        funding_height = self.adb.get_tx_height(funding_txid)
×
105
        closing_txid = self.adb.get_spender(funding_outpoint)
×
106
        closing_height = self.adb.get_tx_height(closing_txid)
×
107
        if closing_txid:
×
108
            closing_tx = self.adb.get_transaction(closing_txid)
×
109
            if closing_tx:
×
110
                keep_watching = await self.sweep_commitment_transaction(funding_outpoint, closing_tx)
×
111
                if not keep_watching:
×
112
                    self.remove_callback(address)
×
113
            else:
114
                self.logger.info(f"channel {funding_outpoint} closed by {closing_txid}. still waiting for tx itself...")
×
115
                keep_watching = True
×
116
        else:
117
            keep_watching = True
×
118
        await self.update_channel_state(
×
119
            funding_outpoint=funding_outpoint,
120
            funding_txid=funding_txid,
121
            funding_height=funding_height,
122
            closing_txid=closing_txid,
123
            closing_height=closing_height,
124
            keep_watching=keep_watching)
125

126
    def diagnostic_name(self):
1✔
127
        return f"{self.lnworker.wallet.diagnostic_name()}-LNW"
1✔
128

129
    async def update_channel_state(
1✔
130
            self, *, funding_outpoint: str, funding_txid: str,
131
            funding_height: TxMinedInfo, closing_txid: str,
132
            closing_height: TxMinedInfo, keep_watching: bool) -> None:
133
        chan = self.lnworker.channel_by_txo(funding_outpoint)
×
134
        if not chan:
×
135
            return
×
136
        chan.update_onchain_state(
×
137
            funding_txid=funding_txid,
138
            funding_height=funding_height,
139
            closing_txid=closing_txid,
140
            closing_height=closing_height,
141
            keep_watching=keep_watching)
142
        if closing_height.conf > 0:
×
143
            self._pending_force_closes.discard(chan)
×
144
        await self.lnworker.handle_onchain_state(chan)
×
145

146
    async def sweep_commitment_transaction(self, funding_outpoint: str, closing_tx: Transaction) -> bool:
1✔
147
        """This function is called when a channel was closed. In this case
148
        we need to check for redeemable outputs of the commitment transaction
149
        or spenders down the line (HTLC-timeout/success transactions).
150

151
        Returns whether we should continue to monitor.
152

153
        Side-effects:
154
          - sets defaults labels
155
          - populates wallet._accounting_addresses
156
        """
157
        assert closing_tx
×
158
        chan = self.lnworker.channel_by_txo(funding_outpoint)
×
159
        if not chan:
×
160
            return False
×
NEW
161
        local_height = self.adb.get_local_height()
×
162
        # detect who closed and get information about how to claim outputs
163
        is_local_ctx, sweep_info_dict = chan.get_ctx_sweep_info(closing_tx)
×
164
        # note: we need to keep watching *at least* until the closing tx is deeply mined,
165
        #       possibly longer if there are TXOs to sweep
166
        keep_watching = not self.adb.is_deeply_mined(closing_tx.txid())
×
167
        # create and broadcast transactions
168
        for prevout, sweep_info in sweep_info_dict.items():
×
169
            prev_txid, prev_index = prevout.split(':')
×
170
            name = sweep_info.name + ' ' + chan.get_id_for_log()
×
171
            self.lnworker.wallet.set_default_label(prevout, name)
×
NEW
172
            if isinstance(sweep_info, KeepWatchingTXO):  # haven't yet decided if we want to sweep
×
NEW
173
                keep_watching |= sweep_info.until_height > local_height
×
NEW
174
                continue
×
NEW
175
            assert isinstance(sweep_info, SweepInfo), sweep_info
×
UNCOV
176
            if not self.adb.get_transaction(prev_txid):
×
177
                # do not keep watching if prevout does not exist
178
                self.logger.info(f'prevout does not exist for {name}: {prevout}')
×
179
                continue
×
180
            watch_sweep_info = self.maybe_redeem(sweep_info)
×
181
            spender_txid = self.adb.get_spender(prevout)  # note: LOCAL spenders don't count
×
182
            spender_tx = self.adb.get_transaction(spender_txid) if spender_txid else None
×
183
            if spender_tx:
×
184
                # the spender might be the remote, revoked or not
185
                htlc_sweepinfo = chan.maybe_sweep_htlcs(closing_tx, spender_tx)
×
186
                for prevout2, htlc_sweep_info in htlc_sweepinfo.items():
×
NEW
187
                    self.lnworker.wallet.set_default_label(prevout2, htlc_sweep_info.name)
×
NEW
188
                    if isinstance(htlc_sweep_info, KeepWatchingTXO):  # haven't yet decided if we want to sweep
×
NEW
189
                        keep_watching |= htlc_sweep_info.until_height > local_height
×
NEW
190
                        continue
×
NEW
191
                    assert isinstance(htlc_sweep_info, SweepInfo), htlc_sweep_info
×
192
                    watch_htlc_sweep_info = self.maybe_redeem(htlc_sweep_info)
×
193
                    htlc_tx_spender = self.adb.get_spender(prevout2)
×
194
                    if htlc_tx_spender:
×
195
                        keep_watching |= not self.adb.is_deeply_mined(htlc_tx_spender)
×
196
                        self.maybe_add_accounting_address(htlc_tx_spender, htlc_sweep_info)
×
197
                    else:
198
                        keep_watching |= watch_htlc_sweep_info
×
199
                keep_watching |= not self.adb.is_deeply_mined(spender_txid)
×
200
                self.maybe_extract_preimage(chan, spender_tx, prevout)
×
201
                self.maybe_add_accounting_address(spender_txid, sweep_info)
×
202
            else:
203
                keep_watching |= watch_sweep_info
×
204
            self.maybe_add_pending_forceclose(
×
205
                chan=chan,
206
                spender_txid=spender_txid,
207
                is_local_ctx=is_local_ctx,
208
                sweep_info=sweep_info,
209
            )
210
        return keep_watching
×
211

212
    def get_pending_force_closes(self):
1✔
213
        return self._pending_force_closes
×
214

215
    def maybe_redeem(self, sweep_info: 'SweepInfo') -> bool:
1✔
216
        """ returns 'keep_watching' """
217
        try:
×
218
            self.lnworker.wallet.txbatcher.add_sweep_input('lnwatcher', sweep_info)
×
219
        except BelowDustLimit:
×
220
            self.logger.debug(f"maybe_redeem: BelowDustLimit: {sweep_info.name}")
×
221
            # utxo is considered dust at *current* fee estimates.
222
            # but maybe the fees atm are very high? We will retry later.
223
            pass
×
224
        except NoDynamicFeeEstimates:
×
225
            self.logger.debug(f"maybe_redeem: NoDynamicFeeEstimates: {sweep_info.name}")
×
226
            pass  # will retry later
×
227
        if sweep_info.is_anchor():
×
228
            return False
×
229
        return True
×
230

231
    def maybe_extract_preimage(self, chan: 'AbstractChannel', spender_tx: Transaction, prevout: str):
1✔
232
        if not spender_tx.is_complete():
×
233
            self.logger.info('spender tx is unsigned')
×
234
            return
×
235
        txin_idx = spender_tx.get_input_idx_that_spent_prevout(TxOutpoint.from_str(prevout))
×
236
        assert txin_idx is not None
×
237
        spender_txin = spender_tx.inputs()[txin_idx]
×
238
        chan.extract_preimage_from_htlc_txin(
×
239
            spender_txin,
240
            is_deeply_mined=self.adb.is_deeply_mined(spender_tx.txid()),
241
        )
242

243
    def maybe_add_accounting_address(self, spender_txid: str, sweep_info: 'SweepInfo'):
1✔
244
        spender_tx = self.adb.get_transaction(spender_txid) if spender_txid else None
×
245
        if not spender_tx:
×
246
            return
×
247
        for i, txin in enumerate(spender_tx.inputs()):
×
248
            if txin.prevout == sweep_info.txin.prevout:
×
249
                break
×
250
        else:
251
            return
×
252
        if sweep_info.name in ['offered-htlc', 'received-htlc']:
×
253
            # always consider ours
254
            pass
×
255
        else:
256
            witness = txin.witness_elements()
×
257
            for sig in witness:
×
258
                # fixme: verify sig is ours
259
                witness2 = sweep_info.txin.make_witness(sig)
×
260
                if txin.witness == witness2:
×
261
                    break
×
262
            else:
263
                self.logger.info(f"signature not found {sweep_info.name}, {txin.prevout.to_str()}")
×
264
                return
×
265
        self.logger.info(f'adding txin address {sweep_info.name}, {txin.prevout.to_str()}')
×
266
        prev_txid, prev_index = txin.prevout.to_str().split(':')
×
267
        prev_tx = self.adb.get_transaction(prev_txid)
×
268
        txout = prev_tx.outputs()[int(prev_index)]
×
269
        self.lnworker.wallet._accounting_addresses.add(txout.address)
×
270

271
    def maybe_add_pending_forceclose(
1✔
272
        self,
273
        *,
274
        chan: 'AbstractChannel',
275
        spender_txid: Optional[str],
276
        is_local_ctx: bool,
277
        sweep_info: 'SweepInfo',
278
    ) -> None:
279
        """Adds chan into set of ongoing force-closures if the user should keep the wallet open, waiting for it.
280
        (we are waiting for ctx to be confirmed and there are received htlcs)
281
        """
282
        if is_local_ctx and sweep_info.name == 'received-htlc':
×
283
            cltv = sweep_info.cltv_abs
×
284
            assert cltv is not None, f"missing cltv for {sweep_info}"
×
285
            if self.adb.get_local_height() > cltv + REDEEM_AFTER_DOUBLE_SPENT_DELAY:
×
286
                # We had plenty of time to sweep. The remote also had time to time out the htlc.
287
                # Maybe its value has been ~dust at current and past fee levels (every time we checked).
288
                # We should not keep warning the user forever.
289
                return
×
290
            tx_mined_status = self.adb.get_tx_height(spender_txid)
×
291
            if tx_mined_status.height() == TX_HEIGHT_LOCAL:
×
292
                self._pending_force_closes.add(chan)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc