• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitcoindevkit / bdk / 10348217438

12 Aug 2024 08:10AM CUT coverage: 81.794% (-0.02%) from 81.813%
10348217438

Pull #1535

github

web-flow
Merge 2c0bc45ec into 98c49592d
Pull Request #1535: test(electrum): Test sync in reorg and no-reorg situations

19 of 25 new or added lines in 1 file covered. (76.0%)

1 existing line in 1 file now uncovered.

10908 of 13336 relevant lines covered (81.79%)

12995.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.18
/crates/electrum/src/bdk_electrum_client.rs
1
use bdk_chain::{
2
    bitcoin::{block::Header, BlockHash, OutPoint, ScriptBuf, Transaction, Txid},
3
    collections::{BTreeMap, HashMap},
4
    local_chain::CheckPoint,
5
    spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult},
6
    tx_graph::TxGraph,
7
    Anchor, BlockId, ConfirmationBlockTime,
8
};
9
use electrum_client::{ElectrumApi, Error, HeaderNotification};
10
use std::{
11
    collections::BTreeSet,
12
    sync::{Arc, Mutex},
13
};
14

15
/// Number of most recent blocks requested from Electrum (ending at its tip) when building a checkpoint update; a suffix of this length is included for the purpose of robustness.
16
const CHAIN_SUFFIX_LENGTH: u32 = 8;
17

18
/// Wrapper around an [`electrum_client::ElectrumApi`] which keeps in-memory caches of the
19
/// transactions and block headers it has already downloaded, to avoid re-fetching them.
20
#[derive(Debug)]
21
pub struct BdkElectrumClient<E> {
22
    /// The wrapped [`electrum_client::ElectrumApi`] implementation; all requests go through it.
23
    pub inner: E,
24
    /// Transaction cache, keyed by txid. Guarded by a mutex so it can be updated through `&self`.
25
    tx_cache: Mutex<HashMap<Txid, Arc<Transaction>>>,
26
    /// Block header cache, keyed by block height. Guarded by a mutex so it can be updated through `&self`.
27
    block_header_cache: Mutex<HashMap<u32, Header>>,
28
}
29

30
impl<E: ElectrumApi> BdkElectrumClient<E> {
31
    /// Creates a new bdk client from a [`electrum_client::ElectrumApi`]
32
    pub fn new(client: E) -> Self {
4✔
33
        Self {
4✔
34
            inner: client,
4✔
35
            tx_cache: Default::default(),
4✔
36
            block_header_cache: Default::default(),
4✔
37
        }
4✔
38
    }
4✔
39

40
    /// Inserts transactions into the transaction cache so that the client will not fetch these
41
    /// transactions.
42
    pub fn populate_tx_cache<A>(&self, tx_graph: impl AsRef<TxGraph<A>>) {
×
43
        let txs = tx_graph
×
44
            .as_ref()
×
45
            .full_txs()
×
46
            .map(|tx_node| (tx_node.txid, tx_node.tx));
×
47

×
48
        let mut tx_cache = self.tx_cache.lock().unwrap();
×
49
        for (txid, tx) in txs {
×
50
            tx_cache.insert(txid, tx);
×
51
        }
×
52
    }
×
53

54
    /// Fetch transaction of given `txid`.
55
    ///
56
    /// If it hits the cache it will return the cached version and avoid making the request.
57
    pub fn fetch_tx(&self, txid: Txid) -> Result<Arc<Transaction>, Error> {
108✔
58
        let tx_cache = self.tx_cache.lock().unwrap();
108✔
59

60
        if let Some(tx) = tx_cache.get(&txid) {
108✔
61
            return Ok(Arc::clone(tx));
84✔
62
        }
24✔
63

24✔
64
        drop(tx_cache);
24✔
65

66
        let tx = Arc::new(self.inner.transaction_get(&txid)?);
24✔
67

68
        self.tx_cache.lock().unwrap().insert(txid, Arc::clone(&tx));
24✔
69

24✔
70
        Ok(tx)
24✔
71
    }
108✔
72

73
    /// Fetch block header of given `height`.
74
    ///
75
    /// If it hits the cache it will return the cached version and avoid making the request.
76
    fn fetch_header(&self, height: u32) -> Result<Header, Error> {
44✔
77
        let block_header_cache = self.block_header_cache.lock().unwrap();
44✔
78

79
        if let Some(header) = block_header_cache.get(&height) {
44✔
80
            return Ok(*header);
31✔
81
        }
13✔
82

13✔
83
        drop(block_header_cache);
13✔
84

13✔
85
        self.update_header(height)
13✔
86
    }
44✔
87

88
    /// Update a block header at given `height`. Returns the updated header.
89
    fn update_header(&self, height: u32) -> Result<Header, Error> {
13✔
90
        let header = self.inner.block_header(height as usize)?;
13✔
91

92
        self.block_header_cache
13✔
93
            .lock()
13✔
94
            .unwrap()
13✔
95
            .insert(height, header);
13✔
96

13✔
97
        Ok(header)
13✔
98
    }
13✔
99

100
    /// Broadcasts a transaction to the network.
101
    ///
102
    /// This is a re-export of [`ElectrumApi::transaction_broadcast`].
103
    pub fn transaction_broadcast(&self, tx: &Transaction) -> Result<Txid, Error> {
×
104
        self.inner.transaction_broadcast(tx)
×
105
    }
×
106

107
    /// Full scan the keychain scripts specified with the blockchain (via an Electrum client) and
108
    /// returns updates for [`bdk_chain`] data structures.
109
    ///
110
    /// - `request`: struct with data required to perform a spk-based blockchain client full scan,
111
    ///              see [`FullScanRequest`]
112
    /// - `stop_gap`: the full scan for each keychain stops after a gap of script pubkeys with no
113
    ///              associated transactions
114
    /// - `batch_size`: specifies the max number of script pubkeys to request for in a single batch
115
    ///              request
116
    /// - `fetch_prev_txouts`: specifies whether or not we want previous `TxOut`s for fee
117
    pub fn full_scan<K: Ord + Clone>(
18✔
118
        &self,
18✔
119
        request: FullScanRequest<K>,
18✔
120
        stop_gap: usize,
18✔
121
        batch_size: usize,
18✔
122
        fetch_prev_txouts: bool,
18✔
123
    ) -> Result<FullScanResult<K>, Error> {
18✔
124
        let (tip, latest_blocks) =
18✔
125
            fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?;
18✔
126
        let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
18✔
127
        let mut last_active_indices = BTreeMap::<K, u32>::new();
18✔
128

129
        for (keychain, spks) in request.spks_by_keychain {
36✔
130
            if let Some(last_active_index) =
17✔
131
                self.populate_with_spks(&mut graph_update, spks, stop_gap, batch_size)?
18✔
132
            {
17✔
133
                last_active_indices.insert(keychain, last_active_index);
17✔
134
            }
17✔
135
        }
136

137
        let chain_update = chain_update(tip, &latest_blocks, graph_update.all_anchors())?;
18✔
138

139
        // Fetch previous `TxOut`s for fee calculation if flag is enabled.
140
        if fetch_prev_txouts {
18✔
141
            self.fetch_prev_txout(&mut graph_update)?;
×
142
        }
18✔
143

144
        Ok(FullScanResult {
18✔
145
            graph_update,
18✔
146
            chain_update,
18✔
147
            last_active_indices,
18✔
148
        })
18✔
149
    }
18✔
150

151
    /// Sync a set of scripts with the blockchain (via an Electrum client) for the data specified
152
    /// and returns updates for [`bdk_chain`] data structures.
153
    ///
154
    /// - `request`: struct with data required to perform a spk-based blockchain client sync,
155
    ///              see [`SyncRequest`]
156
    /// - `batch_size`: specifies the max number of script pubkeys to request for in a single batch
157
    ///              request
158
    /// - `fetch_prev_txouts`: specifies whether or not we want previous `TxOut`s for fee
159
    ///              calculation
160
    ///
161
    /// If the scripts to sync are unknown, such as when restoring or importing a keychain that
162
    /// may include scripts that have been used, use [`full_scan`] with the keychain.
163
    ///
164
    /// [`full_scan`]: Self::full_scan
165
    pub fn sync(
14✔
166
        &self,
14✔
167
        request: SyncRequest,
14✔
168
        batch_size: usize,
14✔
169
        fetch_prev_txouts: bool,
14✔
170
    ) -> Result<SyncResult, Error> {
14✔
171
        let full_scan_req = FullScanRequest::from_chain_tip(request.chain_tip.clone())
14✔
172
            .set_spks_for_keychain((), request.spks.enumerate().map(|(i, spk)| (i as u32, spk)));
15✔
173
        let mut full_scan_res = self.full_scan(full_scan_req, usize::MAX, batch_size, false)?;
14✔
174
        let (tip, latest_blocks) =
14✔
175
            fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?;
14✔
176

177
        self.populate_with_txids(&mut full_scan_res.graph_update, request.txids)?;
14✔
178
        self.populate_with_outpoints(&mut full_scan_res.graph_update, request.outpoints)?;
14✔
179

180
        let chain_update = chain_update(
14✔
181
            tip,
14✔
182
            &latest_blocks,
14✔
183
            full_scan_res.graph_update.all_anchors(),
14✔
184
        )?;
14✔
185

186
        // Fetch previous `TxOut`s for fee calculation if flag is enabled.
187
        if fetch_prev_txouts {
14✔
188
            self.fetch_prev_txout(&mut full_scan_res.graph_update)?;
14✔
UNCOV
189
        }
×
190

191
        Ok(SyncResult {
14✔
192
            chain_update,
14✔
193
            graph_update: full_scan_res.graph_update,
14✔
194
        })
14✔
195
    }
14✔
196

197
    /// Populate the `graph_update` with transactions/anchors associated with the given `spks`.
198
    ///
199
    /// Transactions that contains an output with requested spk, or spends form an output with
200
    /// requested spk will be added to `graph_update`. Anchors of the aforementioned transactions are
201
    /// also included.
202
    fn populate_with_spks(
18✔
203
        &self,
18✔
204
        graph_update: &mut TxGraph<ConfirmationBlockTime>,
18✔
205
        mut spks: impl Iterator<Item = (u32, ScriptBuf)>,
18✔
206
        stop_gap: usize,
18✔
207
        batch_size: usize,
18✔
208
    ) -> Result<Option<u32>, Error> {
18✔
209
        let mut unused_spk_count = 0_usize;
18✔
210
        let mut last_active_index = Option::<u32>::None;
18✔
211

212
        loop {
60✔
213
            let spks = (0..batch_size)
60✔
214
                .map_while(|_| spks.next())
73✔
215
                .collect::<Vec<_>>();
60✔
216
            if spks.is_empty() {
60✔
217
                return Ok(last_active_index);
15✔
218
            }
45✔
219

220
            let spk_histories = self
45✔
221
                .inner
45✔
222
                .batch_script_get_history(spks.iter().map(|(_, s)| s.as_script()))?;
45✔
223

224
            for ((spk_index, _spk), spk_history) in spks.into_iter().zip(spk_histories) {
45✔
225
                if spk_history.is_empty() {
45✔
226
                    unused_spk_count = unused_spk_count.saturating_add(1);
26✔
227
                    if unused_spk_count >= stop_gap {
26✔
228
                        return Ok(last_active_index);
3✔
229
                    }
23✔
230
                    continue;
23✔
231
                } else {
19✔
232
                    last_active_index = Some(spk_index);
19✔
233
                    unused_spk_count = 0;
19✔
234
                }
19✔
235

236
                for tx_res in spk_history {
75✔
237
                    let _ = graph_update.insert_tx(self.fetch_tx(tx_res.tx_hash)?);
56✔
238
                    self.validate_merkle_for_anchor(graph_update, tx_res.tx_hash, tx_res.height)?;
56✔
239
                }
240
            }
241
        }
242
    }
18✔
243

244
    /// Populate the `graph_update` with associated transactions/anchors of `outpoints`.
245
    ///
246
    /// Transactions in which the outpoint resides, and transactions that spend from the outpoint are
247
    /// included. Anchors of the aforementioned transactions are included.
248
    fn populate_with_outpoints(
14✔
249
        &self,
14✔
250
        graph_update: &mut TxGraph<ConfirmationBlockTime>,
14✔
251
        outpoints: impl IntoIterator<Item = OutPoint>,
14✔
252
    ) -> Result<(), Error> {
14✔
253
        for outpoint in outpoints {
14✔
254
            let op_txid = outpoint.txid;
×
255
            let op_tx = self.fetch_tx(op_txid)?;
×
256
            let op_txout = match op_tx.output.get(outpoint.vout as usize) {
×
257
                Some(txout) => txout,
×
258
                None => continue,
×
259
            };
260
            debug_assert_eq!(op_tx.compute_txid(), op_txid);
×
261

262
            // attempt to find the following transactions (alongside their chain positions), and
263
            // add to our sparsechain `update`:
264
            let mut has_residing = false; // tx in which the outpoint resides
×
265
            let mut has_spending = false; // tx that spends the outpoint
×
266
            for res in self.inner.script_get_history(&op_txout.script_pubkey)? {
×
267
                if has_residing && has_spending {
×
268
                    break;
×
269
                }
×
270

×
271
                if !has_residing && res.tx_hash == op_txid {
×
272
                    has_residing = true;
×
273
                    let _ = graph_update.insert_tx(Arc::clone(&op_tx));
×
274
                    self.validate_merkle_for_anchor(graph_update, res.tx_hash, res.height)?;
×
275
                }
×
276

277
                if !has_spending && res.tx_hash != op_txid {
×
278
                    let res_tx = self.fetch_tx(res.tx_hash)?;
×
279
                    // we exclude txs/anchors that do not spend our specified outpoint(s)
280
                    has_spending = res_tx
×
281
                        .input
×
282
                        .iter()
×
283
                        .any(|txin| txin.previous_output == outpoint);
×
284
                    if !has_spending {
×
285
                        continue;
×
286
                    }
×
287
                    let _ = graph_update.insert_tx(Arc::clone(&res_tx));
×
288
                    self.validate_merkle_for_anchor(graph_update, res.tx_hash, res.height)?;
×
289
                }
×
290
            }
291
        }
292
        Ok(())
14✔
293
    }
14✔
294

295
    /// Populate the `graph_update` with transactions/anchors of the provided `txids`.
296
    fn populate_with_txids(
14✔
297
        &self,
14✔
298
        graph_update: &mut TxGraph<ConfirmationBlockTime>,
14✔
299
        txids: impl IntoIterator<Item = Txid>,
14✔
300
    ) -> Result<(), Error> {
14✔
301
        for txid in txids {
14✔
302
            let tx = match self.fetch_tx(txid) {
×
303
                Ok(tx) => tx,
×
304
                Err(electrum_client::Error::Protocol(_)) => continue,
×
305
                Err(other_err) => return Err(other_err),
×
306
            };
307

308
            let spk = tx
×
309
                .output
×
310
                .first()
×
311
                .map(|txo| &txo.script_pubkey)
×
312
                .expect("tx must have an output");
×
313

314
            // because of restrictions of the Electrum API, we have to use the `script_get_history`
315
            // call to get confirmation status of our transaction
316
            if let Some(r) = self
×
317
                .inner
×
318
                .script_get_history(spk)?
×
319
                .into_iter()
×
320
                .find(|r| r.tx_hash == txid)
×
321
            {
322
                self.validate_merkle_for_anchor(graph_update, txid, r.height)?;
×
323
            }
×
324

325
            let _ = graph_update.insert_tx(tx);
×
326
        }
327
        Ok(())
14✔
328
    }
14✔
329

330
    // Helper function which checks if a transaction is confirmed by validating the merkle proof.
331
    // An anchor is inserted if the transaction is validated to be in a confirmed block.
332
    fn validate_merkle_for_anchor(
56✔
333
        &self,
56✔
334
        graph_update: &mut TxGraph<ConfirmationBlockTime>,
56✔
335
        txid: Txid,
56✔
336
        confirmation_height: i32,
56✔
337
    ) -> Result<(), Error> {
56✔
338
        if let Ok(merkle_res) = self
56✔
339
            .inner
56✔
340
            .transaction_get_merkle(&txid, confirmation_height as usize)
56✔
341
        {
342
            let mut header = self.fetch_header(merkle_res.block_height as u32)?;
44✔
343
            let mut is_confirmed_tx = electrum_client::utils::validate_merkle_proof(
44✔
344
                &txid,
44✔
345
                &header.merkle_root,
44✔
346
                &merkle_res,
44✔
347
            );
44✔
348

44✔
349
            // Merkle validation will fail if the header in `block_header_cache` is outdated, so we
44✔
350
            // want to check if there is a new header and validate against the new one.
44✔
351
            if !is_confirmed_tx {
44✔
352
                header = self.update_header(merkle_res.block_height as u32)?;
×
353
                is_confirmed_tx = electrum_client::utils::validate_merkle_proof(
×
354
                    &txid,
×
355
                    &header.merkle_root,
×
356
                    &merkle_res,
×
357
                );
×
358
            }
44✔
359

360
            if is_confirmed_tx {
44✔
361
                let _ = graph_update.insert_anchor(
44✔
362
                    txid,
44✔
363
                    ConfirmationBlockTime {
44✔
364
                        confirmation_time: header.time as u64,
44✔
365
                        block_id: BlockId {
44✔
366
                            height: merkle_res.block_height as u32,
44✔
367
                            hash: header.block_hash(),
44✔
368
                        },
44✔
369
                    },
44✔
370
                );
44✔
371
            }
44✔
372
        }
12✔
373
        Ok(())
56✔
374
    }
56✔
375

376
    // Helper function which fetches the `TxOut`s of our relevant transactions' previous transactions,
377
    // which we do not have by default. This data is needed to calculate the transaction fee.
378
    fn fetch_prev_txout(
14✔
379
        &self,
14✔
380
        graph_update: &mut TxGraph<ConfirmationBlockTime>,
14✔
381
    ) -> Result<(), Error> {
14✔
382
        let full_txs: Vec<Arc<Transaction>> =
14✔
383
            graph_update.full_txs().map(|tx_node| tx_node.tx).collect();
52✔
384
        for tx in full_txs {
66✔
385
            for vin in &tx.input {
52✔
386
                let outpoint = vin.previous_output;
52✔
387
                let vout = outpoint.vout;
52✔
388
                let prev_tx = self.fetch_tx(outpoint.txid)?;
52✔
389
                let txout = prev_tx.output[vout as usize].clone();
52✔
390
                let _ = graph_update.insert_txout(outpoint, txout);
52✔
391
            }
392
        }
393
        Ok(())
14✔
394
    }
14✔
395
}
396

397
/// Return a [`CheckPoint`] of the latest tip, that connects with `prev_tip`. The latest blocks are
398
/// fetched to construct checkpoint updates with the proper [`BlockHash`] in case of re-org.
399
fn fetch_tip_and_latest_blocks(
32✔
400
    client: &impl ElectrumApi,
32✔
401
    prev_tip: CheckPoint,
32✔
402
) -> Result<(CheckPoint, BTreeMap<u32, BlockHash>), Error> {
32✔
403
    let HeaderNotification { height, .. } = client.block_headers_subscribe()?;
32✔
404
    let new_tip_height = height as u32;
32✔
405

32✔
406
    // If electrum returns a tip height that is lower than our previous tip, then checkpoints do
32✔
407
    // not need updating. We just return the previous tip and use that as the point of agreement.
32✔
408
    if new_tip_height < prev_tip.height() {
32✔
409
        return Ok((prev_tip, BTreeMap::new()));
×
410
    }
32✔
411

412
    // Atomically fetch the latest `CHAIN_SUFFIX_LENGTH` count of blocks from Electrum. We use this
413
    // to construct our checkpoint update.
414
    let mut new_blocks = {
32✔
415
        let start_height = new_tip_height.saturating_sub(CHAIN_SUFFIX_LENGTH - 1);
32✔
416
        let hashes = client
32✔
417
            .block_headers(start_height as _, CHAIN_SUFFIX_LENGTH as _)?
32✔
418
            .headers
419
            .into_iter()
32✔
420
            .map(|h| h.block_hash());
256✔
421
        (start_height..).zip(hashes).collect::<BTreeMap<u32, _>>()
32✔
422
    };
423

424
    // Find the "point of agreement" (if any).
425
    let agreement_cp = {
32✔
426
        let mut agreement_cp = Option::<CheckPoint>::None;
32✔
427
        for cp in prev_tip.iter() {
106✔
428
            let cp_block = cp.block_id();
106✔
429
            let hash = match new_blocks.get(&cp_block.height) {
106✔
430
                Some(&hash) => hash,
100✔
431
                None => {
432
                    assert!(
6✔
433
                        new_tip_height >= cp_block.height,
6✔
434
                        "already checked that electrum's tip cannot be smaller"
×
435
                    );
436
                    let hash = client.block_header(cp_block.height as _)?.block_hash();
6✔
437
                    new_blocks.insert(cp_block.height, hash);
6✔
438
                    hash
6✔
439
                }
440
            };
441
            if hash == cp_block.hash {
106✔
442
                agreement_cp = Some(cp);
32✔
443
                break;
32✔
444
            }
74✔
445
        }
446
        agreement_cp
32✔
447
    };
32✔
448

32✔
449
    let agreement_height = agreement_cp.as_ref().map(CheckPoint::height);
32✔
450

32✔
451
    let new_tip = new_blocks
32✔
452
        .iter()
32✔
453
        // Prune `new_blocks` to only include blocks that are actually new.
32✔
454
        .filter(|(height, _)| Some(*<&u32>::clone(height)) > agreement_height)
262✔
455
        .map(|(height, hash)| BlockId {
112✔
456
            height: *height,
112✔
457
            hash: *hash,
112✔
458
        })
112✔
459
        .fold(agreement_cp, |prev_cp, block| {
112✔
460
            Some(match prev_cp {
112✔
461
                Some(cp) => cp.push(block).expect("must extend checkpoint"),
112✔
462
                None => CheckPoint::new(block),
×
463
            })
464
        })
112✔
465
        .expect("must have at least one checkpoint");
32✔
466

32✔
467
    Ok((new_tip, new_blocks))
32✔
468
}
32✔
469

470
// Add a corresponding checkpoint per anchor height if it does not yet exist. Checkpoints should not
471
// surpass `latest_blocks`.
472
fn chain_update<A: Anchor>(
32✔
473
    mut tip: CheckPoint,
32✔
474
    latest_blocks: &BTreeMap<u32, BlockHash>,
32✔
475
    anchors: &BTreeSet<(A, Txid)>,
32✔
476
) -> Result<CheckPoint, Error> {
32✔
477
    for anchor in anchors {
116✔
478
        let height = anchor.0.anchor_block().height;
84✔
479

84✔
480
        // Checkpoint uses the `BlockHash` from `latest_blocks` so that the hash will be consistent
84✔
481
        // in case of a re-org.
84✔
482
        if tip.get(height).is_none() && height <= tip.height() {
84✔
483
            let hash = match latest_blocks.get(&height) {
×
484
                Some(&hash) => hash,
×
485
                None => anchor.0.anchor_block().hash,
×
486
            };
487
            tip = tip.insert(BlockId { hash, height });
×
488
        }
84✔
489
    }
490
    Ok(tip)
32✔
491
}
32✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc