• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitcoindevkit / bdk / 5471370832

pending completion
5471370832

Pull #1028

github

web-flow
Merge d99e4a595 into 81c761339
Pull Request #1028: Add CreateTxError and use as error type for TxBuilder::finish()

47 of 47 new or added lines in 2 files covered. (100.0%)

7715 of 9720 relevant lines covered (79.37%)

5270.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/crates/esplora/src/async_ext.rs
1
use async_trait::async_trait;
2
use bdk_chain::{
3
    bitcoin::{BlockHash, OutPoint, Script, Txid},
4
    collections::BTreeMap,
5
    keychain::LocalUpdate,
6
    BlockId, ConfirmationTimeAnchor,
7
};
8
use esplora_client::{Error, OutputStatus, TxStatus};
9
use futures::{stream::FuturesOrdered, TryStreamExt};
10

11
use crate::map_confirmation_time_anchor;
12

13
/// Trait to extend [`esplora_client::AsyncClient`] functionality.
14
///
15
/// This is the async version of [`EsploraExt`]. Refer to
16
/// [crate-level documentation] for more.
17
///
18
/// [`EsploraExt`]: crate::EsploraExt
19
/// [crate-level documentation]: crate
20
// On wasm32 the `?Send` form of `async_trait` is used; all other targets use the default form.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
21
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
22
pub trait EsploraAsyncExt {
23
    /// Scan the blockchain (via esplora) for the data specified and return a
24
    /// [`LocalUpdate<K, ConfirmationTimeAnchor>`].
25
    ///
26
    /// - `local_chain`: the most recent block hashes present locally
27
    /// - `keychain_spks`: keychains that we want to scan transactions for
28
    /// - `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
29
    /// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
30
    ///     want to be included in the update
31
    ///
32
    /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
33
    /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
34
    /// parallel.
35
    #[allow(clippy::result_large_err)] // FIXME
36
    async fn scan<K: Ord + Clone + Send>(
37
        &self,
38
        local_chain: &BTreeMap<u32, BlockHash>,
39
        keychain_spks: BTreeMap<
40
            K,
41
            impl IntoIterator<IntoIter = impl Iterator<Item = (u32, Script)> + Send> + Send,
42
        >,
43
        txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
44
        outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
45
        stop_gap: usize,
46
        parallel_requests: usize,
47
    ) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error>;
48

49
    /// Convenience method to call [`scan`] without requiring a keychain.
    ///
    /// The scripts in `misc_spks` are passed to [`scan`] as a single keychain keyed by `()`,
    /// each script indexed by its position in the iterator, with `stop_gap` set to
    /// `usize::MAX`. All other arguments are forwarded unchanged.
50
    ///
51
    /// [`scan`]: EsploraAsyncExt::scan
52
    #[allow(clippy::result_large_err)] // FIXME
53
    async fn scan_without_keychain(
×
54
        &self,
×
55
        local_chain: &BTreeMap<u32, BlockHash>,
×
56
        misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = Script> + Send> + Send,
×
57
        txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
×
58
        outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
×
59
        parallel_requests: usize,
×
60
    ) -> Result<LocalUpdate<(), ConfirmationTimeAnchor>, Error> {
×
61
        self.scan(
×
62
            local_chain,
×
63
            // A one-entry map: the unit keychain `()` mapped to the enumerated scripts.
            [(
×
64
                (),
×
65
                misc_spks
×
66
                    .into_iter()
×
67
                    .enumerate()
×
68
                    // NOTE(review): `as u32` silently truncates if more than `u32::MAX` scripts
                    // are supplied.
                    .map(|(i, spk)| (i as u32, spk)),
×
69
            )]
×
70
            .into(),
×
71
            txids,
×
72
            outpoints,
×
73
            // `stop_gap`: the largest possible value, so the gap limit is effectively disabled.
            usize::MAX,
×
74
            parallel_requests,
×
75
        )
×
76
        .await
×
77
    }
×
78
}
79

80
// `EsploraAsyncExt` implemented for the async esplora client. `scan` proceeds in phases:
//   1. build the chain part of the update from `local_chain` plus the current remote tip;
//   2. scan every keychain's script pubkeys in batches of `parallel_requests`;
//   3. fetch the explicitly requested `txids`;
//   4. fetch the transactions funding and spending the requested `outpoints`;
//   5. re-check the tip and, if it changed, rebuild the chain part of the update.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
81
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
82
impl EsploraAsyncExt for esplora_client::AsyncClient {
83
    #[allow(clippy::result_large_err)] // FIXME
84
    async fn scan<K: Ord + Clone + Send>(
×
85
        &self,
×
86
        local_chain: &BTreeMap<u32, BlockHash>,
×
87
        keychain_spks: BTreeMap<
×
88
            K,
×
89
            impl IntoIterator<IntoIter = impl Iterator<Item = (u32, Script)> + Send> + Send,
×
90
        >,
×
91
        txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
×
92
        outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
×
93
        stop_gap: usize,
×
94
        parallel_requests: usize,
×
95
    ) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error> {
×
96
        // Treat 0 as 1: with an empty `0..0` range below, no script would ever be requested.
        let parallel_requests = Ord::max(parallel_requests, 1);
×
97

98
        // Phase 1: build the chain part of the update. The whole attempt is retried from
        // scratch whenever the remote tip cannot be inserted into the freshly built chain.
        let (mut update, tip_at_start) = loop {
×
99
            let mut update = LocalUpdate::<K, ConfirmationTimeAnchor>::default();
×
100

101
            // Walk the locally known heights from highest to lowest, re-fetching each hash
            // from the server.
            for (&height, &original_hash) in local_chain.iter().rev() {
×
102
                let update_block_id = BlockId {
×
103
                    height,
×
104
                    hash: self.get_block_hash(height).await?,
×
105
                };
106
                // Heights are unique `BTreeMap` keys, so no height is inserted twice here.
                let _ = update
×
107
                    .chain
×
108
                    .insert_block(update_block_id)
×
109
                    .expect("cannot repeat height here");
×
110
                // First height where the server and the local chain agree: stop looking
                // further back.
                if update_block_id.hash == original_hash {
×
111
                    break;
×
112
                }
×
113
            }
114

115
            // NOTE(review): height and hash come from two separate requests, so they may
            // describe different blocks if the tip changes in between — confirm this is handled.
            let tip_at_start = BlockId {
×
116
                height: self.get_height().await?,
×
117
                hash: self.get_tip_hash().await?,
×
118
            };
119

120
            if update.chain.insert_block(tip_at_start).is_ok() {
×
121
                break (update, tip_at_start);
×
122
            }
×
123
        };
124

125
        // Phase 2: scan each keychain's script pubkeys.
        for (keychain, spks) in keychain_spks {
×
126
            let mut spks = spks.into_iter();
×
127
            // Index of the most recently processed script that had at least one transaction.
            let mut last_active_index = None;
×
128
            // Number of consecutive scripts seen with no transactions.
            let mut empty_scripts = 0;
×
129
            type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
130

131
            loop {
×
132
                // Build one batch of up to `parallel_requests` futures, one per script.
                let futures = (0..parallel_requests)
×
133
                    .filter_map(|_| {
×
134
                        // Once `spks` is exhausted, `?` yields `None` and the slot is dropped.
                        let (index, script) = spks.next()?;
×
135
                        // The `async move` block below takes ownership of its own client clone.
                        let client = self.clone();
×
136
                        Some(async move {
×
137
                            let mut related_txs = client.scripthash_txs(&script, None).await?;
×
138

139
                            let n_confirmed =
×
140
                                related_txs.iter().filter(|tx| tx.status.confirmed).count();
×
141
                            // esplora pages on 25 confirmed transactions. If there are 25 or more we
×
142
                            // keep requesting to see if there's more.
×
143
                            if n_confirmed >= 25 {
×
144
                                loop {
145
                                    let new_related_txs = client
×
146
                                        .scripthash_txs(
×
147
                                            &script,
×
148
                                            // `related_txs` is non-empty here (`n_confirmed >= 25`),
                                            // so `unwrap` cannot fail. The last txid received so
                                            // far is passed as the second argument (presumably the
                                            // pagination cursor — verify against esplora_client).
                                            Some(related_txs.last().unwrap().txid),
×
149
                                        )
×
150
                                        .await?;
×
151
                                    let n = new_related_txs.len();
×
152
                                    related_txs.extend(new_related_txs);
×
153
                                    // we've reached the end
×
154
                                    if n < 25 {
×
155
                                        break;
×
156
                                    }
×
157
                                }
158
                            }
×
159

160
                            Result::<_, esplora_client::Error>::Ok((index, related_txs))
×
161
                        })
×
162
                    })
×
163
                    // `FuturesOrdered` yields results in the order the futures were added.
                    .collect::<FuturesOrdered<_>>();
×
164

×
165
                // Zero futures means `spks` yielded nothing for this batch.
                let n_futures = futures.len();
×
166

167
                // Await the whole batch; the first error aborts the scan via `?`.
                for (index, related_txs) in futures.try_collect::<Vec<IndexWithTxs>>().await? {
×
168
                    if related_txs.is_empty() {
×
169
                        empty_scripts += 1;
×
170
                    } else {
×
171
                        last_active_index = Some(index);
×
172
                        empty_scripts = 0;
×
173
                    }
×
174
                    // Record every transaction; add an anchor only when one is returned for
                    // its status.
                    for tx in related_txs {
×
175
                        let anchor = map_confirmation_time_anchor(&tx.status, tip_at_start);
×
176

×
177
                        let _ = update.graph.insert_tx(tx.to_tx());
×
178
                        if let Some(anchor) = anchor {
×
179
                            let _ = update.graph.insert_anchor(tx.txid, anchor);
×
180
                        }
×
181
                    }
182
                }
183

184
                // Checked only after the full batch is processed: stop when no script was left
                // or the run of empty scripts has reached `stop_gap`.
                if n_futures == 0 || empty_scripts >= stop_gap {
×
185
                    break;
×
186
                }
×
187
            }
188

189
            // Only keychains with at least one active script get an entry in the update.
            if let Some(last_active_index) = last_active_index {
×
190
                update.keychain.insert(keychain, last_active_index);
×
191
            }
×
192
        }
193

194
        // Phase 3: explicitly requested txids.
        for txid in txids.into_iter() {
×
195
            // Fetch the transaction only if the graph does not already hold it; txids the
            // server does not return are skipped entirely.
            if update.graph.get_tx(txid).is_none() {
×
196
                match self.get_tx(&txid).await? {
×
197
                    Some(tx) => {
×
198
                        let _ = update.graph.insert_tx(tx);
×
199
                    }
×
200
                    None => continue,
×
201
                }
202
            }
×
203
            // Anchors are added for confirmed transactions only.
            match self.get_tx_status(&txid).await? {
×
204
                tx_status if tx_status.confirmed => {
×
205
                    if let Some(anchor) = map_confirmation_time_anchor(&tx_status, tip_at_start) {
×
206
                        let _ = update.graph.insert_anchor(txid, anchor);
×
207
                    }
×
208
                }
209
                _ => continue,
×
210
            }
211
        }
212

213
        // Phase 4: for each outpoint, collect the transaction that contains it and, if the
        // output is spent, the spending transaction (hence capacity 2).
        for op in outpoints.into_iter() {
×
214
            let mut op_txs = Vec::with_capacity(2);
×
215
            // Proceed only when the transaction holding the outpoint exists and is confirmed.
            if let (
216
                Some(tx),
×
217
                tx_status @ TxStatus {
×
218
                    confirmed: true, ..
219
                },
220
            ) = (
221
                self.get_tx(&op.txid).await?,
×
222
                self.get_tx_status(&op.txid).await?,
×
223
            ) {
224
                op_txs.push((tx, tx_status));
×
225
                // The spender is fetched only when the output status carries both a spending
                // txid and a status for it.
                if let Some(OutputStatus {
226
                    txid: Some(txid),
×
227
                    status: Some(spend_status),
×
228
                    ..
229
                }) = self.get_output_status(&op.txid, op.vout as _).await?
×
230
                {
231
                    if let Some(spend_tx) = self.get_tx(&txid).await? {
×
232
                        op_txs.push((spend_tx, spend_status));
×
233
                    }
×
234
                }
×
235
            }
×
236

237
            for (tx, status) in op_txs {
×
238
                // Take the txid before `tx` is moved into the graph.
                let txid = tx.txid();
×
239
                let anchor = map_confirmation_time_anchor(&status, tip_at_start);
×
240

×
241
                let _ = update.graph.insert_tx(tx);
×
242
                if let Some(anchor) = anchor {
×
243
                    let _ = update.graph.insert_anchor(txid, anchor);
×
244
                }
×
245
            }
246
        }
247

248
        // Phase 5: re-check that the block at the starting tip's height is still the same one.
        if tip_at_start.hash != self.get_block_hash(tip_at_start.height).await? {
×
249
            // A reorg occurred, so let's find out where all the txids we found are now in the chain
250
            let txids_found = update
×
251
                .graph
×
252
                .full_txs()
×
253
                .map(|tx_node| tx_node.txid)
×
254
                .collect::<Vec<_>>();
×
255
            // NOTE(review): only `.chain` of the rescan result is kept; the graph (and any
            // anchors) produced by the rescan is discarded — confirm this is intended.
            update.chain = EsploraAsyncExt::scan_without_keychain(
×
256
                self,
×
257
                local_chain,
×
258
                [],
×
259
                txids_found,
×
260
                [],
×
261
                parallel_requests,
×
262
            )
×
263
            .await?
×
264
            .chain;
265
        }
×
266

267
        Ok(update)
×
268
    }
×
269
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc