bitcoindevkit / bdk, build 5984736767

26 Aug 2023 12:20PM CUT. Coverage: 78.856% (+0.2%) from 78.694%

Pull Request #1084: Enhance bdk chain structures
Merge 8bdb5a43d into 8f978f86b (github / web-flow)

86 of 86 new or added lines in 5 files covered (100.0%)
8022 of 10173 relevant lines covered (78.86%)
5091.55 hits per line

Source file: /crates/esplora/src/async_ext.rs (file coverage: 0.0%)
use async_trait::async_trait;
use bdk_chain::collections::btree_map;
use bdk_chain::{
    bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
    collections::{BTreeMap, BTreeSet},
    local_chain::{self, CheckPoint},
    BlockId, ConfirmationTimeAnchor, TxGraph,
};
use esplora_client::{Error, TxStatus};
use futures::{stream::FuturesOrdered, TryStreamExt};

use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};

/// Trait to extend the functionality of [`esplora_client::AsyncClient`].
///
/// Refer to [crate-level documentation] for more.
///
/// [crate-level documentation]: crate
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait EsploraAsyncExt {
    /// Prepare a [`LocalChain`] update with blocks fetched from Esplora.
    ///
    /// * `local_tip` is the previous tip of [`LocalChain::tip`].
    /// * `request_heights` are the block heights that we are interested in fetching from Esplora.
    ///
    /// The result of this method can be applied to [`LocalChain::apply_update`].
    ///
    /// [`LocalChain`]: bdk_chain::local_chain::LocalChain
    /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
    /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
    #[allow(clippy::result_large_err)]
    async fn update_local_chain(
        &self,
        local_tip: Option<CheckPoint>,
        request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
    ) -> Result<local_chain::Update, Error>;

    /// Scan Esplora for the data specified and return a [`TxGraph`] and a map of last active
    /// indices.
    ///
    /// * `keychain_spks`: keychains that we want to scan transactions for
    /// * `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
    /// * `outpoints`: outpoints whose residing and spending transactions we want to include in
    ///     the update
    ///
    /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
    /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
    /// parallel.
    #[allow(clippy::result_large_err)]
    async fn update_tx_graph<K: Ord + Clone + Send>(
        &self,
        keychain_spks: BTreeMap<
            K,
            impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
        >,
        txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
        outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
        stop_gap: usize,
        parallel_requests: usize,
    ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error>;

    /// Convenience method to call [`update_tx_graph`] without requiring a keychain.
    ///
    /// [`update_tx_graph`]: EsploraAsyncExt::update_tx_graph
    #[allow(clippy::result_large_err)]
    async fn update_tx_graph_without_keychain(
        &self,
        misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
        txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
        outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
        parallel_requests: usize,
    ) -> Result<TxGraph<ConfirmationTimeAnchor>, Error> {
        self.update_tx_graph(
            [(
                (),
                misc_spks
                    .into_iter()
                    .enumerate()
                    .map(|(i, spk)| (i as u32, spk)),
            )]
            .into(),
            txids,
            outpoints,
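            // the spk list supplied by the caller is finite and fully enumerated above,
            // so the gap limit is effectively disabled by passing the largest possible value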
            usize::MAX,
            parallel_requests,
        )
        .await
        .map(|(g, _)| g)
    }
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl EsploraAsyncExt for esplora_client::AsyncClient {
    async fn update_local_chain(
        &self,
        local_tip: Option<CheckPoint>,
        request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
    ) -> Result<local_chain::Update, Error> {
        let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
        let new_tip_height = self.get_height().await?;

        // atomically fetch blocks from esplora
        let mut fetched_blocks = {
            let heights = (0..=new_tip_height).rev();
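            // `get_blocks(Some(tip))` returns block summaries from the tip downwards
            // (newest first), so zipping with descending heights pairs each hash with its height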
            let hashes = self
                .get_blocks(Some(new_tip_height))
                .await?
                .into_iter()
                .map(|b| b.id);
            heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
        };

        // fetch heights that the caller is interested in
        for height in request_heights {
            // do not fetch blocks higher than remote tip
            if height > new_tip_height {
                continue;
            }
            // only fetch what is missing
            if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
                let hash = self.get_block_hash(height).await?;
                entry.insert(hash);
            }
        }

        // find the earliest point of agreement between local chain and fetched chain
        let earliest_agreement_cp = {
            let mut earliest_agreement_cp = Option::<CheckPoint>::None;

            if let Some(local_tip) = local_tip {
                let local_tip_height = local_tip.height();
                for local_cp in local_tip.iter() {
                    let local_block = local_cp.block_id();

                    // the updated hash (block hash at this height after the update) can either be:
                    // 1. a block that already existed in `fetched_blocks`
                    // 2. a block that exists locally and has a depth of at least ASSUME_FINAL_DEPTH
                    // 3. otherwise we can freshly fetch the block from remote, which is safe as it
                    //    is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
                    //    remote tip
                    let updated_hash = match fetched_blocks.entry(local_block.height) {
                        btree_map::Entry::Occupied(entry) => *entry.get(),
                        btree_map::Entry::Vacant(entry) => *entry.insert(
                            if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
                                local_block.hash
                            } else {
                                self.get_block_hash(local_block.height).await?
                            },
                        ),
                    };

                    // since we may introduce blocks below the point of agreement, we cannot break
                    // here unconditionally - we only break if we guarantee there are no new heights
                    // below our current local checkpoint
                    if local_block.hash == updated_hash {
                        earliest_agreement_cp = Some(local_cp);

                        let first_new_height = *fetched_blocks
                            .keys()
                            .next()
                            .expect("must have at least one new block");
                        if first_new_height >= local_block.height {
                            break;
                        }
                    }
                }
            }

            earliest_agreement_cp
        };

        let tip = {
            // first checkpoint to use for the update chain
            let first_cp = match earliest_agreement_cp {
                Some(cp) => cp,
                None => {
                    let (&height, &hash) = fetched_blocks
                        .iter()
                        .next()
                        .expect("must have at least one new block");
                    CheckPoint::new(BlockId { height, hash })
                }
            };
            // transform fetched chain into the update chain
            fetched_blocks
                // we exclude anything at or below the first cp of the update chain otherwise
                // building the chain will fail
                .split_off(&(first_cp.height() + 1))
                .into_iter()
                .map(|(height, hash)| BlockId { height, hash })
                .fold(first_cp, |prev_cp, block| {
                    prev_cp.push(block).expect("must extend checkpoint")
                })
        };

        Ok(local_chain::Update {
            tip,
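            // requested heights may sit below the point of agreement, so the update
            // chain must be allowed to introduce blocks older than the local tip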
            introduce_older_blocks: true,
        })
    }

    async fn update_tx_graph<K: Ord + Clone + Send>(
        &self,
        keychain_spks: BTreeMap<
            K,
            impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
        >,
        txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
        outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
        stop_gap: usize,
        parallel_requests: usize,
    ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error> {
        type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
        let parallel_requests = Ord::max(parallel_requests, 1);
        let mut graph = TxGraph::<ConfirmationTimeAnchor>::default();
        let mut last_active_indexes = BTreeMap::<K, u32>::new();

        for (keychain, spks) in keychain_spks {
            let mut spks = spks.into_iter();
            let mut last_index = Option::<u32>::None;
            let mut last_active_index = Option::<u32>::None;

            loop {
                let handles = spks
                    .by_ref()
                    .take(parallel_requests)
                    .map(|(spk_index, spk)| {
                        let client = self.clone();
                        async move {
                            let mut last_seen = None;
                            let mut spk_txs = Vec::new();
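                            // page through this script's history; a page with fewer than
                            // 25 transactions marks the end of what Esplora has for it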
                            loop {
                                let txs = client.scripthash_txs(&spk, last_seen).await?;
                                let tx_count = txs.len();
                                last_seen = txs.last().map(|tx| tx.txid);
                                spk_txs.extend(txs);
                                if tx_count < 25 {
                                    break Result::<_, Error>::Ok((spk_index, spk_txs));
                                }
                            }
                        }
                    })
                    .collect::<FuturesOrdered<_>>();

                if handles.is_empty() {
                    break;
                }

                for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
                    last_index = Some(index);
                    if !txs.is_empty() {
                        last_active_index = Some(index);
                    }
                    for tx in txs {
                        let _ = graph.insert_tx(tx.to_tx());
                        if let Some(anchor) = anchor_from_status(&tx.status) {
                            let _ = graph.insert_anchor(tx.txid, anchor);
                        }
                    }
                }

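                // stop scanning this keychain once we are `stop_gap` indices past the last
                // active one (`None` orders below any `Some`, so a keychain with no
                // transactions at all stops after its first batch)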
                if last_index > last_active_index.map(|i| i + stop_gap as u32) {
                    break;
                }
            }

            if let Some(last_active_index) = last_active_index {
                last_active_indexes.insert(keychain, last_active_index);
            }
        }

        let mut txids = txids.into_iter();
        loop {
            let handles = txids
                .by_ref()
                .take(parallel_requests)
                .filter(|&txid| graph.get_tx(txid).is_none())
                .map(|txid| {
                    let client = self.clone();
                    async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
                })
                .collect::<FuturesOrdered<_>>();

            if handles.is_empty() {
                break;
            }

            for (txid, status) in handles.try_collect::<Vec<(Txid, TxStatus)>>().await? {
                if let Some(anchor) = anchor_from_status(&status) {
                    let _ = graph.insert_anchor(txid, anchor);
                }
            }
        }

        for op in outpoints.into_iter() {
            if graph.get_tx(op.txid).is_none() {
                if let Some(tx) = self.get_tx(&op.txid).await? {
                    let _ = graph.insert_tx(tx);
                }
                let status = self.get_tx_status(&op.txid).await?;
                if let Some(anchor) = anchor_from_status(&status) {
                    let _ = graph.insert_anchor(op.txid, anchor);
                }
            }

            if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _).await? {
                if let Some(txid) = op_status.txid {
                    if graph.get_tx(txid).is_none() {
                        if let Some(tx) = self.get_tx(&txid).await? {
                            let _ = graph.insert_tx(tx);
                        }
                        let status = self.get_tx_status(&txid).await?;
                        if let Some(anchor) = anchor_from_status(&status) {
                            let _ = graph.insert_anchor(txid, anchor);
                        }
                    }
                }
            }
        }

        Ok((graph, last_active_indexes))
    }
}
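
For orientation, here is a minimal usage sketch of the extension trait defined above. It is not part of the source file: the crate paths (`bdk_esplora::EsploraAsyncExt` and its re-exported `esplora_client`) and the caller-supplied tip, script pubkeys, and heights of interest are assumptions; only the method signatures and the `LocalChain::apply_update` step mentioned in the doc comments come from the code above.

// Usage sketch only; see the note above for which names are assumed.
use bdk_chain::{
    bitcoin::{OutPoint, ScriptBuf, Txid},
    local_chain::{self, CheckPoint},
    ConfirmationTimeAnchor, TxGraph,
};
use bdk_esplora::{esplora_client, EsploraAsyncExt};

/// One round of syncing: fetch a chain update and the history of some script pubkeys.
/// The caller would apply the chain update with `LocalChain::apply_update` and merge
/// the returned graph into its own `TxGraph`.
async fn sync_once(
    client: &esplora_client::AsyncClient,
    local_tip: Option<CheckPoint>,
    spks: Vec<ScriptBuf>,
    heights_of_interest: Vec<u32>,
) -> Result<(local_chain::Update, TxGraph<ConfirmationTimeAnchor>), esplora_client::Error> {
    // Prepare the `LocalChain` update relative to the wallet's current tip.
    let chain_update = client
        .update_local_chain(local_tip, heights_of_interest)
        .await?;

    // Fetch the scripts' histories with at most 5 requests in flight; no extra
    // txids or outpoints are requested here.
    let graph_update = client
        .update_tx_graph_without_keychain(spks, Vec::<Txid>::new(), Vec::<OutPoint>::new(), 5)
        .await?;

    Ok((chain_update, graph_update))
}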