• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitcoindevkit / bdk / 6342250108

28 Sep 2023 05:26PM UTC coverage: 80.121% (-0.8%) from 80.872%
6342250108

Pull #1041

github

web-flow
Merge 78c4ed3d3 into 37d5e5319
Pull Request #1041: Add `bitcoind_rpc` chain source module.

242 of 242 new or added lines in 3 files covered. (100.0%)

8335 of 10403 relevant lines covered (80.12%)

211776.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.64
/crates/bitcoind_rpc/src/lib.rs
1
//! This crate is used for emitting blockchain data from the `bitcoind` RPC interface (excluding the
1✔
2
//! RPC wallet API).
3
//!
4
//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
5
//!
6
//! To only get block updates (exlude mempool transactions), the caller can use
7
//! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means
8
//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole
9
//! mempool.
10
//!
11
//! # [`Iter`]
12
//!
13
//! [`Emitter::into_iterator<B>`] transforms the emitter into an iterator that returns
14
//! [`Emission<B>`] items. The `B` generic can either be [`EmittedBlock`] or [`EmittedHeader`] and
15
//! determines whether we are iterating blocks or headers.
16
//!
17
//! The iterator initially returns blocks/headers in increasing height order. After the chain tip is
18
//! reached, the next update is the mempool. After the mempool update is released, the first
19
//! succeeding call to [`Iterator::next`] will return [`None`]. Subsequent calls will resume
20
//! returning [`Some`] once more blocks are found.
21
//!
22
//! ```rust,no_run
23
//! use bdk_bitcoind_rpc::{Emission, EmittedBlock, Emitter};
24
//! # let client: bdk_bitcoind_rpc::bitcoincore_rpc::Client = todo!();
25
//!
26
//! for r in Emitter::new(&client, 709_632).into_iterator::<EmittedBlock>() {
27
//!     let emission = r.expect("todo: handle error");
28
//!     match emission {
29
//!         Emission::Block(b) => println!("block {}: {}", b.height, b.block.block_hash()),
30
//!         Emission::Mempool(m) => println!("mempool: {} txs", m.len()),
31
//!     }
32
//! }
33
//! ```
34
#![warn(missing_docs)]
35

36
use std::{collections::BTreeMap, marker::PhantomData};
37

38
use bitcoin::{block::Header, Block, BlockHash, Transaction};
39
pub use bitcoincore_rpc;
40
use bitcoincore_rpc::bitcoincore_rpc_json;
41

42
/// Represents a transaction that exists in the mempool.
43
pub struct MempoolTx {
44
    /// The transaction.
45
    pub tx: Transaction,
46
    /// Time when transaction first entered the mempool (in epoch seconds).
47
    pub time: u64,
48
}
49

50
/// A block obtained from `bitcoind`.
51
pub struct EmittedBlock {
52
    /// The actual block.
53
    pub block: Block,
54
    /// The height of the block.
55
    pub height: u32,
56
}
57

58
/// A block header obtained from `bitcoind`.
59
pub struct EmittedHeader {
60
    /// The actual block header.
61
    pub header: Header,
62
    /// The height of the block header.
63
    pub height: u32,
64
}
65

66
/// A structure that emits data sourced from [`bitcoincore_rpc::Client`].
67
///
68
/// Refer to [module-level documentation] for more.
69
///
70
/// [module-level documentation]: crate
71
pub struct Emitter<'c, C> {
72
    client: &'c C,
73
    start_height: u32,
74

75
    emitted: BTreeMap<u32, BlockHash>,
76
    last_block: Option<bitcoincore_rpc_json::GetBlockResult>,
77
    /// Records the latest first-seen epoch of emitted mempool transactions.
78
    ///
79
    /// This allows us to avoid re-emitting some mempool transactions. Mempool transactions need to
80
    /// be re-emitted if the latest block that may contain the transaction's ancestors have not yet
81
    /// been emitted.
82
    last_mempool_time: usize,
83
}
84

85
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
86
    /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`].
87
    ///
88
    /// `start_height` is the block height to start emitting blocks from.
89
    pub fn new(client: &'c C, start_height: u32) -> Self {
2✔
90
        Self {
2✔
91
            client,
2✔
92
            start_height,
2✔
93
            emitted: BTreeMap::new(),
2✔
94
            last_block: None,
2✔
95
            last_mempool_time: 0,
2✔
96
        }
2✔
97
    }
2✔
98

99
    /// Emit mempool transactions.
100
    ///
101
    /// This avoids re-emitting transactions (where viable). We can do this if all blocks
102
    /// containing ancestor transactions are already emitted.
103
    pub fn mempool(&mut self) -> Result<Vec<MempoolTx>, bitcoincore_rpc::Error> {
1✔
104
        let client = self.client;
1✔
105
        let prev_block_height = self.last_block.as_ref().map_or(0, |r| r.height);
1✔
106
        let prev_mempool_time = self.last_mempool_time;
1✔
107
        let mut latest_time = self.last_mempool_time;
1✔
108

109
        let txs_to_emit = client
1✔
110
            .get_raw_mempool_verbose()?
1✔
111
            .into_iter()
1✔
112
            .filter_map(
1✔
113
                move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
3✔
114
                    let tx_time = tx_entry.time as usize;
3✔
115
                    if tx_time > latest_time {
3✔
116
                        latest_time = tx_time;
1✔
117
                    }
2✔
118

119
                    // Avoid emitting transactions that are already emitted if we can guarantee
120
                    // blocks containing ancestors are already emitted. We only check block height
121
                    // because in case of reorg, we reset `self.last_mempool_time` to force
122
                    // emitting all mempool txs.
123
                    let is_emitted = tx_time < prev_mempool_time;
3✔
124
                    let is_within_known_tip = tx_entry.height as usize <= prev_block_height;
3✔
125
                    if is_emitted && is_within_known_tip {
3✔
126
                        return None;
×
127
                    }
3✔
128

129
                    let tx = match client.get_raw_transaction(&txid, None) {
3✔
130
                        Ok(tx) => tx,
3✔
131
                        Err(err) => return Some(Err(err)),
×
132
                    };
133

134
                    Some(Ok(MempoolTx {
3✔
135
                        tx,
3✔
136
                        time: tx_time as u64,
3✔
137
                    }))
3✔
138
                },
3✔
139
            )
1✔
140
            .collect::<Result<Vec<_>, _>>()?;
1✔
141

142
        self.last_mempool_time = latest_time;
1✔
143
        Ok(txs_to_emit)
1✔
144
    }
1✔
145

146
    /// Emit the next block header (if any).
147
    pub fn next_header(&mut self) -> Result<Option<EmittedHeader>, bitcoincore_rpc::Error> {
×
148
        let poll_res = poll(self, |hash| self.client.get_block_header(hash))?;
×
149
        Ok(poll_res.map(|(height, header)| EmittedHeader { header, height }))
×
150
    }
×
151

152
    /// Emit the next block (if any).
153
    pub fn next_block(&mut self) -> Result<Option<EmittedBlock>, bitcoincore_rpc::Error> {
215✔
154
        let poll_res = poll(self, |hash| self.client.get_block(hash))?;
215✔
155
        Ok(poll_res.map(|(height, block)| EmittedBlock { block, height }))
215✔
156
    }
215✔
157

158
    /// Transforms `self` into an iterator of [`Emission`]s.
159
    ///
160
    /// Refer to [module-level documentation] for more.
161
    ///
162
    /// [module-level documentation]: crate
163
    pub fn into_iterator<B>(self) -> Iter<'c, C, B> {
×
164
        Iter {
×
165
            emitter: self,
×
166
            last_emission_was_mempool: false,
×
167
            phantom: PhantomData,
×
168
        }
×
169
    }
×
170
}
171

172
/// This is the [`Iterator::Item`] of [`Iter`]. This can either represent a block/header or the set
173
/// of mempool transactions.
174
///
175
/// Refer to [module-level documentation] for more.
176
///
177
/// [module-level documentation]: crate
178
pub enum Emission<B> {
179
    /// An emitted set of mempool transactions.
180
    Mempool(Vec<MempoolTx>),
181
    /// An emitted block.
182
    Block(B),
183
}
184

185
impl<B> Emission<B> {
186
    /// Whether the emission if of mempool transactions.
187
    pub fn is_mempool(&self) -> bool {
×
188
        matches!(self, Self::Mempool(_))
×
189
    }
×
190

191
    /// Wether the emission if of a block.
192
    pub fn is_block(&self) -> bool {
×
193
        matches!(self, Self::Block(_))
×
194
    }
×
195
}
196

197
/// An [`Iterator`] that wraps an [`Emitter`], and emits [`Result`]s of [`Emission`].
198
///
199
/// This is constructed with [`Emitter::into_iterator`].
200
pub struct Iter<'c, C, B = EmittedBlock> {
201
    emitter: Emitter<'c, C>,
202
    last_emission_was_mempool: bool,
203
    phantom: PhantomData<B>,
204
}
205

206
impl<'c, C: bitcoincore_rpc::RpcApi, B> Iter<'c, C, B> {
207
    fn next_with<F>(&mut self, f: F) -> Option<Result<Emission<B>, bitcoincore_rpc::Error>>
×
208
    where
×
209
        F: Fn(&mut Emitter<'c, C>) -> Result<Option<B>, bitcoincore_rpc::Error>,
×
210
    {
×
211
        if self.last_emission_was_mempool {
×
212
            self.last_emission_was_mempool = false;
×
213
            return None;
×
214
        }
×
215

×
216
        match f(&mut self.emitter) {
×
217
            Ok(None) => {
218
                self.last_emission_was_mempool = true;
×
219
                Some(self.emitter.mempool().map(Emission::<B>::Mempool))
×
220
            }
221
            Ok(Some(b)) => Some(Ok(Emission::Block(b))),
×
222
            Err(err) => Some(Err(err)),
×
223
        }
224
    }
×
225
}
226

227
impl<'c, C: bitcoincore_rpc::RpcApi> Iterator for Iter<'c, C, EmittedBlock> {
228
    type Item = Result<Emission<EmittedBlock>, bitcoincore_rpc::Error>;
229

230
    fn next(&mut self) -> Option<Self::Item> {
×
231
        self.next_with(Emitter::next_block)
×
232
    }
×
233
}
234

235
impl<'c, C: bitcoincore_rpc::RpcApi> Iterator for Iter<'c, C, EmittedHeader> {
236
    type Item = Result<Emission<EmittedHeader>, bitcoincore_rpc::Error>;
237

238
    fn next(&mut self) -> Option<Self::Item> {
×
239
        self.next_with(Emitter::next_header)
×
240
    }
×
241
}
242

243
enum PollResponse {
244
    Block(bitcoincore_rpc_json::GetBlockResult),
245
    NoMoreBlocks,
246
    /// Fetched block is not in the best chain.
247
    BlockNotInBestChain,
248
    AgreementFound(bitcoincore_rpc_json::GetBlockResult),
249
    AgreementPointNotFound,
250
}
251

252
fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
218✔
253
where
218✔
254
    C: bitcoincore_rpc::RpcApi,
218✔
255
{
218✔
256
    let client = emitter.client;
218✔
257

258
    if let Some(last_res) = &emitter.last_block {
218✔
259
        assert!(!emitter.emitted.is_empty());
213✔
260

261
        let next_hash = match last_res.nextblockhash {
213✔
262
            None => return Ok(PollResponse::NoMoreBlocks),
4✔
263
            Some(next_hash) => next_hash,
209✔
264
        };
265

266
        let res = client.get_block_info(&next_hash)?;
209✔
267
        if res.confirmations < 0 {
209✔
268
            return Ok(PollResponse::BlockNotInBestChain);
×
269
        }
209✔
270
        return Ok(PollResponse::Block(res));
209✔
271
    }
5✔
272

5✔
273
    if emitter.emitted.is_empty() {
5✔
274
        let hash = client.get_block_hash(emitter.start_height as _)?;
2✔
275

276
        let res = client.get_block_info(&hash)?;
2✔
277
        if res.confirmations < 0 {
2✔
278
            return Ok(PollResponse::BlockNotInBestChain);
×
279
        }
2✔
280
        return Ok(PollResponse::Block(res));
2✔
281
    }
3✔
282

283
    for (&_, hash) in emitter.emitted.iter().rev() {
9✔
284
        let res = client.get_block_info(hash)?;
9✔
285
        if res.confirmations < 0 {
9✔
286
            // block is not in best chain
287
            continue;
6✔
288
        }
3✔
289

3✔
290
        // agreement point found
3✔
291
        return Ok(PollResponse::AgreementFound(res));
3✔
292
    }
293

294
    Ok(PollResponse::AgreementPointNotFound)
×
295
}
218✔
296

297
fn poll<C, V, F>(
215✔
298
    emitter: &mut Emitter<C>,
215✔
299
    get_item: F,
215✔
300
) -> Result<Option<(u32, V)>, bitcoincore_rpc::Error>
215✔
301
where
215✔
302
    C: bitcoincore_rpc::RpcApi,
215✔
303
    F: Fn(&BlockHash) -> Result<V, bitcoincore_rpc::Error>,
215✔
304
{
215✔
305
    loop {
306
        match poll_once(emitter)? {
218✔
307
            PollResponse::Block(res) => {
211✔
308
                let height = res.height as u32;
211✔
309
                let item = get_item(&res.hash)?;
211✔
310
                assert_eq!(emitter.emitted.insert(height, res.hash), None);
211✔
311
                emitter.last_block = Some(res);
211✔
312
                return Ok(Some((height, item)));
211✔
313
            }
314
            PollResponse::NoMoreBlocks => {
315
                emitter.last_block = None;
4✔
316
                return Ok(None);
4✔
317
            }
318
            PollResponse::BlockNotInBestChain => {
319
                emitter.last_block = None;
×
320
                // we want to re-emit all mempool txs on reorg
×
321
                emitter.last_mempool_time = 0;
×
322
                continue;
×
323
            }
324
            PollResponse::AgreementFound(res) => {
3✔
325
                emitter.emitted.split_off(&(res.height as u32 + 1));
3✔
326
                emitter.last_block = Some(res);
3✔
327
                continue;
3✔
328
            }
329
            PollResponse::AgreementPointNotFound => {
330
                emitter.emitted.clear();
×
331
                emitter.last_block = None;
×
332
                // we want to re-emit all mempool txs on reorg
×
333
                emitter.last_mempool_time = 0;
×
334
                continue;
×
335
            }
336
        }
337
    }
338
}
215✔
339

340
/// Extends [`bitcoincore_rpc::Error`].
341
pub trait BitcoindRpcErrorExt {
342
    /// Returns whether the error is a "not found" error.
343
    ///
344
    /// This is useful since [`Emitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as
345
    /// [`Iterator::Item`].
346
    fn is_not_found_error(&self) -> bool;
347
}
348

349
impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
350
    fn is_not_found_error(&self) -> bool {
351
        if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self
×
352
        {
353
            rpc_err.code == -5
×
354
        } else {
355
            false
×
356
        }
357
    }
×
358
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc