• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitcoindevkit / bdk / 5487935819

pending completion
5487935819

Pull #1002

github

web-flow
Merge 084a72be6 into 81c761339
Pull Request #1002: Implement linked-list `LocalChain` and add rpc-chain module/example

761 of 761 new or added lines in 8 files covered. (100.0%)

7165 of 9315 relevant lines covered (76.92%)

5520.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/crates/esplora/src/blocking_ext.rs
1
use std::thread::JoinHandle;
2

3
use bdk_chain::bitcoin::{OutPoint, Txid};
4
use bdk_chain::collections::btree_map;
5
use bdk_chain::collections::BTreeMap;
6
use bdk_chain::{
7
    bitcoin::{BlockHash, Script},
8
    local_chain::CheckPoint,
9
};
10
use bdk_chain::{BlockId, ConfirmationTimeAnchor, TxGraph};
11
use esplora_client::{Error, TxStatus};
12

13
// Depth (in blocks below the previous tip) at or beyond which `update_local_chain` reuses the
// locally known block hash of a checkpoint instead of re-fetching it from Esplora.
const ASSUME_FINAL_DEPTH: u32 = 15;
14

15
/// Trait to extend the functionality of [`esplora_client::BlockingClient`].
16
///
17
/// Refer to [crate-level documentation] for more.
18
///
19
/// [crate-level documentation]: crate
20
pub trait EsploraExt {
21
    /// Prepare an [`LocalChain`] update with blocks fetched from Esplora.
22
    ///
23
    /// * `prev_tip` is the previous tip of [`LocalChain::tip`].
24
    /// * `get_heights` is the block heights that we are interested in fetching from Esplora.
25
    ///
26
    /// The result of this method can be applied to [`LocalChain::update`].
27
    ///
28
    /// [`LocalChain`]: bdk_chain::local_chain::LocalChain
29
    /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
30
    /// [`LocalChain::update`]: bdk_chain::local_chain::LocalChain::update
31
    #[allow(clippy::result_large_err)]
32
    fn update_local_chain(
33
        &self,
34
        prev_tip: Option<CheckPoint>,
35
        get_heights: impl IntoIterator<Item = u32>,
36
    ) -> Result<CheckPoint, Error>;
37

38
    /// Scan Esplora for the data specified and return a [`TxGraph`] and a map of last active
39
    /// indices.
40
    ///
41
    /// * `keychain_spks`: keychains that we want to scan transactions for
42
    /// * `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
43
    /// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
44
    ///     want to include in the update
45
    ///
46
    /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
47
    /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
48
    /// parallel.
49
    #[allow(clippy::result_large_err)]
50
    fn update_tx_graph<K: Ord + Clone>(
51
        &self,
52
        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
53
        txids: impl IntoIterator<Item = Txid>,
54
        outpoints: impl IntoIterator<Item = OutPoint>,
55
        stop_gap: usize,
56
        parallel_requests: usize,
57
    ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error>;
58

59
    /// Convenience method to call [`update_tx_graph`] without requiring a keychain.
60
    ///
61
    /// [`update_tx_graph`]: EsploraExt::update_tx_graph
62
    #[allow(clippy::result_large_err)]
63
    fn update_tx_graph_without_keychain(
×
64
        &self,
×
65
        misc_spks: impl IntoIterator<Item = Script>,
×
66
        txids: impl IntoIterator<Item = Txid>,
×
67
        outpoints: impl IntoIterator<Item = OutPoint>,
×
68
        stop_gap: usize,
×
69
        parallel_requests: usize,
×
70
    ) -> Result<TxGraph<ConfirmationTimeAnchor>, Error> {
×
71
        self.update_tx_graph(
×
72
            [(
×
73
                (),
×
74
                misc_spks
×
75
                    .into_iter()
×
76
                    .enumerate()
×
77
                    .map(|(i, spk)| (i as u32, spk)),
×
78
            )]
×
79
            .into(),
×
80
            txids,
×
81
            outpoints,
×
82
            stop_gap,
×
83
            parallel_requests,
×
84
        )
×
85
        .map(|(g, _)| g)
×
86
    }
×
87
}
88

89
impl EsploraExt for esplora_client::BlockingClient {
90
    fn update_local_chain(
×
91
        &self,
×
92
        prev_tip: Option<CheckPoint>,
×
93
        get_heights: impl IntoIterator<Item = u32>,
×
94
    ) -> Result<CheckPoint, Error> {
×
95
        let new_tip_height = self.get_height()?;
×
96

97
        // If esplora returns a tip height that is lower than our previous tip, then checkpoints do
98
        // not need updating. We just return the previous tip and use that as the point of
99
        // agreement.
100
        if let Some(prev_tip) = prev_tip.as_ref() {
×
101
            if new_tip_height < prev_tip.height() {
×
102
                return Ok(prev_tip.clone());
×
103
            }
×
104
        }
×
105

106
        // Fetch new block IDs that are to be included in the update. This includes:
107
        // 1. Atomically fetched most-recent blocks so we have a consistent view even during reorgs.
108
        // 2. Heights the caller is interested in (as specified in `get_heights`).
109
        let mut new_blocks = {
×
110
            let heights = (0..new_tip_height).rev();
×
111
            let hashes = self
×
112
                .get_blocks(Some(new_tip_height))?
×
113
                .into_iter()
×
114
                .map(|b| b.id);
×
115

×
116
            let mut new_blocks = heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>();
×
117

118
            for height in get_heights {
×
119
                // do not fetch blocks higher than known tip
120
                if height > new_tip_height {
×
121
                    continue;
×
122
                }
×
123
                if let btree_map::Entry::Vacant(entry) = new_blocks.entry(height) {
×
124
                    let hash = self.get_block_hash(height)?;
×
125
                    entry.insert(hash);
×
126
                }
×
127
            }
128

129
            new_blocks
×
130
        };
131

132
        // Determine the checkpoint to start building our update tip from.
133
        let first_cp = match prev_tip {
×
134
            Some(old_tip) => {
×
135
                let old_tip_height = old_tip.height();
×
136
                let mut earliest_agreement_cp = Option::<CheckPoint>::None;
×
137

138
                for old_cp in old_tip.iter() {
×
139
                    let old_block = old_cp.block_id();
×
140

141
                    let new_hash = match new_blocks.entry(old_block.height) {
×
142
                        btree_map::Entry::Vacant(entry) => *entry.insert(
×
143
                            if old_tip_height - old_block.height >= ASSUME_FINAL_DEPTH {
×
144
                                old_block.hash
×
145
                            } else {
146
                                self.get_block_hash(old_block.height)?
×
147
                            },
148
                        ),
149
                        btree_map::Entry::Occupied(entry) => *entry.get(),
×
150
                    };
151

152
                    // Since we may introduce blocks below the point of agreement, we cannot break
153
                    // here unconditionally. We only break if we guarantee there are no new heights
154
                    // below our current.
155
                    if old_block.hash == new_hash {
×
156
                        earliest_agreement_cp = Some(old_cp);
×
157

×
158
                        let first_new_height = *new_blocks
×
159
                            .keys()
×
160
                            .next()
×
161
                            .expect("must have atleast one new block");
×
162
                        if first_new_height <= old_block.height {
×
163
                            break;
×
164
                        }
×
165
                    }
×
166
                }
167

168
                earliest_agreement_cp
×
169
            }
170
            None => None,
×
171
        }
172
        .unwrap_or_else(|| {
×
173
            let (&height, &hash) = new_blocks
×
174
                .iter()
×
175
                .next()
×
176
                .expect("must have atleast one new block");
×
177
            CheckPoint::new(BlockId { height, hash })
×
178
        });
×
179

×
180
        let new_tip = new_blocks
×
181
            .split_off(&(first_cp.height() + 1))
×
182
            .into_iter()
×
183
            .map(|(height, hash)| BlockId { height, hash })
×
184
            .fold(first_cp, |prev_cp, block| {
×
185
                prev_cp.push(block).expect("must extend checkpoint")
×
186
            });
×
187

×
188
        Ok(new_tip)
×
189
    }
×
190

191
    fn update_tx_graph<K: Ord + Clone>(
×
192
        &self,
×
193
        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
×
194
        txids: impl IntoIterator<Item = Txid>,
×
195
        outpoints: impl IntoIterator<Item = OutPoint>,
×
196
        stop_gap: usize,
×
197
        parallel_requests: usize,
×
198
    ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error> {
×
199
        type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
×
200
        let parallel_requests = Ord::max(parallel_requests, 1);
×
201
        let mut graph = TxGraph::<ConfirmationTimeAnchor>::default();
×
202
        let mut last_active_indexes = BTreeMap::<K, u32>::new();
×
203

204
        for (keychain, spks) in keychain_spks {
×
205
            let mut spks = spks.into_iter();
×
206
            let mut last_index = Option::<u32>::None;
×
207
            let mut last_active_index = Option::<u32>::None;
×
208

209
            loop {
×
210
                let handles = spks
×
211
                    .by_ref()
×
212
                    .take(parallel_requests)
×
213
                    .map(|(spk_index, spk)| {
×
214
                        std::thread::spawn({
×
215
                            let client = self.clone();
×
216
                            move || -> Result<TxsOfSpkIndex, Error> {
×
217
                                let mut last_seen = None;
×
218
                                let mut spk_txs = Vec::new();
×
219
                                loop {
220
                                    let txs = client.scripthash_txs(&spk, last_seen)?;
×
221
                                    let tx_count = txs.len();
×
222
                                    last_seen = txs.last().map(|tx| tx.txid);
×
223
                                    spk_txs.extend(txs);
×
224
                                    if tx_count < 25 {
×
225
                                        break Ok((spk_index, spk_txs));
×
226
                                    }
×
227
                                }
228
                            }
×
229
                        })
×
230
                    })
×
231
                    .collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
×
232

×
233
                if handles.is_empty() {
×
234
                    break;
×
235
                }
×
236

237
                for handle in handles {
×
238
                    let (index, txs) = handle.join().expect("thread must not panic")?;
×
239
                    last_index = Some(index);
×
240
                    if !txs.is_empty() {
×
241
                        last_active_index = Some(index);
×
242
                    }
×
243
                    for tx in txs {
×
244
                        let _ = graph.insert_tx(tx.to_tx());
×
245
                        if let Some(anchor) = anchor_from_status(&tx.status) {
×
246
                            let _ = graph.insert_anchor(tx.txid, anchor);
×
247
                        }
×
248
                    }
249
                }
250

251
                if last_index > last_active_index.map(|i| i + stop_gap as u32) {
×
252
                    break;
×
253
                }
×
254
            }
255

256
            if let Some(last_active_index) = last_active_index {
×
257
                last_active_indexes.insert(keychain, last_active_index);
×
258
            }
×
259
        }
260

261
        let mut txids = txids.into_iter();
×
262
        loop {
×
263
            let handles = txids
×
264
                .by_ref()
×
265
                .take(parallel_requests)
×
266
                .filter(|&txid| graph.get_tx(txid).is_none())
×
267
                .map(|txid| {
×
268
                    std::thread::spawn({
×
269
                        let client = self.clone();
×
270
                        move || client.get_tx_status(&txid).map(|s| (txid, s))
×
271
                    })
×
272
                })
×
273
                .collect::<Vec<JoinHandle<Result<(Txid, TxStatus), Error>>>>();
×
274

×
275
            if handles.is_empty() {
×
276
                break;
×
277
            }
×
278

279
            for handle in handles {
×
280
                let (txid, status) = handle.join().expect("thread must not panic")?;
×
281
                if let Some(anchor) = anchor_from_status(&status) {
×
282
                    let _ = graph.insert_anchor(txid, anchor);
×
283
                }
×
284
            }
285
        }
286

287
        for op in outpoints.into_iter() {
×
288
            if graph.get_tx(op.txid).is_none() {
×
289
                if let Some(tx) = self.get_tx(&op.txid)? {
×
290
                    let _ = graph.insert_tx(tx);
×
291
                }
×
292
                let status = self.get_tx_status(&op.txid)?;
×
293
                if let Some(anchor) = anchor_from_status(&status) {
×
294
                    let _ = graph.insert_anchor(op.txid, anchor);
×
295
                }
×
296
            }
×
297

298
            if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _)? {
×
299
                if let Some(txid) = op_status.txid {
×
300
                    if graph.get_tx(txid).is_none() {
×
301
                        if let Some(tx) = self.get_tx(&txid)? {
×
302
                            let _ = graph.insert_tx(tx);
×
303
                        }
×
304
                        let status = self.get_tx_status(&txid)?;
×
305
                        if let Some(anchor) = anchor_from_status(&status) {
×
306
                            let _ = graph.insert_anchor(txid, anchor);
×
307
                        }
×
308
                    }
×
309
                }
×
310
            }
×
311
        }
312

313
        Ok((graph, last_active_indexes))
×
314
    }
×
315
}
316

317
fn anchor_from_status(status: &TxStatus) -> Option<ConfirmationTimeAnchor> {
318
    if let TxStatus {
319
        block_height: Some(height),
×
320
        block_hash: Some(hash),
×
321
        block_time: Some(time),
×
322
        ..
323
    } = status.clone()
×
324
    {
325
        Some(ConfirmationTimeAnchor {
×
326
            anchor_block: BlockId { height, hash },
×
327
            confirmation_height: height,
×
328
            confirmation_time: time,
×
329
        })
×
330
    } else {
331
        None
×
332
    }
333
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc