• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitcoindevkit / bdk / 5582722505

pending completion
5582722505

Pull #1002

github

web-flow
Merge 98a52d0cb into 81c761339
Pull Request #1002: Implement linked-list `LocalChain` and add rpc-chain module/example

945 of 945 new or added lines in 10 files covered. (100.0%)

8019 of 10332 relevant lines covered (77.61%)

5036.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.58
/crates/bitcoind_rpc/src/lib.rs
1
//! This crate is used for updating [`bdk_chain`] structures with data from the `bitcoind` RPC
1✔
2
//! interface.
3

4
#![warn(missing_docs)]
5

6
use bdk_chain::{
7
    bitcoin::{Block, Transaction, Txid},
8
    keychain::LocalUpdate,
9
    local_chain::CheckPoint,
10
    BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph,
11
};
12
pub use bitcoincore_rpc;
13
use bitcoincore_rpc::{bitcoincore_rpc_json::GetBlockResult, Client, RpcApi};
14
use std::collections::HashSet;
15

16
/// An update emitted from [`BitcoindRpcEmitter`]. This can either be of a block or a subset of
17
/// mempool transactions.
18
#[derive(Debug, Clone)]
×
19
pub enum BitcoindRpcUpdate {
20
    /// An emitted block.
21
    Block {
22
        /// The checkpoint constructed from the block's height/hash and connected to the previous
23
        /// block.
24
        cp: CheckPoint,
25
        /// The result obtained from the `getblock` RPC call of this block's hash.
26
        info: Box<GetBlockResult>,
27
        ///
28
        block: Box<Block>,
29
    },
30
    /// An emitted subset of mempool transactions.
31
    ///
32
    /// [`BitcoindRpcEmitter`] attempts to avoid re-emitting transactions.
33
    Mempool {
34
        /// The checkpoint of the last-seen tip.
35
        cp: CheckPoint,
36
        /// Subset of mempool transactions.
37
        txs: Vec<(Transaction, u64)>,
38
    },
39
}
40

41
/// A closure that transforms a [`BitcoindRpcUpdate`] into a [`ConfirmationHeightAnchor`].
42
///
43
/// This is to be used as an input to [`BitcoindRpcUpdate::into_update`].
44
pub fn confirmation_height_anchor(
×
45
    info: &GetBlockResult,
×
46
    _txid: Txid,
×
47
    _tx_pos: usize,
×
48
) -> ConfirmationHeightAnchor {
×
49
    ConfirmationHeightAnchor {
×
50
        anchor_block: BlockId {
×
51
            height: info.height as _,
×
52
            hash: info.hash,
×
53
        },
×
54
        confirmation_height: info.height as _,
×
55
    }
×
56
}
×
57

58
/// A closure that transforms a [`BitcoindRpcUpdate`] into a [`ConfirmationTimeAnchor`].
59
///
60
/// This is to be used as an input to [`BitcoindRpcUpdate::into_update`].
61
pub fn confirmation_time_anchor(
×
62
    info: &GetBlockResult,
×
63
    _txid: Txid,
×
64
    _tx_pos: usize,
×
65
) -> ConfirmationTimeAnchor {
×
66
    ConfirmationTimeAnchor {
×
67
        anchor_block: BlockId {
×
68
            height: info.height as _,
×
69
            hash: info.hash,
×
70
        },
×
71
        confirmation_height: info.height as _,
×
72
        confirmation_time: info.time as _,
×
73
    }
×
74
}
×
75

76
impl BitcoindRpcUpdate {
77
    /// Returns whether the update is of a subset of the mempool.
78
    pub fn is_mempool(&self) -> bool {
×
79
        matches!(self, Self::Mempool { .. })
×
80
    }
×
81

82
    /// Returns whether the update is of a block.
83
    pub fn is_block(&self) -> bool {
×
84
        matches!(self, Self::Block { .. })
×
85
    }
×
86

87
    /// Transforms the [`BitcoindRpcUpdate`] into a [`LocalUpdate`].
88
    ///
89
    /// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] can be used as the `anchor`
90
    /// intput to construct updates with [`ConfirmationHeightAnchor`]s and
91
    /// [`ConfirmationTimeAnchor`]s respectively.
92
    pub fn into_update<K, A, F>(self, anchor: F) -> LocalUpdate<K, A>
×
93
    where
×
94
        A: Clone + Ord + PartialOrd,
×
95
        F: Fn(&GetBlockResult, Txid, usize) -> A,
×
96
    {
×
97
        match self {
×
98
            BitcoindRpcUpdate::Block { cp, info, block } => LocalUpdate {
×
99
                graph: {
×
100
                    let mut g = TxGraph::<A>::new(block.txdata);
×
101
                    for (tx_pos, &txid) in info.tx.iter().enumerate() {
×
102
                        let _ = g.insert_anchor(txid, anchor(&info, txid, tx_pos));
×
103
                    }
×
104
                    g
×
105
                },
×
106
                ..LocalUpdate::new(cp)
×
107
            },
108
            BitcoindRpcUpdate::Mempool { cp, txs } => LocalUpdate {
×
109
                graph: {
×
110
                    let mut last_seens = Vec::<(Txid, u64)>::with_capacity(txs.len());
×
111
                    let mut g = TxGraph::<A>::new(txs.into_iter().map(|(tx, last_seen)| {
×
112
                        last_seens.push((tx.txid(), last_seen));
×
113
                        tx
×
114
                    }));
×
115
                    for (txid, seen_at) in last_seens {
×
116
                        let _ = g.insert_seen_at(txid, seen_at);
×
117
                    }
×
118
                    g
×
119
                },
×
120
                ..LocalUpdate::new(cp)
×
121
            },
122
        }
123
    }
×
124
}
125

126
/// A structure that emits updates for [`bdk_chain`] structures, sourcing blockchain data from
127
/// [`bitcoincore_rpc::Client`].
128
///
129
/// Updates are of type [`BitcoindRpcUpdate`], where each update can either be of a whole block, or
130
/// a subset of the mempool.
131
///
132
/// A [`BitcoindRpcEmitter`] emits updates starting from the `fallback_height` provided in [`new`],
133
/// or if `last_cp` is provided, we start from the height above the agreed-upon blockhash (between
134
/// `last_cp` and the state of `bitcoind`). Blocks are emitted in sequence (ascending order), and
135
/// the mempool contents emitted if the last emission is the chain tip.
136
///
137
/// # [`Iterator`] implementation
138
///
139
/// [`BitcoindRpcEmitter`] implements [`Iterator`] in a way such that even after [`Iterator::next`]
140
/// returns [`None`], subsequent calls may resume returning [`Some`].
141
///
142
/// Returning [`None`] means that the previous call to [`next`] is the mempool. This is useful if
143
/// the caller wishes to update once.
144
///
145
/// ```rust,no_run
146
/// use bdk_bitcoind_rpc::{BitcoindRpcEmitter, BitcoindRpcUpdate};
147
/// # let client = todo!();
148
///
149
/// for update in BitcoindRpcEmitter::new(&client, 709_632, None) {
150
///     match update.expect("todo: deal with the error properly") {
151
///         BitcoindRpcUpdate::Block { cp, .. } => println!("block {}:{}", cp.height(), cp.hash()),
152
///         BitcoindRpcUpdate::Mempool { .. } => println!("mempool"),
153
///     }
154
/// }
155
/// ```
156
///
157
/// Alternatively, if the caller wishes to keep [`BitcoindRpcEmitter`] in a dedicated update-thread,
158
/// the caller can continue to poll [`next`] (potentially with a delay).
159
///
160
/// [`new`]: BitcoindRpcEmitter::new
161
/// [`next`]: Iterator::next
162
pub struct BitcoindRpcEmitter<'a> {
163
    client: &'a Client,
164
    fallback_height: u32,
165

166
    last_cp: Option<CheckPoint>,
167
    last_info: Option<GetBlockResult>,
168

169
    seen_txids: HashSet<Txid>,
170
    last_emission_was_mempool: bool,
171
}
172

173
impl<'a> Iterator for BitcoindRpcEmitter<'a> {
174
    /// Represents an emitted item.
175
    type Item = Result<BitcoindRpcUpdate, bitcoincore_rpc::Error>;
176

177
    fn next(&mut self) -> Option<Self::Item> {
×
178
        if self.last_emission_was_mempool {
×
179
            self.last_emission_was_mempool = false;
×
180
            None
×
181
        } else {
182
            Some(self.next_update())
×
183
        }
184
    }
×
185
}
186

187
impl<'a> BitcoindRpcEmitter<'a> {
188
    /// Constructs a new [`BitcoindRpcEmitter`] with the provided [`bitcoincore_rpc::Client`].
189
    ///
190
    /// * `fallback_height` is the block height to start from if `last_cp` is not provided, or a
191
    ///     point of agreement is not found.
192
    /// * `last_cp` is the last known checkpoint to build updates on (if any).
193
    pub fn new(client: &'a Client, fallback_height: u32, last_cp: Option<CheckPoint>) -> Self {
×
194
        Self {
×
195
            client,
×
196
            fallback_height,
×
197
            last_cp,
×
198
            last_info: None,
×
199
            seen_txids: HashSet::new(),
×
200
            last_emission_was_mempool: false,
×
201
        }
×
202
    }
×
203

204
    /// Continuously poll [`bitcoincore_rpc::Client`] until an update is found.
205
    pub fn next_update(&mut self) -> Result<BitcoindRpcUpdate, bitcoincore_rpc::Error> {
×
206
        loop {
207
            match self.poll()? {
×
208
                Some(item) => return Ok(item),
×
209
                None => continue,
×
210
            };
211
        }
212
    }
×
213

214
    /// Performs a single round of polling [`bitcoincore_rpc::Client`] and updating the internal
215
    /// state. This returns [`Ok(Some(BitcoindRpcUpdate))`] if an update is found.
216
    pub fn poll(&mut self) -> Result<Option<BitcoindRpcUpdate>, bitcoincore_rpc::Error> {
×
217
        let client = self.client;
×
218
        self.last_emission_was_mempool = false;
×
219

×
220
        match (&mut self.last_cp, &mut self.last_info) {
×
221
            // If `last_cp` and `last_info` are both none, we need to emit from the
222
            // `fallback_height`. `last_cp` and `last_info` will both be updated to the emitted
223
            // block.
224
            (last_cp @ None, last_info @ None) => {
×
225
                let info =
×
226
                    client.get_block_info(&client.get_block_hash(self.fallback_height as _)?)?;
×
227
                let block = self.client.get_block(&info.hash)?;
×
228
                let cp = CheckPoint::new(BlockId {
×
229
                    height: info.height as _,
×
230
                    hash: info.hash,
×
231
                });
×
232
                *last_cp = Some(cp.clone());
×
233
                *last_info = Some(info.clone());
×
234
                Ok(Some(BitcoindRpcUpdate::Block {
×
235
                    cp,
×
236
                    info: Box::new(info),
×
237
                    block: Box::new(block),
×
238
                }))
×
239
            }
240
            // If `last_cp` exists, but `last_info` does not, it means we have not fetched a
241
            // block from the client yet, but we have a previous checkpoint which we can use to
242
            // find the point of agreement with.
243
            //
244
            // We don't emit in this match case. Instead, we set the state to either:
245
            // * { last_cp: Some, last_info: Some } : When we find a point of agreement.
246
            // * { last_cp: None, last_indo: None } : When we cannot find a point of agreement.
247
            (last_cp @ Some(_), last_info @ None) => {
×
248
                for cp in last_cp.clone().iter().flat_map(CheckPoint::iter) {
×
249
                    let cp_block = cp.block_id();
×
250

251
                    let info = client.get_block_info(&cp_block.hash)?;
×
252
                    if info.confirmations < 0 {
×
253
                        // block is not in the main chain
254
                        continue;
×
255
                    }
×
256
                    // agreement found
×
257
                    *last_cp = Some(cp);
×
258
                    *last_info = Some(info);
×
259
                    return Ok(None);
×
260
                }
261

262
                // no point of agreement found, next call will emit block @ fallback height
263
                *last_cp = None;
×
264
                *last_info = None;
×
265
                Ok(None)
×
266
            }
267
            // If `last_cp` and `last_info` is both `Some`, we either emit a block at
268
            // `last_info.nextblockhash` (if it exists), or we emit a subset of the mempool.
269
            (Some(last_cp), last_info @ Some(_)) => {
×
270
                // find next block
×
271
                match last_info.as_ref().unwrap().nextblockhash {
×
272
                    Some(next_hash) => {
×
273
                        let info = self.client.get_block_info(&next_hash)?;
×
274

275
                        if info.confirmations < 0 {
×
276
                            *last_info = None;
×
277
                            return Ok(None);
×
278
                        }
×
279

280
                        let block = self.client.get_block(&info.hash)?;
×
281
                        let cp = last_cp
×
282
                            .clone()
×
283
                            .push(BlockId {
×
284
                                height: info.height as _,
×
285
                                hash: info.hash,
×
286
                            })
×
287
                            .expect("must extend from checkpoint");
×
288

×
289
                        *last_cp = cp.clone();
×
290
                        *last_info = Some(info.clone());
×
291

×
292
                        Ok(Some(BitcoindRpcUpdate::Block {
×
293
                            cp,
×
294
                            info: Box::new(info),
×
295
                            block: Box::new(block),
×
296
                        }))
×
297
                    }
298
                    None => {
299
                        let mempool_txs = client
×
300
                            .get_raw_mempool()?
×
301
                            .into_iter()
×
302
                            .filter(|&txid| self.seen_txids.insert(txid))
×
303
                            .map(
×
304
                                |txid| -> Result<(Transaction, u64), bitcoincore_rpc::Error> {
×
305
                                    let first_seen =
×
306
                                        client.get_mempool_entry(&txid).map(|entry| entry.time)?;
×
307
                                    let tx = client.get_raw_transaction(&txid, None)?;
×
308
                                    Ok((tx, first_seen))
×
309
                                },
×
310
                            )
×
311
                            .collect::<Result<Vec<_>, _>>()?;
×
312

313
                        // After a mempool emission, we want to find the point of agreement in
314
                        // the next round.
315
                        *last_info = None;
×
316

×
317
                        self.last_emission_was_mempool = true;
×
318
                        Ok(Some(BitcoindRpcUpdate::Mempool {
×
319
                            txs: mempool_txs,
×
320
                            cp: last_cp.clone(),
×
321
                        }))
×
322
                    }
323
                }
324
            }
325
            (None, Some(info)) => unreachable!("got info with no checkpoint? info={:#?}", info),
×
326
        }
327
    }
×
328
}
329

330
/// Extension methods for [`bitcoincore_rpc::Error`].
pub trait BitcoindRpcErrorExt {
    /// Reports whether this error is a "not found" error.
    ///
    /// Handy when consuming [`BitcoindRpcEmitter`], whose [`Iterator::Item`] is a
    /// [`Result<_, bitcoincore_rpc::Error>`].
    fn is_not_found_error(&self) -> bool;
}
338

339
impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
340
    fn is_not_found_error(&self) -> bool {
341
        if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self
×
342
        {
343
            rpc_err.code == -5
×
344
        } else {
345
            false
×
346
        }
347
    }
×
348
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc