bitcoindevkit / bdk / build 5984736767

26 Aug 2023 12:20PM UTC coverage: 78.856% (+0.2%) from 78.694%

Pull Request #1084: Enhance bdk chain structures (merge 8bdb5a43d into 8f978f86b)

86 of 86 new or added lines in 5 files covered (100.0%)
8022 of 10173 relevant lines covered (78.86%)
5091.55 hits per line

Source File: /crates/esplora/src/blocking_ext.rs (0.0% of lines covered)
use std::thread::JoinHandle;

use bdk_chain::collections::btree_map;
use bdk_chain::collections::{BTreeMap, BTreeSet};
use bdk_chain::{
    bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
    local_chain::{self, CheckPoint},
    BlockId, ConfirmationTimeAnchor, TxGraph,
};
use esplora_client::{Error, TxStatus};

use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};

/// Trait to extend the functionality of [`esplora_client::BlockingClient`].
///
/// Refer to [crate-level documentation] for more.
///
/// [crate-level documentation]: crate
pub trait EsploraExt {
    /// Prepare a [`LocalChain`] update with blocks fetched from Esplora.
    ///
    /// * `local_tip` is the previous tip of [`LocalChain::tip`].
    /// * `request_heights` are the block heights that we are interested in fetching from Esplora.
    ///
    /// The result of this method can be applied to [`LocalChain::apply_update`].
    ///
    /// [`LocalChain`]: bdk_chain::local_chain::LocalChain
    /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
    /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
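    ///
    /// # Example
    ///
    /// A minimal sketch (not part of the original source) of how this method might be
    /// driven, assuming a connected [`esplora_client::BlockingClient`] named `client` and
    /// an existing `LocalChain` named `chain` whose `tip()` yields an `Option<CheckPoint>`:
    ///
    /// ```ignore
    /// // hypothetical usage; setup and error handling are elided
    /// let update = client.update_local_chain(chain.tip(), [0u32, 100_000u32])?;
    /// chain.apply_update(update)?;
    /// ```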
    #[allow(clippy::result_large_err)]
    fn update_local_chain(
        &self,
        local_tip: Option<CheckPoint>,
        request_heights: impl IntoIterator<Item = u32>,
    ) -> Result<local_chain::Update, Error>;

    /// Scan Esplora for the data specified and return a [`TxGraph`] and a map of last active
    /// indices.
    ///
    /// * `keychain_spks`: keychains that we want to scan transactions for
    /// * `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
    /// * `outpoints`: transactions associated with these outpoints (both residing in them and
    ///     spending from them) that we want to include in the update
    ///
    /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
    /// transactions. `parallel_requests` specifies the maximum number of HTTP requests to make in
    /// parallel.
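    ///
    /// # Example
    ///
    /// A minimal sketch (not part of the original source), assuming a connected
    /// [`esplora_client::BlockingClient`] named `client` and a `keychain_spks` map that
    /// associates each keychain with an iterator of `(index, ScriptBuf)` pairs:
    ///
    /// ```ignore
    /// // hypothetical usage; no extra txids or outpoints are requested here
    /// let (graph_update, last_active_indices) = client.update_tx_graph(
    ///     keychain_spks,
    ///     core::iter::empty(),
    ///     core::iter::empty(),
    ///     10, // stop_gap
    ///     5,  // parallel_requests
    /// )?;
    /// ```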
    #[allow(clippy::result_large_err)]
    fn update_tx_graph<K: Ord + Clone>(
        &self,
        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
        txids: impl IntoIterator<Item = Txid>,
        outpoints: impl IntoIterator<Item = OutPoint>,
        stop_gap: usize,
        parallel_requests: usize,
    ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error>;

    /// Convenience method to call [`update_tx_graph`] without requiring a keychain.
    ///
    /// [`update_tx_graph`]: EsploraExt::update_tx_graph
    #[allow(clippy::result_large_err)]
    fn update_tx_graph_without_keychain(
        &self,
        misc_spks: impl IntoIterator<Item = ScriptBuf>,
        txids: impl IntoIterator<Item = Txid>,
        outpoints: impl IntoIterator<Item = OutPoint>,
        parallel_requests: usize,
    ) -> Result<TxGraph<ConfirmationTimeAnchor>, Error> {
        self.update_tx_graph(
            [(
                (),
                misc_spks
                    .into_iter()
                    .enumerate()
                    .map(|(i, spk)| (i as u32, spk)),
            )]
            .into(),
            txids,
            outpoints,
            usize::MAX,
            parallel_requests,
        )
        .map(|(g, _)| g)
    }
}

impl EsploraExt for esplora_client::BlockingClient {
    fn update_local_chain(
        &self,
        local_tip: Option<CheckPoint>,
        request_heights: impl IntoIterator<Item = u32>,
    ) -> Result<local_chain::Update, Error> {
        let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
        let new_tip_height = self.get_height()?;

        // atomically fetch blocks from esplora
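        // (`get_blocks` returns the most recent block summaries, newest first, so zipping
        // them with descending heights pairs each returned hash with its height)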
97
        let mut fetched_blocks = {
×
98
            let heights = (0..=new_tip_height).rev();
×
99
            let hashes = self
×
100
                .get_blocks(Some(new_tip_height))?
×
101
                .into_iter()
×
102
                .map(|b| b.id);
×
103
            heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
×
104
        };
105

106
        // fetch heights that the caller is interested in
107
        for height in request_heights {
×
108
            // do not fetch blocks higher than remote tip
109
            if height > new_tip_height {
×
110
                continue;
×
111
            }
×
112
            // only fetch what is missing
113
            if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
×
114
                let hash = self.get_block_hash(height)?;
×
115
                entry.insert(hash);
×
116
            }
×
117
        }
118

119
        // find the earliest point of agreement between local chain and fetched chain
120
        let earliest_agreement_cp = {
×
121
            let mut earliest_agreement_cp = Option::<CheckPoint>::None;
×
122

123
            if let Some(local_tip) = local_tip {
×
124
                let local_tip_height = local_tip.height();
×
125
                for local_cp in local_tip.iter() {
×
126
                    let local_block = local_cp.block_id();
×
127

128
                    // the updated hash (block hash at this height after the update), can either be:
129
                    // 1. a block that already existed in `fetched_blocks`
130
                    // 2. a block that exists locally and atleast has a depth of ASSUME_FINAL_DEPTH
131
                    // 3. otherwise we can freshly fetch the block from remote, which is safe as it
132
                    //    is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
133
                    //    remote tip
134
                    let updated_hash = match fetched_blocks.entry(local_block.height) {
×
135
                        btree_map::Entry::Occupied(entry) => *entry.get(),
×
136
                        btree_map::Entry::Vacant(entry) => *entry.insert(
×
137
                            if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
×
138
                                local_block.hash
×
139
                            } else {
140
                                self.get_block_hash(local_block.height)?
×
141
                            },
142
                        ),
143
                    };
144

145
                    // since we may introduce blocks below the point of agreement, we cannot break
146
                    // here unconditionally - we only break if we guarantee there are no new heights
147
                    // below our current local checkpoint
148
                    if local_block.hash == updated_hash {
×
149
                        earliest_agreement_cp = Some(local_cp);
×
150

×
151
                        let first_new_height = *fetched_blocks
×
152
                            .keys()
×
153
                            .next()
×
154
                            .expect("must have atleast one new block");
×
155
                        if first_new_height >= local_block.height {
×
156
                            break;
×
157
                        }
×
158
                    }
×
159
                }
160
            }
×
161

162
            earliest_agreement_cp
×
163
        };
164

165
        let tip = {
×
166
            // first checkpoint to use for the update chain
×
167
            let first_cp = match earliest_agreement_cp {
×
168
                Some(cp) => cp,
×
169
                None => {
170
                    let (&height, &hash) = fetched_blocks
×
171
                        .iter()
×
172
                        .next()
×
173
                        .expect("must have atleast one new block");
×
174
                    CheckPoint::new(BlockId { height, hash })
×
175
                }
176
            };
177
            // transform fetched chain into the update chain
178
            fetched_blocks
×
179
                // we exclude anything at or below the first cp of the update chain otherwise
×
180
                // building the chain will fail
×
181
                .split_off(&(first_cp.height() + 1))
×
182
                .into_iter()
×
183
                .map(|(height, hash)| BlockId { height, hash })
×
184
                .fold(first_cp, |prev_cp, block| {
×
185
                    prev_cp.push(block).expect("must extend checkpoint")
×
186
                })
×
187
        };
×
188

×
189
        Ok(local_chain::Update {
×
190
            tip,
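            // the update may contain blocks below the local tip (see above), so let the
            // receiving `LocalChain` introduce older blocks when applying it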
            introduce_older_blocks: true,
        })
    }

    fn update_tx_graph<K: Ord + Clone>(
        &self,
        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
        txids: impl IntoIterator<Item = Txid>,
        outpoints: impl IntoIterator<Item = OutPoint>,
        stop_gap: usize,
        parallel_requests: usize,
    ) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error> {
        type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
        let parallel_requests = Ord::max(parallel_requests, 1);
        let mut graph = TxGraph::<ConfirmationTimeAnchor>::default();
        let mut last_active_indexes = BTreeMap::<K, u32>::new();

        for (keychain, spks) in keychain_spks {
            let mut spks = spks.into_iter();
            let mut last_index = Option::<u32>::None;
            let mut last_active_index = Option::<u32>::None;

            loop {
                let handles = spks
                    .by_ref()
                    .take(parallel_requests)
                    .map(|(spk_index, spk)| {
                        std::thread::spawn({
                            let client = self.clone();
                            move || -> Result<TxsOfSpkIndex, Error> {
                                let mut last_seen = None;
                                let mut spk_txs = Vec::new();
                                loop {
                                    let txs = client.scripthash_txs(&spk, last_seen)?;
                                    let tx_count = txs.len();
                                    last_seen = txs.last().map(|tx| tx.txid);
                                    spk_txs.extend(txs);
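                                    // Esplora serves confirmed transactions for a script
                                    // in pages of up to 25, so a short page means there is
                                    // nothing left to fetch for this spk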
                                    if tx_count < 25 {
                                        break Ok((spk_index, spk_txs));
                                    }
                                }
                            }
                        })
                    })
                    .collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();

                if handles.is_empty() {
                    break;
                }

                for handle in handles {
                    let (index, txs) = handle.join().expect("thread must not panic")?;
                    last_index = Some(index);
                    if !txs.is_empty() {
                        last_active_index = Some(index);
                    }
                    for tx in txs {
                        let _ = graph.insert_tx(tx.to_tx());
                        if let Some(anchor) = anchor_from_status(&tx.status) {
                            let _ = graph.insert_anchor(tx.txid, anchor);
                        }
                    }
                }

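                // stop once the last checked index is more than `stop_gap` beyond the
                // last active index; note that `None` (no activity seen yet) compares
                // less than any `Some`, so the scan also ends after a fully inactive batch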
                if last_index > last_active_index.map(|i| i + stop_gap as u32) {
                    break;
                }
            }

            if let Some(last_active_index) = last_active_index {
                last_active_indexes.insert(keychain, last_active_index);
            }
        }

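        // fetch statuses for the requested txids that were not already picked up above,
        // anchoring any that are confirmed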
        let mut txids = txids.into_iter();
        loop {
            let handles = txids
                .by_ref()
                .take(parallel_requests)
                .filter(|&txid| graph.get_tx(txid).is_none())
                .map(|txid| {
                    std::thread::spawn({
                        let client = self.clone();
                        move || client.get_tx_status(&txid).map(|s| (txid, s))
                    })
                })
                .collect::<Vec<JoinHandle<Result<(Txid, TxStatus), Error>>>>();

            if handles.is_empty() {
                break;
            }

            for handle in handles {
                let (txid, status) = handle.join().expect("thread must not panic")?;
                if let Some(anchor) = anchor_from_status(&status) {
                    let _ = graph.insert_anchor(txid, anchor);
                }
            }
        }

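        // make sure the transactions residing in, and those spending from, the requested
        // outpoints are present in the graph along with their anchors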
        for op in outpoints.into_iter() {
            if graph.get_tx(op.txid).is_none() {
                if let Some(tx) = self.get_tx(&op.txid)? {
                    let _ = graph.insert_tx(tx);
                }
                let status = self.get_tx_status(&op.txid)?;
                if let Some(anchor) = anchor_from_status(&status) {
                    let _ = graph.insert_anchor(op.txid, anchor);
                }
            }

            if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _)? {
                if let Some(txid) = op_status.txid {
                    if graph.get_tx(txid).is_none() {
                        if let Some(tx) = self.get_tx(&txid)? {
                            let _ = graph.insert_tx(tx);
                        }
                        let status = self.get_tx_status(&txid)?;
                        if let Some(anchor) = anchor_from_status(&status) {
                            let _ = graph.insert_anchor(txid, anchor);
                        }
                    }
                }
            }
        }

        Ok((graph, last_active_indexes))
    }
}