• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitcoindevkit / bdk / 5997032315

28 Aug 2023 07:39AM UTC coverage: 79.072% (+0.4%) from 78.694%
5997032315

Pull #1041

github

web-flow
Merge 105d6c601 into 8f978f86b
Pull Request #1041: Add `bitcoind_rpc` chain source module.

328 of 328 new or added lines in 7 files covered. (100.0%)

8199 of 10369 relevant lines covered (79.07%)

5211.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.47
/crates/bitcoind_rpc/src/lib.rs
1
//! This crate is used for updating [`bdk_chain`] structures with data from the `bitcoind` RPC
1✔
2
//! interface (excluding the RPC wallet API).
3
//!
4
//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
5
//!
6
//! To only get block updates (excluding mempool transactions), the caller can use
7
//! [`Emitter::emit_block`] until it returns `Ok(None)` (which means the chain tip is reached). A
8
//! separate method, [`Emitter::emit_mempool`] can be used to emit the whole mempool. Another
9
//! method, [`Emitter::emit_update`] is available, which emits block updates until the block tip is
10
//! reached, then the next update will be the mempool.
11
//!
12
//! # [`IntoIterator`] implementation
13
//!
14
//! [`Emitter`] implements [`IntoIterator`] which transforms itself into [`UpdateIter`]. The
15
//! iterator is implemented in a way that even after a call to [`Iterator::next`] returns [`None`],
16
//! subsequent calls may resume returning [`Some`].
17
//!
18
//! The iterator initially returns blocks in increasing height order. After the chain tip is
19
//! reached, the next update is the mempool. After the mempool update is released, the first
20
//! succeeding call to [`Iterator::next`] will return [`None`].
21
//!
22
//! This logic is useful if the caller wishes to "update once".
23
//!
24
//! ```rust,no_run
25
//! use bdk_bitcoind_rpc::{EmittedUpdate, Emitter};
26
//! # let client: bdk_bitcoind_rpc::bitcoincore_rpc::Client = todo!();
27
//!
28
//! for r in Emitter::new(&client, 709_632, None) {
29
//!     let update = r.expect("todo: deal with the error properly");
30
//!
31
//!     match update.checkpoint() {
32
//!         Some(cp) => println!("block {}:{}", cp.height(), cp.hash()),
33
//!         None => println!("mempool!"),
34
//!     }
35
//! }
36
//! ```
37
//!
38
//! Alternatively, if the caller wishes to keep [`Emitter`] in a dedicated update-thread, the caller
39
//! can continue to poll [`Iterator::next`] with a delay.
40

41
#![warn(missing_docs)]
42

43
use bdk_chain::{
44
    bitcoin::{Block, Transaction},
45
    indexed_tx_graph::TxItem,
46
    local_chain::{self, CheckPoint},
47
    BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor,
48
};
49
pub use bitcoincore_rpc;
50
use bitcoincore_rpc::{json::GetBlockResult, RpcApi};
51
use std::fmt::Debug;
52

53
/// An update emitted from [`Emitter`]. This can either be of a block or a subset of
54
/// mempool transactions.
55
#[derive(Debug, Clone)]
×
56
pub enum EmittedUpdate {
57
    /// An emitted block.
58
    Block(EmittedBlock),
59
    /// An emitted subset of mempool transactions.
60
    ///
61
    /// [`Emitter`] attempts to avoid re-emitting transactions.
62
    Mempool(EmittedMempool),
63
}
64

65
impl EmittedUpdate {
66
    /// Returns whether the update is of a subset of the mempool.
67
    pub fn is_mempool(&self) -> bool {
1✔
68
        matches!(self, Self::Mempool { .. })
1✔
69
    }
1✔
70

71
    /// Returns whether the update is of a block.
72
    pub fn is_block(&self) -> bool {
1✔
73
        matches!(self, Self::Block { .. })
1✔
74
    }
1✔
75

76
    /// Get the emission's checkpoint.
77
    ///
78
    /// The emission will only have a checkpoint if it is the [`EmittedUpdate::Block`] variant.
79
    pub fn checkpoint(&self) -> Option<CheckPoint> {
104✔
80
        match self {
104✔
81
            EmittedUpdate::Block(e) => Some(e.checkpoint()),
103✔
82
            EmittedUpdate::Mempool(_) => None,
1✔
83
        }
84
    }
104✔
85

86
    /// Convenience method to get [`local_chain::Update`].
87
    pub fn chain_update(&self) -> Option<local_chain::Update> {
104✔
88
        Some(local_chain::Update {
104✔
89
            tip: self.checkpoint()?,
104✔
90
            introduce_older_blocks: false,
91
        })
92
    }
104✔
93

94
    /// Return transaction items to be consumed by [`IndexedTxGraph::insert_relevant_txs`].
95
    ///
96
    /// The `anchor_map` parameter takes in a closure that creates anchors of a specific type.
97
    /// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] are available to create
98
    /// updates with [`ConfirmationHeightAnchor`] and [`ConfirmationTimeAnchor`] respectively.
99
    ///
100
    /// [`IndexedTxGraph::insert_relevant_txs`]: bdk_chain::IndexedTxGraph::insert_relevant_txs
101
    pub fn indexed_tx_graph_update<M, A>(&self, anchor_map: M) -> Vec<TxItem<'_, Option<A>>>
105✔
102
    where
105✔
103
        M: Fn(&CheckPoint, &Block, usize) -> A,
105✔
104
        A: Clone + Ord + PartialEq,
105✔
105
    {
105✔
106
        match self {
105✔
107
            EmittedUpdate::Block(e) => e.indexed_tx_graph_update(anchor_map).collect(),
103✔
108
            EmittedUpdate::Mempool(e) => e.indexed_tx_graph_update().collect(),
2✔
109
        }
110
    }
105✔
111
}
112

113
/// An emitted block.
114
#[derive(Debug, Clone)]
×
115
pub struct EmittedBlock {
116
    /// The checkpoint constructed from the block's height/hash and connected to the previous block.
117
    pub cp: CheckPoint,
118
    /// The actual block of the chain.
119
    pub block: Block,
120
}
121

122
impl EmittedBlock {
123
    /// Get the emission's checkpoint.
124
    pub fn checkpoint(&self) -> CheckPoint {
211✔
125
        self.cp.clone()
211✔
126
    }
211✔
127

128
    /// Convenience method to get [`local_chain::Update`].
129
    pub fn chain_update(&self) -> local_chain::Update {
×
130
        local_chain::Update {
×
131
            tip: self.cp.clone(),
×
132
            introduce_older_blocks: false,
×
133
        }
×
134
    }
×
135

136
    /// Return transaction items to be consumed by [`IndexedTxGraph::insert_relevant_txs`].
137
    ///
138
    /// Refer to [`EmittedUpdate::indexed_tx_graph_update`] for more.
139
    ///
140
    /// [`IndexedTxGraph::insert_relevant_txs`]: bdk_chain::IndexedTxGraph::insert_relevant_txs
141
    pub fn indexed_tx_graph_update<M, A>(
103✔
142
        &self,
103✔
143
        anchor_map: M,
103✔
144
    ) -> impl Iterator<Item = TxItem<'_, Option<A>>>
103✔
145
    where
103✔
146
        M: Fn(&CheckPoint, &Block, usize) -> A,
103✔
147
        A: Clone + Ord + PartialEq,
103✔
148
    {
103✔
149
        self.block
103✔
150
            .txdata
103✔
151
            .iter()
103✔
152
            .enumerate()
103✔
153
            .map(move |(i, tx)| (tx, Some(anchor_map(&self.cp, &self.block, i)), None))
106✔
154
    }
103✔
155
}
156

157
/// An emitted subset of mempool transactions.
158
#[derive(Debug, Clone)]
×
159
pub struct EmittedMempool {
160
    /// Subset of mempool transactions as tuples of `(tx, seen_at)`.
161
    ///
162
    /// `seen_at` is the unix timestamp of when the transaction was first seen in the mempool.
163
    pub txs: Vec<(Transaction, u64)>,
164
}
165

166
impl EmittedMempool {
167
    /// Return transaction items to be consumed by [`IndexedTxGraph::insert_relevant_txs`].
168
    ///
169
    /// Refer to [`EmittedUpdate::indexed_tx_graph_update`] for more.
170
    ///
171
    /// [`IndexedTxGraph::insert_relevant_txs`]: bdk_chain::IndexedTxGraph::insert_relevant_txs
172
    pub fn indexed_tx_graph_update<A>(&self) -> impl Iterator<Item = TxItem<'_, Option<A>>>
2✔
173
    where
2✔
174
        A: Clone + Ord + PartialEq,
2✔
175
    {
2✔
176
        self.txs
2✔
177
            .iter()
2✔
178
            .map(|(tx, seen_at)| (tx, None, Some(*seen_at)))
3✔
179
    }
2✔
180
}
181

182
/// An anchor-mapping function that creates a [`ConfirmationHeightAnchor`] from an emitted block's checkpoint.
183
///
184
/// This is to be used as an input to [`EmittedUpdate::indexed_tx_graph_update`].
185
pub fn confirmation_height_anchor(
106✔
186
    cp: &CheckPoint,
106✔
187
    _block: &Block,
106✔
188
    _tx_pos: usize,
106✔
189
) -> ConfirmationHeightAnchor {
106✔
190
    let anchor_block = cp.block_id();
106✔
191
    ConfirmationHeightAnchor {
106✔
192
        anchor_block,
106✔
193
        confirmation_height: anchor_block.height,
106✔
194
    }
106✔
195
}
106✔
196

197
/// An anchor-mapping function that creates a [`ConfirmationTimeAnchor`] from an emitted block's checkpoint and header time.
198
///
199
/// This is to be used as an input to [`EmittedUpdate::indexed_tx_graph_update`].
200
pub fn confirmation_time_anchor(
×
201
    cp: &CheckPoint,
×
202
    block: &Block,
×
203
    _tx_pos: usize,
×
204
) -> ConfirmationTimeAnchor {
×
205
    let anchor_block = cp.block_id();
×
206
    ConfirmationTimeAnchor {
×
207
        anchor_block,
×
208
        confirmation_height: anchor_block.height,
×
209
        confirmation_time: block.header.time as _,
×
210
    }
×
211
}
×
212

213
/// A structure that emits updates for [`bdk_chain`] structures, sourcing blockchain data from
214
/// [`bitcoincore_rpc::Client`].
215
///
216
/// Refer to [module-level documentation] for more.
217
///
218
/// [module-level documentation]: crate
219
pub struct Emitter<'c, C> {
220
    client: &'c C,
221
    fallback_height: u32,
222

223
    last_cp: Option<CheckPoint>,
224
    last_info: Option<GetBlockResult>,
225
}
226

227
impl<'c, C: RpcApi> IntoIterator for Emitter<'c, C> {
228
    type Item = <UpdateIter<'c, C> as Iterator>::Item;
229
    type IntoIter = UpdateIter<'c, C>;
230

231
    fn into_iter(self) -> Self::IntoIter {
1✔
232
        UpdateIter {
1✔
233
            emitter: self,
1✔
234
            last_emission_was_mempool: false,
1✔
235
        }
1✔
236
    }
1✔
237
}
238

239
impl<'c, C: RpcApi> Emitter<'c, C> {
240
    /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`].
241
    ///
242
    /// * `fallback_height` is the block height to start from if `last_cp` is not provided, or a
243
    ///     point of agreement is not found.
244
    /// * `last_cp` is the last known checkpoint to build updates on (if any).
245
    pub fn new(client: &'c C, fallback_height: u32, last_cp: Option<CheckPoint>) -> Self {
5✔
246
        Self {
5✔
247
            client,
5✔
248
            fallback_height,
5✔
249
            last_cp,
5✔
250
            last_info: None,
5✔
251
        }
5✔
252
    }
5✔
253

254
    /// Emits the whole mempool contents.
255
    pub fn emit_mempool(&self) -> Result<EmittedMempool, bitcoincore_rpc::Error> {
2✔
256
        let txs = self
2✔
257
            .client
2✔
258
            .get_raw_mempool()?
2✔
259
            .into_iter()
2✔
260
            .map(
2✔
261
                |txid| -> Result<(Transaction, u64), bitcoincore_rpc::Error> {
3✔
262
                    let first_seen = self
3✔
263
                        .client
3✔
264
                        .get_mempool_entry(&txid)
3✔
265
                        .map(|entry| entry.time)?;
3✔
266
                    let tx = self.client.get_raw_transaction(&txid, None)?;
3✔
267
                    Ok((tx, first_seen))
3✔
268
                },
3✔
269
            )
2✔
270
            .collect::<Result<Vec<_>, _>>()?;
2✔
271
        Ok(EmittedMempool { txs })
2✔
272
    }
2✔
273

274
    /// Emits the next block (if any).
275
    pub fn emit_block(&mut self) -> Result<Option<EmittedBlock>, bitcoincore_rpc::Error> {
215✔
276
        enum PollResponse {
277
            /// A new block that is in chain is found. Congratulations!
278
            Block {
279
                cp: CheckPoint,
280
                info: GetBlockResult,
281
            },
282
            /// This either signals that we have reached the tip, or that the blocks ahead are not
283
            /// in the best chain. In either case, we need to find the agreement point again.
284
            NoMoreBlocks,
285
            /// We have exhausted the local checkpoint history and there is no agreement point. We
286
            /// should emit from the fallback height for the next round.
287
            AgreementPointNotFound,
288
            /// We have found an agreement point! Do not emit this one, emit the one higher.
289
            AgreementPointFound {
290
                cp: CheckPoint,
291
                info: GetBlockResult,
292
            },
293
        }
294

295
        fn poll<C>(emitter: &mut Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
218✔
296
        where
218✔
297
            C: RpcApi,
218✔
298
        {
218✔
299
            let client = emitter.client;
218✔
300

218✔
301
            match (&mut emitter.last_cp, &mut emitter.last_info) {
218✔
302
                (None, None) => {
303
                    let info = client
2✔
304
                        .get_block_info(&client.get_block_hash(emitter.fallback_height as _)?)?;
2✔
305
                    let cp = CheckPoint::new(BlockId {
2✔
306
                        height: info.height as _,
2✔
307
                        hash: info.hash,
2✔
308
                    });
2✔
309
                    Ok(PollResponse::Block { cp, info })
2✔
310
                }
311
                (Some(last_cp), None) => {
3✔
312
                    for cp in last_cp.iter() {
9✔
313
                        let cp_block = cp.block_id();
9✔
314
                        let info = client.get_block_info(&cp_block.hash)?;
9✔
315
                        if info.confirmations < 0 {
9✔
316
                            // block is not in the main chain
317
                            continue;
6✔
318
                        }
3✔
319
                        // agreement point found
3✔
320
                        return Ok(PollResponse::AgreementPointFound { cp, info });
3✔
321
                    }
322
                    // no agreement point found
323
                    Ok(PollResponse::AgreementPointNotFound)
×
324
                }
325
                (Some(last_cp), Some(last_info)) => {
213✔
326
                    let next_hash = match last_info.nextblockhash {
213✔
327
                        None => return Ok(PollResponse::NoMoreBlocks),
4✔
328
                        Some(next_hash) => next_hash,
209✔
329
                    };
330
                    let info = client.get_block_info(&next_hash)?;
209✔
331
                    if info.confirmations < 0 {
209✔
332
                        return Ok(PollResponse::NoMoreBlocks);
×
333
                    }
209✔
334
                    let cp = last_cp
209✔
335
                        .clone()
209✔
336
                        .push(BlockId {
209✔
337
                            height: info.height as _,
209✔
338
                            hash: info.hash,
209✔
339
                        })
209✔
340
                        .expect("must extend from checkpoint");
209✔
341
                    Ok(PollResponse::Block { cp, info })
209✔
342
                }
343
                (None, Some(last_info)) => unreachable!(
×
344
                    "info cannot exist without checkpoint: info={:#?}",
×
345
                    last_info
×
346
                ),
×
347
            }
348
        }
218✔
349

350
        loop {
351
            match poll(self)? {
218✔
352
                PollResponse::Block { cp, info } => {
211✔
353
                    let block = self.client.get_block(&info.hash)?;
211✔
354
                    self.last_cp = Some(cp.clone());
211✔
355
                    self.last_info = Some(info);
211✔
356
                    return Ok(Some(EmittedBlock { cp, block }));
211✔
357
                }
358
                PollResponse::NoMoreBlocks => {
359
                    // we have reached the tip, try find agreement point in next round
360
                    self.last_info = None;
4✔
361
                    return Ok(None);
4✔
362
                }
363
                PollResponse::AgreementPointNotFound => {
364
                    self.last_cp = None;
×
365
                    self.last_info = None;
×
366
                    continue;
×
367
                }
368
                PollResponse::AgreementPointFound { cp, info } => {
3✔
369
                    self.last_cp = Some(cp);
3✔
370
                    self.last_info = Some(info);
3✔
371
                    continue;
3✔
372
                }
373
            }
374
        }
375
    }
215✔
376

377
    /// Emits the next update: the next block if there is one, otherwise the current mempool contents.
378
    pub fn emit_update(&mut self) -> Result<EmittedUpdate, bitcoincore_rpc::Error> {
379
        match self.emit_block()? {
105✔
380
            Some(emitted_block) => Ok(EmittedUpdate::Block(emitted_block)),
103✔
381
            None => self.emit_mempool().map(EmittedUpdate::Mempool),
2✔
382
        }
383
    }
105✔
384
}
385

386
/// Extends [`bitcoincore_rpc::Error`].
387
pub trait BitcoindRpcErrorExt {
388
    /// Returns whether the error is a "not found" error.
389
    ///
390
    /// This is useful since [`Emitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as
391
    /// [`Iterator::Item`].
392
    fn is_not_found_error(&self) -> bool;
393
}
394

395
impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
396
    fn is_not_found_error(&self) -> bool {
397
        if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self
×
398
        {
399
            rpc_err.code == -5
×
400
        } else {
401
            false
×
402
        }
403
    }
×
404
}
405

406
/// An [`Iterator`] that wraps an [`Emitter`], and emits [`Result`]s of [`EmittedUpdate`].
407
///
408
/// ```rust,no_run
409
/// use bdk_bitcoind_rpc::{EmittedUpdate, Emitter, UpdateIter};
410
/// use core::iter::{IntoIterator, Iterator};
411
/// # let client: bdk_bitcoind_rpc::bitcoincore_rpc::Client = todo!();
412
///
413
/// let mut update_iter = Emitter::new(&client, 706_932, None).into_iter();
414
/// let update = update_iter.next().expect("must get next update");
415
/// println!("got update: {:?}", update);
416
/// ```
417
///
418
/// Refer to [module-level documentation] for more.
419
///
420
/// [module-level documentation]: crate
421
pub struct UpdateIter<'c, C> {
422
    emitter: Emitter<'c, C>,
423
    last_emission_was_mempool: bool,
424
}
425

426
impl<'c, C: RpcApi> Iterator for UpdateIter<'c, C> {
427
    type Item = Result<EmittedUpdate, bitcoincore_rpc::Error>;
428

429
    fn next(&mut self) -> Option<Self::Item> {
104✔
430
        if self.last_emission_was_mempool {
104✔
431
            self.last_emission_was_mempool = false;
1✔
432
            None
1✔
433
        } else {
434
            let update = self.emitter.emit_update();
103✔
435
            if matches!(update, Ok(EmittedUpdate::Mempool(_))) {
103✔
436
                self.last_emission_was_mempool = true;
1✔
437
            }
102✔
438
            Some(update)
103✔
439
        }
440
    }
104✔
441
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc