• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitcoindevkit / bdk / 5640280513

pending completion
5640280513

Pull #1034

github

web-flow
Merge 1cef2b574 into f4d2a7666
Pull Request #1034: Implement linked-list `LocalChain` and update chain-src crates/examples

681 of 681 new or added lines in 6 files covered. (100.0%)

6175 of 8004 relevant lines covered (77.15%)

6398.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/crates/esplora/src/async_ext.rs
1
use async_trait::async_trait;
2
use bdk_chain::collections::btree_map;
3
use bdk_chain::local_chain;
4
use bdk_chain::{
5
    bitcoin::{BlockHash, OutPoint, Script, Txid},
6
    collections::BTreeMap,
7
    local_chain::CheckPoint,
8
    BlockId, ConfirmationTimeAnchor, TxGraph,
9
};
10
use esplora_client::{Error, TxStatus};
11
use futures::{stream::FuturesOrdered, TryStreamExt};
12

13
use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
14

15
/// Trait to extend the functionality of [`esplora_client::AsyncClient`].
16
///
17
/// Refer to [crate-level documentation] for more.
18
///
19
/// [crate-level documentation]: crate
20
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
21
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
22
pub trait EsploraAsyncExt {
23
    /// Prepare an [`LocalChain`] update with blocks fetched from Esplora.
24
    ///
25
    /// * `prev_tip` is the previous tip of [`LocalChain::tip`].
26
    /// * `get_heights` is the block heights that we are interested in fetching from Esplora.
27
    ///
28
    /// The result of this method can be applied to [`LocalChain::apply_update`].
29
    ///
30
    /// [`LocalChain`]: bdk_chain::local_chain::LocalChain
31
    /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
32
    /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
33
    #[allow(clippy::result_large_err)]
34
    async fn update_local_chain(
35
        &self,
36
        prev_tip: Option<CheckPoint>,
37
        get_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
38
    ) -> Result<local_chain::Update, Error>;
39

40
    /// Scan Esplora for the data specified and return a [`TxGraph`] and a map of last active
41
    /// indices.
42
    ///
43
    /// * `keychain_spks`: keychains that we want to scan transactions for
44
    /// * `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
45
    /// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
46
    ///     want to include in the update
47
    ///
48
    /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
49
    /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
50
    /// parallel.
51
    #[allow(clippy::result_large_err)]
52
    async fn update_tx_graph<K: Ord + Clone + Send>(
53
        &self,
54
        keychain_spks: BTreeMap<
55
            K,
56
            impl IntoIterator<IntoIter = impl Iterator<Item = (u32, Script)> + Send> + Send,
57
        >,
58
        txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
59
        outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
60
        stop_gap: usize,
61
        parallel_requests: usize,
62
    ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error>;
63

64
    /// Convenience method to call [`update_tx_graph`] without requiring a keychain.
65
    ///
66
    /// [`update_tx_graph`]: EsploraAsyncExt::update_tx_graph
67
    #[allow(clippy::result_large_err)]
68
    async fn update_tx_graph_without_keychain(
×
69
        &self,
×
70
        misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = Script> + Send> + Send,
×
71
        txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
×
72
        outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
×
73
        parallel_requests: usize,
×
74
    ) -> Result<TxGraph<ConfirmationTimeAnchor>, Error> {
×
75
        self.update_tx_graph(
×
76
            [(
×
77
                (),
×
78
                misc_spks
×
79
                    .into_iter()
×
80
                    .enumerate()
×
81
                    .map(|(i, spk)| (i as u32, spk)),
×
82
            )]
×
83
            .into(),
×
84
            txids,
×
85
            outpoints,
×
86
            usize::MAX,
×
87
            parallel_requests,
×
88
        )
×
89
        .await
×
90
        .map(|(g, _)| g)
×
91
    }
×
92
}
93

94
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
95
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
96
impl EsploraAsyncExt for esplora_client::AsyncClient {
97
    async fn update_local_chain(
×
98
        &self,
×
99
        prev_tip: Option<CheckPoint>,
×
100
        get_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
×
101
    ) -> Result<local_chain::Update, Error> {
×
102
        let new_tip_height = self.get_height().await?;
×
103

104
        // If esplora returns a tip height that is lower than our previous tip, then checkpoints do
105
        // not need updating. We just return the previous tip and use that as the point of
106
        // agreement.
107
        if let Some(prev_tip) = prev_tip.as_ref() {
×
108
            if new_tip_height < prev_tip.height() {
×
109
                return Ok(local_chain::Update {
×
110
                    tip: prev_tip.clone(),
×
111
                    introduce_older_blocks: true,
×
112
                });
×
113
            }
×
114
        }
×
115

116
        // Fetch new block IDs that are to be included in the update. This includes:
117
        // 1. Atomically fetched most-recent blocks so we have a consistent view even during reorgs.
118
        // 2. Heights the caller is interested in (as specified in `get_heights`).
119
        let mut new_blocks = {
×
120
            let heights = (0..=new_tip_height).rev();
×
121
            let hashes = self
×
122
                .get_blocks(Some(new_tip_height))
×
123
                .await?
×
124
                .into_iter()
×
125
                .map(|b| b.id);
×
126

×
127
            let mut new_blocks = heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>();
×
128

129
            for height in get_heights {
×
130
                // do not fetch blocks higher than known tip
131
                if height > new_tip_height {
×
132
                    continue;
×
133
                }
×
134
                if let btree_map::Entry::Vacant(entry) = new_blocks.entry(height) {
×
135
                    let hash = self.get_block_hash(height).await?;
×
136
                    entry.insert(hash);
×
137
                }
×
138
            }
139

140
            new_blocks
×
141
        };
142

143
        // Determine the checkpoint to start building our update tip from.
144
        let first_cp = match prev_tip {
×
145
            Some(old_tip) => {
×
146
                let old_tip_height = old_tip.height();
×
147
                let mut earliest_agreement_cp = Option::<CheckPoint>::None;
×
148

149
                for old_cp in old_tip.iter() {
×
150
                    let old_block = old_cp.block_id();
×
151

152
                    let new_hash = match new_blocks.entry(old_block.height) {
×
153
                        btree_map::Entry::Vacant(entry) => *entry.insert(
×
154
                            if old_tip_height - old_block.height >= ASSUME_FINAL_DEPTH {
×
155
                                old_block.hash
×
156
                            } else {
157
                                self.get_block_hash(old_block.height).await?
×
158
                            },
159
                        ),
160
                        btree_map::Entry::Occupied(entry) => *entry.get(),
×
161
                    };
162

163
                    // Since we may introduce blocks below the point of agreement, we cannot break
164
                    // here unconditionally. We only break if we guarantee there are no new heights
165
                    // below our current.
166
                    if old_block.hash == new_hash {
×
167
                        earliest_agreement_cp = Some(old_cp);
×
168

×
169
                        let first_new_height = *new_blocks
×
170
                            .keys()
×
171
                            .next()
×
172
                            .expect("must have atleast one new block");
×
173
                        if first_new_height >= old_block.height {
×
174
                            break;
×
175
                        }
×
176
                    }
×
177
                }
178

179
                earliest_agreement_cp
×
180
            }
181
            None => None,
×
182
        }
183
        .unwrap_or_else(|| {
×
184
            let (&height, &hash) = new_blocks
×
185
                .iter()
×
186
                .next()
×
187
                .expect("must have atleast one new block");
×
188
            CheckPoint::new(BlockId { height, hash })
×
189
        });
×
190

×
191
        let new_tip = new_blocks
×
192
            .split_off(&(first_cp.height() + 1))
×
193
            .into_iter()
×
194
            .map(|(height, hash)| BlockId { height, hash })
×
195
            .fold(first_cp, |prev_cp, block| {
×
196
                prev_cp
×
197
                    .extend_with_blocks(core::iter::once(block))
×
198
                    .expect("must extend checkpoint")
×
199
            });
×
200

×
201
        Ok(local_chain::Update {
×
202
            tip: new_tip,
×
203
            introduce_older_blocks: true,
×
204
        })
×
205
    }
×
206

207
    async fn update_tx_graph<K: Ord + Clone + Send>(
×
208
        &self,
×
209
        keychain_spks: BTreeMap<
×
210
            K,
×
211
            impl IntoIterator<IntoIter = impl Iterator<Item = (u32, Script)> + Send> + Send,
×
212
        >,
×
213
        txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
×
214
        outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
×
215
        stop_gap: usize,
×
216
        parallel_requests: usize,
×
217
    ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error> {
×
218
        type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
219
        let parallel_requests = Ord::max(parallel_requests, 1);
×
220
        let mut graph = TxGraph::<ConfirmationTimeAnchor>::default();
×
221
        let mut last_active_indexes = BTreeMap::<K, u32>::new();
×
222

223
        for (keychain, spks) in keychain_spks {
×
224
            let mut spks = spks.into_iter();
×
225
            let mut last_index = Option::<u32>::None;
×
226
            let mut last_active_index = Option::<u32>::None;
×
227

228
            loop {
×
229
                let handles = spks
×
230
                    .by_ref()
×
231
                    .take(parallel_requests)
×
232
                    .map(|(spk_index, spk)| {
×
233
                        let client = self.clone();
×
234
                        async move {
×
235
                            let mut last_seen = None;
×
236
                            let mut spk_txs = Vec::new();
×
237
                            loop {
238
                                let txs = client.scripthash_txs(&spk, last_seen).await?;
×
239
                                let tx_count = txs.len();
×
240
                                last_seen = txs.last().map(|tx| tx.txid);
×
241
                                spk_txs.extend(txs);
×
242
                                if tx_count < 25 {
×
243
                                    break Result::<_, Error>::Ok((spk_index, spk_txs));
×
244
                                }
×
245
                            }
246
                        }
×
247
                    })
×
248
                    .collect::<FuturesOrdered<_>>();
×
249

×
250
                if handles.is_empty() {
×
251
                    break;
×
252
                }
×
253

254
                for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
×
255
                    last_index = Some(index);
×
256
                    if !txs.is_empty() {
×
257
                        last_active_index = Some(index);
×
258
                    }
×
259
                    for tx in txs {
×
260
                        let _ = graph.insert_tx(tx.to_tx());
×
261
                        if let Some(anchor) = anchor_from_status(&tx.status) {
×
262
                            let _ = graph.insert_anchor(tx.txid, anchor);
×
263
                        }
×
264
                    }
265
                }
266

267
                if last_index > last_active_index.map(|i| i + stop_gap as u32) {
×
268
                    break;
×
269
                }
×
270
            }
271

272
            if let Some(last_active_index) = last_active_index {
×
273
                last_active_indexes.insert(keychain, last_active_index);
×
274
            }
×
275
        }
276

277
        let mut txids = txids.into_iter();
×
278
        loop {
×
279
            let handles = txids
×
280
                .by_ref()
×
281
                .take(parallel_requests)
×
282
                .filter(|&txid| graph.get_tx(txid).is_none())
×
283
                .map(|txid| {
×
284
                    let client = self.clone();
×
285
                    async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
×
286
                })
×
287
                .collect::<FuturesOrdered<_>>();
×
288

×
289
            if handles.is_empty() {
×
290
                break;
×
291
            }
×
292

293
            for (txid, status) in handles.try_collect::<Vec<(Txid, TxStatus)>>().await? {
×
294
                if let Some(anchor) = anchor_from_status(&status) {
×
295
                    let _ = graph.insert_anchor(txid, anchor);
×
296
                }
×
297
            }
298
        }
299

300
        for op in outpoints.into_iter() {
×
301
            if graph.get_tx(op.txid).is_none() {
×
302
                if let Some(tx) = self.get_tx(&op.txid).await? {
×
303
                    let _ = graph.insert_tx(tx);
×
304
                }
×
305
                let status = self.get_tx_status(&op.txid).await?;
×
306
                if let Some(anchor) = anchor_from_status(&status) {
×
307
                    let _ = graph.insert_anchor(op.txid, anchor);
×
308
                }
×
309
            }
×
310

311
            if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _).await? {
×
312
                if let Some(txid) = op_status.txid {
×
313
                    if graph.get_tx(txid).is_none() {
×
314
                        if let Some(tx) = self.get_tx(&txid).await? {
×
315
                            let _ = graph.insert_tx(tx);
×
316
                        }
×
317
                        let status = self.get_tx_status(&txid).await?;
×
318
                        if let Some(anchor) = anchor_from_status(&status) {
×
319
                            let _ = graph.insert_anchor(txid, anchor);
×
320
                        }
×
321
                    }
×
322
                }
×
323
            }
×
324
        }
325

326
        Ok((graph, last_active_indexes))
×
327
    }
×
328
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc