• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitcoindevkit / bdk / 6395258855

03 Oct 2023 03:40PM UTC coverage: 80.778% (-0.07%) from 80.851%
6395258855

Pull #1041

github

web-flow
Merge 167f54972 into 9bd528607
Pull Request #1041: Add `bitcoind_rpc` chain source module.

250 of 250 new or added lines in 4 files covered. (100.0%)

8405 of 10405 relevant lines covered (80.78%)

5885.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.0
/crates/bitcoind_rpc/src/lib.rs
1
//! This crate is used for emitting blockchain data from the `bitcoind` RPC interface (excluding the
2
//! RPC wallet API).
3
//!
4
//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
5
//!
6
//! To only get block updates (exclude mempool transactions), the caller can use
7
//! [`Emitter::next_block`] and/or [`Emitter::next_header`] until it returns `Ok(None)` (which means
8
//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole
9
//! mempool.
10
//!
11
//! # [`Iter`]
12
//!
13
//! [`Emitter::into_iterator<B>`] transforms the emitter into an iterator that either returns
14
//! [`EmittedBlock`] or [`EmittedHeader`].
15
//!
16
//! ```rust,no_run
17
//! use bdk_bitcoind_rpc::{EmittedBlock, Emitter};
18
//! # let client: bdk_bitcoind_rpc::bitcoincore_rpc::Client = todo!();
19
//!
20
//! for r in Emitter::new(&client, 709_632).into_iterator::<EmittedBlock>() {
21
//!     let emitted_block = r.expect("todo: handle error");
22
//!     println!(
23
//!         "block {}: {}",
24
//!         emitted_block.height,
25
//!         emitted_block.block.block_hash()
26
//!     );
27
//! }
28
//! ```
29
#![warn(missing_docs)]
30

31
use std::{collections::BTreeMap, marker::PhantomData};
32

33
use bitcoin::{block::Header, Block, BlockHash, Transaction};
34
pub use bitcoincore_rpc;
35
use bitcoincore_rpc::bitcoincore_rpc_json;
36

37
/// Represents a transaction that exists in the mempool.
38
#[derive(Debug)]
×
39
pub struct MempoolTx {
40
    /// The transaction.
41
    pub tx: Transaction,
42
    /// Time when transaction first entered the mempool (in epoch seconds).
43
    pub time: u64,
44
}
45

46
/// A block obtained from `bitcoind`.
47
#[derive(Debug)]
×
48
pub struct EmittedBlock {
49
    /// The actual block.
50
    pub block: Block,
51
    /// The height of the block.
52
    pub height: u32,
53
}
54

55
/// A block header obtained from `bitcoind`.
56
#[derive(Debug)]
×
57
pub struct EmittedHeader {
58
    /// The actual block header.
59
    pub header: Header,
60
    /// The height of the block header.
61
    pub height: u32,
62
}
63

64
/// A structure that emits data sourced from [`bitcoincore_rpc::Client`].
65
///
66
/// Refer to [module-level documentation] for more.
67
///
68
/// [module-level documentation]: crate
69
pub struct Emitter<'c, C> {
70
    client: &'c C,
71
    start_height: u32,
72

73
    emitted_blocks: BTreeMap<u32, BlockHash>,
74
    last_block: Option<bitcoincore_rpc_json::GetBlockResult>,
75

76
    /// The latest first-seen epoch of emitted mempool transactions. This is used to determine
77
    /// whether a mempool transaction is already emitted.
78
    last_mempool_time: usize,
79

80
    /// The last emitted block during our last mempool emission. This is used to determine whether
81
    /// there has been a reorg since our last mempool emission.
82
    last_mempool_tip: Option<(u32, BlockHash)>,
83
}
84

85
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
86
    /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`].
87
    ///
88
    /// `start_height` is the block height to start emitting blocks from.
89
    pub fn new(client: &'c C, start_height: u32) -> Self {
8✔
90
        Self {
8✔
91
            client,
8✔
92
            start_height,
8✔
93
            emitted_blocks: BTreeMap::new(),
8✔
94
            last_block: None,
8✔
95
            last_mempool_time: 0,
8✔
96
            last_mempool_tip: None,
8✔
97
        }
8✔
98
    }
8✔
99

100
    /// Emit mempool transactions.
101
    ///
102
    /// This avoids re-emitting transactions (where viable). We can do this if all blocks
103
    /// containing ancestor transactions are already emitted.
104
    pub fn mempool(&mut self) -> Result<Vec<MempoolTx>, bitcoincore_rpc::Error> {
66✔
105
        let client = self.client;
66✔
106

107
        let prev_mempool_tip = match self.last_mempool_tip {
66✔
108
            // use 'avoid-re-emission' logic if there is no reorg
109
            Some((height, hash)) if self.emitted_blocks.get(&height) == Some(&hash) => height,
62✔
110
            _ => 0,
12✔
111
        };
112

113
        let prev_mempool_time = self.last_mempool_time;
66✔
114
        let mut latest_time = prev_mempool_time;
66✔
115

116
        let txs_to_emit = client
66✔
117
            .get_raw_mempool_verbose()?
66✔
118
            .into_iter()
66✔
119
            .filter_map({
66✔
120
                let latest_time = &mut latest_time;
66✔
121
                move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
1,033✔
122
                    let tx_time = tx_entry.time as usize;
1,033✔
123
                    if tx_time > *latest_time {
1,033✔
124
                        *latest_time = tx_time;
4✔
125
                    }
1,029✔
126

127
                    // Avoid emitting transactions that are already emitted if we can guarantee
128
                    // blocks containing ancestors are already emitted. The bitcoind rpc interface
129
                    // provides us with the block height that the tx is introduced to the mempool.
130
                    // If we have already emitted the block of height, we can assume that all
131
                    // ancestor txs have been processed by the receiver.
132
                    let is_already_emitted = tx_time <= prev_mempool_time;
1,033✔
133
                    let is_within_height = tx_entry.height <= prev_mempool_tip as _;
1,033✔
134
                    if is_already_emitted && is_within_height {
1,033✔
135
                        return None;
492✔
136
                    }
541✔
137

138
                    let tx = match client.get_raw_transaction(&txid, None) {
541✔
139
                        Ok(tx) => tx,
541✔
140
                        Err(err) => return Some(Err(err)),
×
141
                    };
142

143
                    Some(Ok(MempoolTx {
541✔
144
                        tx,
541✔
145
                        time: tx_time as u64,
541✔
146
                    }))
541✔
147
                }
1,033✔
148
            })
66✔
149
            .collect::<Result<Vec<_>, _>>()?;
66✔
150

151
        self.last_mempool_time = latest_time;
66✔
152
        self.last_mempool_tip = self
66✔
153
            .emitted_blocks
66✔
154
            .iter()
66✔
155
            .last()
66✔
156
            .map(|(&height, &hash)| (height, hash));
66✔
157

66✔
158
        Ok(txs_to_emit)
66✔
159
    }
66✔
160

161
    /// Emit the next block header (if any).
162
    pub fn next_header(&mut self) -> Result<Option<EmittedHeader>, bitcoincore_rpc::Error> {
568✔
163
        let poll_res = poll(self, |hash| self.client.get_block_header(hash))?;
568✔
164
        Ok(poll_res.map(|(height, header)| EmittedHeader { header, height }))
568✔
165
    }
568✔
166

167
    /// Emit the next block (if any).
168
    pub fn next_block(&mut self) -> Result<Option<EmittedBlock>, bitcoincore_rpc::Error> {
406✔
169
        let poll_res = poll(self, |hash| self.client.get_block(hash))?;
406✔
170
        Ok(poll_res.map(|(height, block)| EmittedBlock { block, height }))
406✔
171
    }
406✔
172

173
    /// Transforms `self` into an iterator of either [`EmittedBlock`]s or [`EmittedHeader`]s.
174
    ///
175
    /// Refer to [module-level documentation] for more.
176
    ///
177
    /// [module-level documentation]: crate
178
    pub fn into_iterator<B>(self) -> Iter<'c, C, B> {
×
179
        Iter {
×
180
            emitter: self,
×
181
            phantom: PhantomData,
×
182
        }
×
183
    }
×
184
}
185

186
/// An [`Iterator`] that wraps an [`Emitter`], and emits [`Result`]s of either [`EmittedHeader`]s
187
/// or [`EmittedBlock`]s.
188
///
189
/// This is constructed with [`Emitter::into_iterator`].
190
pub struct Iter<'c, C, B = EmittedBlock> {
191
    emitter: Emitter<'c, C>,
192
    phantom: PhantomData<B>,
193
}
194

195
impl<'c, C: bitcoincore_rpc::RpcApi> Iterator for Iter<'c, C, EmittedBlock> {
196
    type Item = Result<EmittedBlock, bitcoincore_rpc::Error>;
197

198
    fn next(&mut self) -> Option<Self::Item> {
×
199
        self.emitter.next_block().transpose()
×
200
    }
×
201
}
202

203
impl<'c, C: bitcoincore_rpc::RpcApi> Iterator for Iter<'c, C, EmittedHeader> {
204
    type Item = Result<EmittedHeader, bitcoincore_rpc::Error>;
205

206
    fn next(&mut self) -> Option<Self::Item> {
×
207
        self.emitter.next_header().transpose()
×
208
    }
×
209
}
210

211
enum PollResponse {
212
    Block(bitcoincore_rpc_json::GetBlockResult),
213
    NoMoreBlocks,
214
    /// Fetched block is not in the best chain.
215
    BlockNotInBestChain,
216
    AgreementFound(bitcoincore_rpc_json::GetBlockResult),
217
    AgreementPointNotFound,
218
}
219

220
fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
1,011✔
221
where
1,011✔
222
    C: bitcoincore_rpc::RpcApi,
1,011✔
223
{
1,011✔
224
    let client = emitter.client;
1,011✔
225

226
    if let Some(last_res) = &emitter.last_block {
1,011✔
227
        assert!(!emitter.emitted_blocks.is_empty());
966✔
228

229
        let next_hash = match last_res.nextblockhash {
966✔
230
            None => return Ok(PollResponse::NoMoreBlocks),
40✔
231
            Some(next_hash) => next_hash,
926✔
232
        };
233

234
        let res = client.get_block_info(&next_hash)?;
926✔
235
        if res.confirmations < 0 {
926✔
236
            return Ok(PollResponse::BlockNotInBestChain);
1✔
237
        }
925✔
238
        return Ok(PollResponse::Block(res));
925✔
239
    }
45✔
240

45✔
241
    if emitter.emitted_blocks.is_empty() {
45✔
242
        let hash = client.get_block_hash(emitter.start_height as _)?;
9✔
243

244
        let res = client.get_block_info(&hash)?;
9✔
245
        if res.confirmations < 0 {
9✔
246
            return Ok(PollResponse::BlockNotInBestChain);
×
247
        }
9✔
248
        return Ok(PollResponse::Block(res));
9✔
249
    }
36✔
250

251
    for (&_, hash) in emitter.emitted_blocks.iter().rev() {
209✔
252
        let res = client.get_block_info(hash)?;
209✔
253
        if res.confirmations < 0 {
209✔
254
            // block is not in best chain
255
            continue;
174✔
256
        }
35✔
257

35✔
258
        // agreement point found
35✔
259
        return Ok(PollResponse::AgreementFound(res));
35✔
260
    }
261

262
    Ok(PollResponse::AgreementPointNotFound)
1✔
263
}
1,011✔
264

265
fn poll<C, V, F>(
974✔
266
    emitter: &mut Emitter<C>,
974✔
267
    get_item: F,
974✔
268
) -> Result<Option<(u32, V)>, bitcoincore_rpc::Error>
974✔
269
where
974✔
270
    C: bitcoincore_rpc::RpcApi,
974✔
271
    F: Fn(&BlockHash) -> Result<V, bitcoincore_rpc::Error>,
974✔
272
{
974✔
273
    loop {
274
        match poll_once(emitter)? {
1,011✔
275
            PollResponse::Block(res) => {
934✔
276
                let height = res.height as u32;
934✔
277
                let item = get_item(&res.hash)?;
934✔
278
                assert_eq!(emitter.emitted_blocks.insert(height, res.hash), None);
934✔
279
                emitter.last_block = Some(res);
934✔
280
                return Ok(Some((height, item)));
934✔
281
            }
282
            PollResponse::NoMoreBlocks => {
283
                emitter.last_block = None;
40✔
284
                return Ok(None);
40✔
285
            }
286
            PollResponse::BlockNotInBestChain => {
287
                emitter.last_block = None;
1✔
288
                continue;
1✔
289
            }
290
            PollResponse::AgreementFound(res) => {
35✔
291
                emitter.emitted_blocks.split_off(&(res.height as u32 + 1));
35✔
292
                emitter.last_block = Some(res);
35✔
293
                continue;
35✔
294
            }
295
            PollResponse::AgreementPointNotFound => {
296
                emitter.emitted_blocks.clear();
1✔
297
                emitter.last_block = None;
1✔
298
                continue;
1✔
299
            }
300
        }
301
    }
302
}
974✔
303

304
/// Extension methods for [`bitcoincore_rpc::Error`].
pub trait BitcoindRpcErrorExt {
    /// Reports whether this error is a "not found" error returned by the node.
    ///
    /// Handy for callers inspecting the [`Result<_, bitcoincore_rpc::Error>`]s that [`Emitter`]
    /// yields as [`Iterator::Item`].
    fn is_not_found_error(&self) -> bool;
}
312

313
impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
314
    fn is_not_found_error(&self) -> bool {
315
        if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self
×
316
        {
317
            rpc_err.code == -5
×
318
        } else {
319
            false
×
320
        }
321
    }
×
322
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc