• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitcoindevkit / bdk / 5834188079

pending completion
5834188079

Pull #1071

github

web-flow
Merge 68b42331c into 0ba6bbe11
Pull Request #1071: Update rust bitcoin (BDK 0.28)

563 of 563 new or added lines in 28 files covered. (100.0%)

14625 of 18342 relevant lines covered (79.74%)

9267.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.45
/src/blockchain/rpc.rs
1
// Bitcoin Dev Kit
2
// Written in 2021 by Riccardo Casatta <riccardo@casatta.it>
3
//
4
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
5
//
6
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
7
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
8
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
9
// You may not use this file except in accordance with one or both of these
10
// licenses.
11

12
//! Rpc Blockchain
13
//!
14
//! Backend that gets blockchain data from Bitcoin Core RPC
15
//!
16
//! This is an **EXPERIMENTAL** feature, API and other major changes are expected.
17
//!
18
//! ## Example
19
//!
20
//! ```no_run
21
//! # use bdk::blockchain::{RpcConfig, RpcBlockchain, ConfigurableBlockchain, rpc::Auth};
22
//! let config = RpcConfig {
23
//!     url: "127.0.0.1:18332".to_string(),
24
//!     auth: Auth::Cookie {
25
//!         file: "/home/user/.bitcoin/.cookie".into(),
26
//!     },
27
//!     network: bdk::bitcoin::Network::Testnet,
28
//!     wallet_name: "wallet_name".to_string(),
29
//!     sync_params: None,
30
//! };
31
//! let blockchain = RpcBlockchain::from_config(&config);
32
//! ```
33

34
use crate::bitcoin::{Network, OutPoint, Transaction, TxOut, Txid};
35
use crate::blockchain::*;
36
use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
37
use crate::descriptor::calc_checksum;
38
use crate::error::MissingCachedScripts;
39
use crate::{BlockTime, Error, FeeRate, KeychainKind, LocalUtxo, TransactionDetails};
40
use bitcoin::{Script, ScriptBuf};
41
use bitcoincore_rpc::json::{
42
    GetTransactionResultDetailCategory, ImportMultiOptions, ImportMultiRequest,
43
    ImportMultiRequestScriptPubkey, ListTransactionResult, ListUnspentResultEntry, ScanningDetails,
44
    Timestamp,
45
};
46
use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
47
use bitcoincore_rpc::Auth as RpcAuth;
48
use bitcoincore_rpc::{Client, RpcApi};
49
use log::{debug, info};
50
use serde::{Deserialize, Serialize};
51
use std::cell::RefCell;
52
use std::collections::{HashMap, HashSet};
53
use std::ops::{Deref, DerefMut};
54
use std::path::PathBuf;
55
use std::thread;
56
use std::time::Duration;
57

58
/// The main struct for RPC backend implementing the [crate::blockchain::Blockchain] trait
59
#[derive(Debug)]
×
60
pub struct RpcBlockchain {
61
    /// Rpc client to the node, includes the wallet name
62
    client: Client,
63
    /// Whether the wallet is a "descriptor" or "legacy" wallet in Core
64
    is_descriptors: bool,
65
    /// Blockchain capabilities, cached here at startup
66
    capabilities: HashSet<Capability>,
67
    /// Sync parameters.
68
    sync_params: RpcSyncParams,
69
}
70

71
impl Deref for RpcBlockchain {
72
    type Target = Client;
73

74
    fn deref(&self) -> &Self::Target {
×
75
        &self.client
×
76
    }
×
77
}
78

79
/// RpcBlockchain configuration options
80
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
×
81
pub struct RpcConfig {
82
    /// The bitcoin node url
83
    pub url: String,
84
    /// The bitcoin node authentication mechanism
85
    pub auth: Auth,
86
    /// The network we are using (it will be checked the bitcoin node network matches this)
87
    pub network: Network,
88
    /// The wallet name in the bitcoin node, consider using [crate::wallet::wallet_name_from_descriptor] for this
89
    pub wallet_name: String,
90
    /// Sync parameters
91
    pub sync_params: Option<RpcSyncParams>,
92
}
93

94
/// Sync parameters for Bitcoin Core RPC.
95
///
96
/// In general, BDK tries to sync `scriptPubKey`s cached in [`crate::database::Database`] with
97
/// `scriptPubKey`s imported in the Bitcoin Core Wallet. These parameters are used for determining
98
/// how the `importdescriptors` RPC calls are to be made.
99
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
×
100
pub struct RpcSyncParams {
101
    /// The minimum number of scripts to scan for on initial sync.
102
    pub start_script_count: usize,
103
    /// Time in unix seconds in which initial sync will start scanning from (0 to start from genesis).
104
    pub start_time: u64,
105
    /// Forces every sync to use `start_time` as import timestamp.
106
    pub force_start_time: bool,
107
    /// RPC poll rate (in seconds) to get state updates.
108
    pub poll_rate_sec: u64,
109
}
110

111
impl Default for RpcSyncParams {
112
    fn default() -> Self {
70✔
113
        Self {
70✔
114
            start_script_count: 100,
70✔
115
            start_time: 0,
70✔
116
            force_start_time: false,
70✔
117
            poll_rate_sec: 3,
70✔
118
        }
70✔
119
    }
70✔
120
}
121

122
/// This struct is equivalent to [bitcoincore_rpc::Auth] but it implements [serde::Serialize]
123
/// To be removed once upstream equivalent is implementing Serialize (json serialization format
124
/// should be the same), see [rust-bitcoincore-rpc/pull/181](https://github.com/rust-bitcoin/rust-bitcoincore-rpc/pull/181)
125
#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
76✔
126
#[serde(rename_all = "snake_case")]
127
#[serde(untagged)]
128
pub enum Auth {
129
    /// None authentication
130
    None,
131
    /// Authentication with username and password, usually [Auth::Cookie] should be preferred
132
    UserPass {
133
        /// Username
134
        username: String,
135
        /// Password
136
        password: String,
137
    },
138
    /// Authentication with a cookie file
139
    Cookie {
140
        /// Cookie file
141
        file: PathBuf,
142
    },
143
}
144

145
impl From<Auth> for RpcAuth {
146
    fn from(auth: Auth) -> Self {
70✔
147
        match auth {
70✔
148
            Auth::None => RpcAuth::None,
×
149
            Auth::UserPass { username, password } => RpcAuth::UserPass(username, password),
×
150
            Auth::Cookie { file } => RpcAuth::CookieFile(file),
70✔
151
        }
152
    }
70✔
153
}
154

155
impl Blockchain for RpcBlockchain {
156
    fn get_capabilities(&self) -> HashSet<Capability> {
×
157
        self.capabilities.clone()
×
158
    }
×
159

160
    fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
46✔
161
        Ok(self.client.send_raw_transaction(tx).map(|_| ())?)
69✔
162
    }
46✔
163

164
    fn estimate_fee(&self, target: usize) -> Result<FeeRate, Error> {
×
165
        let sat_per_kb = self
×
166
            .client
×
167
            .estimate_smart_fee(target as u16, None)?
×
168
            .fee_rate
169
            .ok_or(Error::FeeRateUnavailable)?
×
170
            .to_sat() as f64;
×
171

×
172
        Ok(FeeRate::from_sat_per_vb((sat_per_kb / 1000f64) as f32))
×
173
    }
×
174
}
175

176
impl GetTx for RpcBlockchain {
177
    fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
×
178
        Ok(Some(self.client.get_raw_transaction(txid, None)?))
×
179
    }
×
180
}
181

182
impl GetHeight for RpcBlockchain {
183
    fn get_height(&self) -> Result<u32, Error> {
190✔
184
        Ok(self.client.get_blockchain_info().map(|i| i.blocks as u32)?)
285✔
185
    }
190✔
186
}
187

188
impl GetBlockHash for RpcBlockchain {
189
    fn get_block_hash(&self, height: u64) -> Result<BlockHash, Error> {
6✔
190
        Ok(self.client.get_block_hash(height)?)
6✔
191
    }
6✔
192
}
193

194
impl WalletSync for RpcBlockchain {
195
    fn wallet_setup<D>(&self, db: &RefCell<D>, prog: Box<dyn Progress>) -> Result<(), Error>
190✔
196
    where
190✔
197
        D: BatchDatabase,
190✔
198
    {
190✔
199
        let mut db = db.borrow_mut();
190✔
200
        let db = db.deref_mut();
190✔
201
        let batch = DbState::new(db, &self.sync_params, &*prog)?
190✔
202
            .sync_with_core(&self.client, self.is_descriptors)?
190✔
203
            .as_db_batch()?;
190✔
204

205
        db.commit_batch(batch)
190✔
206
    }
190✔
207
}
208

209
impl ConfigurableBlockchain for RpcBlockchain {
210
    type Config = RpcConfig;
211

212
    /// Returns RpcBlockchain backend creating an RPC client to a specific wallet named as the descriptor's checksum
213
    /// if it's the first time it creates the wallet in the node and upon return is granted the wallet is loaded
214
    fn from_config(config: &Self::Config) -> Result<Self, Error> {
70✔
215
        let wallet_url = format!("{}/wallet/{}", config.url, &config.wallet_name);
70✔
216

217
        let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
70✔
218
        let rpc_version = client.version()?;
70✔
219

220
        info!("connected to '{}' with auth: {:?}", wallet_url, config.auth);
70✔
221

222
        if client.list_wallets()?.contains(&config.wallet_name) {
70✔
223
            info!("wallet already loaded: {}", config.wallet_name);
×
224
        } else if list_wallet_dir(&client)?.contains(&config.wallet_name) {
70✔
225
            client.load_wallet(&config.wallet_name)?;
×
226
            info!("wallet loaded: {}", config.wallet_name);
×
227
        } else {
228
            // pre-0.21 use legacy wallets
229
            if rpc_version < 210_000 {
70✔
230
                client.create_wallet(&config.wallet_name, Some(true), None, None, None)?;
×
231
            } else {
232
                // TODO: move back to api call when https://github.com/rust-bitcoin/rust-bitcoincore-rpc/issues/225 is closed
233
                let args = [
70✔
234
                    Value::String(config.wallet_name.clone()),
70✔
235
                    Value::Bool(true),
70✔
236
                    Value::Bool(false),
70✔
237
                    Value::Null,
70✔
238
                    Value::Bool(false),
70✔
239
                    Value::Bool(true),
70✔
240
                ];
70✔
241
                let _: Value = client.call("createwallet", &args)?;
70✔
242
            }
243

244
            info!("wallet created: {}", config.wallet_name);
70✔
245
        }
246

247
        let is_descriptors = is_wallet_descriptor(&client)?;
70✔
248

249
        let blockchain_info = client.get_blockchain_info()?;
70✔
250
        let network = match blockchain_info.chain.as_str() {
70✔
251
            "main" => Network::Bitcoin,
70✔
252
            "test" => Network::Testnet,
70✔
253
            "regtest" => Network::Regtest,
70✔
254
            "signet" => Network::Signet,
×
255
            _ => return Err(Error::Generic("Invalid network".to_string())),
×
256
        };
257
        if network != config.network {
70✔
258
            return Err(Error::InvalidNetwork {
×
259
                requested: config.network,
×
260
                found: network,
×
261
            });
×
262
        }
70✔
263

70✔
264
        let mut capabilities: HashSet<_> = vec![Capability::FullHistory].into_iter().collect();
70✔
265
        if rpc_version >= 210_000 {
70✔
266
            let info: HashMap<String, Value> = client.call("getindexinfo", &[]).unwrap();
70✔
267
            if info.contains_key("txindex") {
70✔
268
                capabilities.insert(Capability::GetAnyTx);
×
269
                capabilities.insert(Capability::AccurateFees);
×
270
            }
70✔
271
        }
×
272

273
        Ok(RpcBlockchain {
70✔
274
            client,
70✔
275
            capabilities,
70✔
276
            is_descriptors,
70✔
277
            sync_params: config.sync_params.clone().unwrap_or_default(),
70✔
278
        })
70✔
279
    }
70✔
280
}
281

282
/// return the wallets available in default wallet directory
283
//TODO use bitcoincore_rpc method when PR #179 lands
284
fn list_wallet_dir(client: &Client) -> Result<Vec<String>, Error> {
70✔
285
    #[derive(Deserialize)]
216✔
286
    struct Name {
287
        name: String,
288
    }
289
    #[derive(Deserialize)]
210✔
290
    struct CallResult {
291
        wallets: Vec<Name>,
292
    }
293

294
    let result: CallResult = client.call("listwalletdir", &[])?;
70✔
295
    Ok(result.wallets.into_iter().map(|n| n.name).collect())
107✔
296
}
70✔
297

298
/// Represents the state of the [`crate::database::Database`].
299
struct DbState<'a, D> {
300
    db: &'a D,
301
    params: &'a RpcSyncParams,
302
    prog: &'a dyn Progress,
303

304
    ext_spks: Vec<ScriptBuf>,
305
    int_spks: Vec<ScriptBuf>,
306
    txs: HashMap<Txid, TransactionDetails>,
307
    utxos: HashSet<LocalUtxo>,
308
    last_indexes: HashMap<KeychainKind, u32>,
309

310
    // "deltas" to apply to database
311
    retained_txs: HashSet<Txid>, // txs to retain (everything else should be deleted)
312
    updated_txs: HashSet<Txid>,  // txs to update
313
    updated_utxos: HashSet<LocalUtxo>, // utxos to update
314
}
315

316
impl<'a, D: BatchDatabase> DbState<'a, D> {
317
    /// Obtain [DbState] from [crate::database::Database].
318
    fn new(db: &'a D, params: &'a RpcSyncParams, prog: &'a dyn Progress) -> Result<Self, Error> {
190✔
319
        let ext_spks = db.iter_script_pubkeys(Some(KeychainKind::External))?;
190✔
320
        let int_spks = db.iter_script_pubkeys(Some(KeychainKind::Internal))?;
190✔
321

322
        // This is a hack to see whether atleast one of the keychains comes from a derivable
323
        // descriptor. We assume that non-derivable descriptors always has a script count of 1.
324
        let last_count = std::cmp::max(ext_spks.len(), int_spks.len());
190✔
325
        let has_derivable = last_count > 1;
190✔
326

190✔
327
        // If at least one descriptor is derivable, we need to ensure scriptPubKeys are sufficiently
190✔
328
        // cached.
190✔
329
        if has_derivable && last_count < params.start_script_count {
190✔
330
            let inner_err = MissingCachedScripts {
×
331
                last_count,
×
332
                missing_count: params.start_script_count - last_count,
×
333
            };
×
334
            debug!("requesting more spks with: {:?}", inner_err);
×
335
            return Err(Error::MissingCachedScripts(inner_err));
×
336
        }
190✔
337

338
        let txs = db
190✔
339
            .iter_txs(true)?
190✔
340
            .into_iter()
190✔
341
            .map(|tx| (tx.txid, tx))
190✔
342
            .collect::<HashMap<_, _>>();
190✔
343

344
        let utxos = db.iter_utxos()?.into_iter().collect::<HashSet<_>>();
190✔
345

346
        let last_indexes = [KeychainKind::External, KeychainKind::Internal]
190✔
347
            .iter()
190✔
348
            .filter_map(|keychain| match db.get_last_index(*keychain) {
380✔
349
                Ok(li_opt) => li_opt.map(|li| Ok((*keychain, li))),
380✔
350
                Err(err) => Some(Err(err)),
×
351
            })
380✔
352
            .collect::<Result<HashMap<_, _>, Error>>()?;
190✔
353

354
        info!("initial db state: txs={} utxos={}", txs.len(), utxos.len());
190✔
355

356
        // "delta" fields
357
        let retained_txs = HashSet::with_capacity(txs.len());
190✔
358
        let updated_txs = HashSet::with_capacity(txs.len());
190✔
359
        let updated_utxos = HashSet::with_capacity(utxos.len());
190✔
360

190✔
361
        Ok(Self {
190✔
362
            db,
190✔
363
            params,
190✔
364
            prog,
190✔
365
            ext_spks,
190✔
366
            int_spks,
190✔
367
            txs,
190✔
368
            utxos,
190✔
369
            last_indexes,
190✔
370
            retained_txs,
190✔
371
            updated_txs,
190✔
372
            updated_utxos,
190✔
373
        })
190✔
374
    }
190✔
375

376
    /// Sync states of [BatchDatabase] and Core wallet.
377
    /// First we import all `scriptPubKey`s from database into core wallet
378
    fn sync_with_core(&mut self, client: &Client, is_descriptor: bool) -> Result<&mut Self, Error> {
190✔
379
        // this tells Core wallet where to sync from for imported scripts
380
        let start_epoch = if self.params.force_start_time {
190✔
381
            self.params.start_time
×
382
        } else {
383
            self.db
190✔
384
                .get_sync_time()?
190✔
385
                .map_or(self.params.start_time, |st| st.block_time.timestamp)
190✔
386
        };
387

388
        // sync scriptPubKeys from Database to Core wallet
389
        let scripts_iter = self.ext_spks.iter().chain(&self.int_spks);
190✔
390
        if is_descriptor {
190✔
391
            import_descriptors(client, start_epoch, scripts_iter)?;
190✔
392
        } else {
393
            import_multi(client, start_epoch, scripts_iter)?;
×
394
        }
395

396
        // wait for Core wallet to rescan (TODO: maybe make this async)
397
        await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;
190✔
398

399
        // obtain iterator of pagenated `listtransactions` RPC calls
400
        const LIST_TX_PAGE_SIZE: usize = 100; // item count per page
401
        let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| {
676✔
402
            // filter out conflicting transactions - only accept transactions that are already
676✔
403
            // confirmed, or exists in mempool
676✔
404
            item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok()
676✔
405
        });
676✔
406

407
        // iterate through chronological results of `listtransactions`
408
        for tx_res in tx_iter {
832✔
409
            let mut updated = false;
642✔
410

642✔
411
            let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| {
642✔
412
                updated = true;
366✔
413
                TransactionDetails {
366✔
414
                    txid: tx_res.info.txid,
366✔
415
                    transaction: None,
366✔
416

366✔
417
                    received: 0,
366✔
418
                    sent: 0,
366✔
419
                    fee: None,
366✔
420
                    confirmation_time: None,
366✔
421
                }
366✔
422
            });
642✔
423

424
            // update raw tx (if needed)
425
            let raw_tx =
642✔
426
                &*match &mut db_tx.transaction {
642✔
427
                    Some(raw_tx) => raw_tx,
276✔
428
                    db_tx_opt => {
366✔
429
                        updated = true;
366✔
430
                        db_tx_opt.insert(client.get_raw_transaction(
366✔
431
                            &tx_res.info.txid,
366✔
432
                            tx_res.info.blockhash.as_ref(),
366✔
433
                        )?)
366✔
434
                    }
435
                };
436

437
            // update fee (if needed)
438
            if let (None, Some(new_fee)) = (db_tx.fee, tx_res.detail.fee) {
642✔
439
                updated = true;
52✔
440
                db_tx.fee = Some(new_fee.to_sat().unsigned_abs());
52✔
441
            }
590✔
442

443
            // update confirmation time (if needed)
444
            let conf_time = BlockTime::new(tx_res.info.blockheight, tx_res.info.blocktime);
642✔
445
            if db_tx.confirmation_time != conf_time {
642✔
446
                updated = true;
214✔
447
                db_tx.confirmation_time = conf_time;
214✔
448
            }
428✔
449

450
            // update received (if needed)
451
            let received = Self::received_from_raw_tx(self.db, raw_tx)?;
642✔
452
            if db_tx.received != received {
642✔
453
                updated = true;
360✔
454
                db_tx.received = received;
360✔
455
            }
360✔
456

457
            // check if tx has an immature coinbase output (add to updated UTXOs)
458
            // this is required because `listunspent` does not include immature coinbase outputs
459
            if tx_res.detail.category == GetTransactionResultDetailCategory::Immature {
642✔
460
                let txout = raw_tx
2✔
461
                    .output
2✔
462
                    .get(tx_res.detail.vout as usize)
2✔
463
                    .cloned()
2✔
464
                    .ok_or_else(|| {
2✔
465
                        Error::Generic(format!(
×
466
                            "Core RPC returned detail with invalid vout '{}' for tx '{}'",
×
467
                            tx_res.detail.vout, tx_res.info.txid,
×
468
                        ))
×
469
                    })?;
2✔
470

471
                if let Some((keychain, index)) =
2✔
472
                    self.db.get_path_from_script_pubkey(&txout.script_pubkey)?
2✔
473
                {
2✔
474
                    let utxo = LocalUtxo {
2✔
475
                        outpoint: OutPoint::new(tx_res.info.txid, tx_res.detail.vout),
2✔
476
                        txout,
2✔
477
                        keychain,
2✔
478
                        is_spent: false,
2✔
479
                    };
2✔
480
                    self.updated_utxos.insert(utxo);
2✔
481
                    self.update_last_index(keychain, index);
2✔
482
                }
2✔
483
            }
640✔
484

485
            // update tx deltas
486
            self.retained_txs.insert(tx_res.info.txid);
642✔
487
            if updated {
642✔
488
                self.updated_txs.insert(tx_res.info.txid);
430✔
489
            }
430✔
490
        }
491

492
        // obtain vector of `TransactionDetails::sent` changes
493
        let sent_updates = self
190✔
494
            .txs
190✔
495
            .values()
190✔
496
            // only bother to update txs that are retained
190✔
497
            .filter(|db_tx| self.retained_txs.contains(&db_tx.txid))
466✔
498
            // only bother to update txs where the raw tx is accessable
190✔
499
            .filter_map(|db_tx| (db_tx.transaction.as_ref().map(|tx| (tx, db_tx.sent))))
456✔
500
            // recalcuate sent value, only update txs in which sent value is changed
190✔
501
            .filter_map(|(raw_tx, old_sent)| {
456✔
502
                self.sent_from_raw_tx(raw_tx)
456✔
503
                    .map(|sent| {
456✔
504
                        if sent != old_sent {
456✔
505
                            Some((raw_tx.txid(), sent))
50✔
506
                        } else {
507
                            None
406✔
508
                        }
509
                    })
456✔
510
                    .transpose()
456✔
511
            })
456✔
512
            .collect::<Result<Vec<_>, _>>()?;
190✔
513

514
        // record send updates
515
        sent_updates.iter().for_each(|&(txid, sent)| {
190✔
516
            // apply sent field changes
50✔
517
            self.txs.entry(txid).and_modify(|db_tx| db_tx.sent = sent);
50✔
518
            // mark tx as modified
50✔
519
            self.updated_txs.insert(txid);
50✔
520
        });
190✔
521

522
        // obtain UTXOs from Core wallet
523
        let core_utxos = client
190✔
524
            .list_unspent(Some(0), None, None, Some(true), None)?
190✔
525
            .into_iter()
190✔
526
            .filter_map(|utxo_entry| {
382✔
527
                let path_result = self
382✔
528
                    .db
382✔
529
                    .get_path_from_script_pubkey(&utxo_entry.script_pub_key)
382✔
530
                    .transpose()?;
382✔
531

532
                let utxo_result = match path_result {
378✔
533
                    Ok((keychain, index)) => {
378✔
534
                        self.update_last_index(keychain, index);
378✔
535
                        Ok(Self::make_local_utxo(utxo_entry, keychain, false))
378✔
536
                    }
537
                    Err(err) => Err(err),
×
538
                };
539

540
                Some(utxo_result)
378✔
541
            })
382✔
542
            .collect::<Result<HashSet<_>, Error>>()?;
190✔
543

544
        // mark "spent utxos" to be updated in database
545
        let spent_utxos = self.utxos.difference(&core_utxos).cloned().map(|mut utxo| {
190✔
546
            utxo.is_spent = true;
86✔
547
            utxo
86✔
548
        });
190✔
549

190✔
550
        // mark new utxos to be added in database
190✔
551
        let new_utxos = core_utxos.difference(&self.utxos).cloned();
190✔
552

190✔
553
        // add to updated utxos
190✔
554
        self.updated_utxos.extend(spent_utxos.chain(new_utxos));
190✔
555

190✔
556
        Ok(self)
190✔
557
    }
190✔
558

559
    /// Calculates received amount from raw tx.
560
    fn received_from_raw_tx(db: &D, raw_tx: &Transaction) -> Result<u64, Error> {
642✔
561
        raw_tx.output.iter().try_fold(0_u64, |recv, txo| {
1,328✔
562
            let v = if db.is_mine(&txo.script_pubkey)? {
1,328✔
563
                txo.value
674✔
564
            } else {
565
                0
654✔
566
            };
567
            Ok(recv + v)
1,328✔
568
        })
1,328✔
569
    }
642✔
570

571
    /// Calculates sent from raw tx.
572
    fn sent_from_raw_tx(&self, raw_tx: &Transaction) -> Result<u64, Error> {
456✔
573
        let get_output = |outpoint: &OutPoint| {
460✔
574
            let raw_tx = self.txs.get(&outpoint.txid)?.transaction.as_ref()?;
460✔
575
            raw_tx.output.get(outpoint.vout as usize)
318✔
576
        };
460✔
577

578
        raw_tx.input.iter().try_fold(0_u64, |sent, txin| {
460✔
579
            let v = match get_output(&txin.previous_output) {
460✔
580
                Some(prev_txo) => {
318✔
581
                    if self.db.is_mine(&prev_txo.script_pubkey)? {
318✔
582
                        prev_txo.value
88✔
583
                    } else {
584
                        0
230✔
585
                    }
586
                }
587
                None => 0_u64,
142✔
588
            };
589
            Ok(sent + v)
460✔
590
        })
460✔
591
    }
456✔
592

593
    // updates the db state's last_index for the given keychain (if larger than current last_index)
594
    fn update_last_index(&mut self, keychain: KeychainKind, index: u32) {
380✔
595
        self.last_indexes
380✔
596
            .entry(keychain)
380✔
597
            .and_modify(|last| {
380✔
598
                if *last < index {
324✔
599
                    *last = index;
10✔
600
                }
314✔
601
            })
380✔
602
            .or_insert_with(|| index);
380✔
603
    }
380✔
604

605
    fn make_local_utxo(
378✔
606
        entry: ListUnspentResultEntry,
378✔
607
        keychain: KeychainKind,
378✔
608
        is_spent: bool,
378✔
609
    ) -> LocalUtxo {
378✔
610
        LocalUtxo {
378✔
611
            outpoint: OutPoint::new(entry.txid, entry.vout),
378✔
612
            txout: TxOut {
378✔
613
                value: entry.amount.to_sat(),
378✔
614
                script_pubkey: entry.script_pub_key,
378✔
615
            },
378✔
616
            keychain,
378✔
617
            is_spent,
378✔
618
        }
378✔
619
    }
378✔
620

621
    /// Prepare db batch operations.
622
    fn as_db_batch(&self) -> Result<D::Batch, Error> {
190✔
623
        let mut batch = self.db.begin_batch();
190✔
624
        let mut del_txs = 0_u32;
190✔
625

190✔
626
        // delete stale (not retained) txs from db
190✔
627
        self.txs
190✔
628
            .keys()
190✔
629
            .filter(|&txid| !self.retained_txs.contains(txid))
466✔
630
            .try_for_each(|txid| -> Result<(), Error> {
190✔
631
                batch.del_tx(txid, false)?;
10✔
632
                del_txs += 1;
10✔
633
                Ok(())
10✔
634
            })?;
190✔
635

636
        // update txs
637
        self.updated_txs
190✔
638
            .iter()
190✔
639
            .inspect(|&txid| debug!("updating tx: {}", txid))
382✔
640
            .try_for_each(|txid| batch.set_tx(self.txs.get(txid).unwrap()))?;
382✔
641

642
        // update utxos
643
        self.updated_utxos
190✔
644
            .iter()
190✔
645
            .inspect(|&utxo| debug!("updating utxo: {}", utxo.outpoint))
444✔
646
            .try_for_each(|utxo| batch.set_utxo(utxo))?;
444✔
647

648
        // update last indexes
649
        self.last_indexes
190✔
650
            .iter()
190✔
651
            .try_for_each(|(&keychain, &index)| batch.set_last_index(keychain, index))?;
190✔
652

653
        info!(
190✔
654
            "db batch updates: del_txs={}, update_txs={}, update_utxos={}",
×
655
            del_txs,
×
656
            self.updated_txs.len(),
×
657
            self.updated_utxos.len()
×
658
        );
659

660
        Ok(batch)
190✔
661
    }
190✔
662
}
663

664
fn import_descriptors<'a, S>(
192✔
665
    client: &Client,
192✔
666
    start_epoch: u64,
192✔
667
    scripts_iter: S,
192✔
668
) -> Result<(), Error>
192✔
669
where
192✔
670
    S: Iterator<Item = &'a ScriptBuf>,
192✔
671
{
192✔
672
    let requests = Value::Array(
192✔
673
        scripts_iter
192✔
674
            .map(|script| {
36,110✔
675
                let desc = descriptor_from_script_pubkey(script);
36,110✔
676
                json!({ "timestamp": start_epoch, "desc": desc })
36,110✔
677
            })
36,110✔
678
            .collect(),
192✔
679
    );
192✔
680
    for v in client.call::<Vec<Value>>("importdescriptors", &[requests])? {
36,110✔
681
        match v["success"].as_bool() {
36,110✔
682
            Some(true) => continue,
36,110✔
683
            Some(false) => {
684
                return Err(Error::Generic(
×
685
                    v["error"]["message"]
×
686
                        .as_str()
×
687
                        .map_or("unknown error".into(), ToString::to_string),
×
688
                ))
×
689
            }
690
            _ => return Err(Error::Generic("Unexpected response form Core".to_string())),
×
691
        }
692
    }
693
    Ok(())
192✔
694
}
192✔
695

696
fn import_multi<'a, S>(client: &Client, start_epoch: u64, scripts_iter: S) -> Result<(), Error>
×
697
where
×
698
    S: Iterator<Item = &'a ScriptBuf>,
×
699
{
×
700
    let requests = scripts_iter
×
701
        .map(|script| ImportMultiRequest {
×
702
            timestamp: Timestamp::Time(start_epoch),
×
703
            script_pubkey: Some(ImportMultiRequestScriptPubkey::Script(script)),
×
704
            watchonly: Some(true),
×
705
            ..Default::default()
×
706
        })
×
707
        .collect::<Vec<_>>();
×
708
    let options = ImportMultiOptions { rescan: Some(true) };
×
709
    for v in client.import_multi(&requests, Some(&options))? {
×
710
        if let Some(err) = v.error {
×
711
            return Err(Error::Generic(format!(
×
712
                "{} (code: {})",
×
713
                err.message, err.code
×
714
            )));
×
715
        }
×
716
    }
717
    Ok(())
×
718
}
×
719

720
/// Calls the `listtransactions` RPC method in `page_size`s and returns iterator of the tx results
721
/// in chronological order.
722
///
723
/// `page_size` cannot be less than 1 and cannot be greater than 1000.
724
fn list_transactions(
204✔
725
    client: &Client,
204✔
726
    page_size: usize,
204✔
727
) -> Result<impl Iterator<Item = ListTransactionResult>, Error> {
204✔
728
    if !(1..=1000).contains(&page_size) {
204✔
729
        return Err(Error::Generic(format!(
×
730
            "Core RPC method `listtransactions` must have `page_size` in range [1 to 1000]: got {}",
×
731
            page_size
×
732
        )));
×
733
    }
204✔
734

204✔
735
    // `.take_while` helper to obtain the first error (TODO: remove when we can use `.map_while`)
204✔
736
    let mut got_err = false;
204✔
737

738
    // obtain results in batches (of `page_size`)
739
    let nested_list = (0_usize..)
204✔
740
        .map(|page_index| {
606✔
741
            client.list_transactions(
504✔
742
                None,
504✔
743
                Some(page_size),
504✔
744
                Some(page_size * page_index),
504✔
745
                Some(true),
504✔
746
            )
504✔
747
        })
606✔
748
        // take until returned rpc call is empty or until error
204✔
749
        // TODO: replace with the following when MSRV is 1.57.0:
204✔
750
        // `.map_while(|res| res.map(|l| if l.is_empty() { None } else { Some(l) }).transpose())`
204✔
751
        .take_while(|res| {
606✔
752
            if got_err || matches!(res, Ok(list) if list.is_empty()) {
504✔
753
                // break if last iteration was an error, or if the current result is empty
754
                false
204✔
755
            } else {
756
                // record whether result is error or not
757
                got_err = res.is_err();
300✔
758
                // continue on non-empty result or first error
300✔
759
                true
300✔
760
            }
761
        })
606✔
762
        .collect::<Result<Vec<_>, _>>()
204✔
763
        .map_err(Error::Rpc)?;
204✔
764

765
    // reverse here to have txs in chronological order
766
    Ok(nested_list.into_iter().rev().flatten())
204✔
767
}
204✔
768

769
fn await_wallet_scan(client: &Client, rate_sec: u64, progress: &dyn Progress) -> Result<(), Error> {
192✔
770
    #[derive(Deserialize)]
3,168✔
771
    struct CallResult {
192✔
772
        scanning: ScanningDetails,
192✔
773
    }
192✔
774

192✔
775
    let dur = Duration::from_secs(rate_sec);
192✔
776
    loop {
777
        match client.call::<CallResult>("getwalletinfo", &[])?.scanning {
192✔
778
            ScanningDetails::Scanning {
779
                duration,
×
780
                progress: pc,
×
781
            } => {
×
782
                debug!("scanning: duration={}, progress={}", duration, pc);
×
783
                progress.update(pc, Some(format!("elapsed for {} seconds", duration)))?;
×
784
                thread::sleep(dur);
×
785
            }
786
            ScanningDetails::NotScanning(_) => {
787
                progress.update(1.0, None)?;
192✔
788
                info!("scanning: done!");
192✔
789
                return Ok(());
192✔
790
            }
791
        };
792
    }
793
}
192✔
794

795
/// Returns whether a wallet is legacy or descriptors by calling `getwalletinfo`.
796
///
797
/// This API is mapped by bitcoincore_rpc, but it doesn't have the fields we need (either
798
/// "descriptors" or "format") so we have to call the RPC manually
799
fn is_wallet_descriptor(client: &Client) -> Result<bool, Error> {
70✔
800
    #[derive(Deserialize)]
1,190✔
801
    struct CallResult {
802
        descriptors: Option<bool>,
803
    }
804

805
    let result: CallResult = client.call("getwalletinfo", &[])?;
70✔
806
    Ok(result.descriptors.unwrap_or(false))
70✔
807
}
70✔
808

809
fn descriptor_from_script_pubkey(script: &Script) -> String {
36,110✔
810
    let desc = format!("raw({})", script.to_hex_string());
36,110✔
811
    format!("{}#{}", desc, calc_checksum(&desc).unwrap())
36,110✔
812
}
36,110✔
813

814
/// Factory of [`RpcBlockchain`] instances, implements [`BlockchainFactory`]
815
///
816
/// Internally caches the node url and authentication params and allows getting many different [`RpcBlockchain`]
817
/// objects for different wallet names and with different rescan heights.
818
///
819
/// ## Example
820
///
821
/// ```no_run
822
/// # use bdk::bitcoin::Network;
823
/// # use bdk::blockchain::BlockchainFactory;
824
/// # use bdk::blockchain::rpc::{Auth, RpcBlockchainFactory};
825
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
826
/// let factory = RpcBlockchainFactory {
827
///     url: "http://127.0.0.1:18332".to_string(),
828
///     auth: Auth::Cookie {
829
///         file: "/home/user/.bitcoin/.cookie".into(),
830
///     },
831
///     network: Network::Testnet,
832
///     wallet_name_prefix: Some("prefix-".to_string()),
833
///     default_skip_blocks: 100_000,
834
///     sync_params: None,
835
/// };
836
/// let main_wallet_blockchain = factory.build("main_wallet", Some(200_000))?;
837
/// # Ok(())
838
/// # }
839
/// ```
840
#[derive(Debug, Clone)]
×
841
pub struct RpcBlockchainFactory {
842
    /// The bitcoin node url
843
    pub url: String,
844
    /// The bitcoin node authentication mechanism
845
    pub auth: Auth,
846
    /// The network we are using (it will be checked the bitcoin node network matches this)
847
    pub network: Network,
848
    /// The optional prefix used to build the full wallet name for blockchains
849
    pub wallet_name_prefix: Option<String>,
850
    /// Default number of blocks to skip which will be inherited by blockchain unless overridden
851
    pub default_skip_blocks: u32,
852
    /// Sync parameters
853
    pub sync_params: Option<RpcSyncParams>,
854
}
855

856
impl BlockchainFactory for RpcBlockchainFactory {
857
    type Inner = RpcBlockchain;
858

859
    fn build(
6✔
860
        &self,
6✔
861
        checksum: &str,
6✔
862
        _override_skip_blocks: Option<u32>,
6✔
863
    ) -> Result<Self::Inner, Error> {
6✔
864
        RpcBlockchain::from_config(&RpcConfig {
6✔
865
            url: self.url.clone(),
6✔
866
            auth: self.auth.clone(),
6✔
867
            network: self.network,
6✔
868
            wallet_name: format!(
6✔
869
                "{}{}",
6✔
870
                self.wallet_name_prefix.as_ref().unwrap_or(&String::new()),
6✔
871
                checksum
6✔
872
            ),
6✔
873
            sync_params: self.sync_params.clone(),
6✔
874
        })
6✔
875
    }
6✔
876
}
877

878
#[cfg(test)]
#[cfg(any(feature = "test-rpc", feature = "test-rpc-legacy"))]
mod test {
    use super::*;
    use crate::{
        descriptor::into_wallet_descriptor_checked, testutils::blockchain_tests::TestClient,
        wallet::utils::SecpCtx,
    };

    use bitcoin::{Address, Network};
    use bitcoincore_rpc::RpcApi;
    use log::LevelFilter;

    // Instantiates the shared blockchain test-suite against an `RpcBlockchain`. Each instance
    // gets a wallet name derived from the current time in nanoseconds.
    crate::bdk_blockchain_tests! {
        fn test_instance(test_client: &TestClient) -> RpcBlockchain {
            let nanos_since_epoch = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos();
            let config = RpcConfig {
                url: test_client.bitcoind.rpc_url(),
                auth: Auth::Cookie { file: test_client.bitcoind.params.cookie_file.clone() },
                network: Network::Regtest,
                wallet_name: format!("client-wallet-test-{}", nanos_since_epoch),
                sync_params: None,
            };
            RpcBlockchain::from_config(&config).unwrap()
        }
    }

    /// Creates a default [`TestClient`] together with a factory pointing at its node, using
    /// cookie authentication and the `"prefix-"` wallet name prefix.
    fn get_factory() -> (TestClient, RpcBlockchainFactory) {
        let test_client = TestClient::default();

        let url = test_client.bitcoind.rpc_url();
        let auth = Auth::Cookie {
            file: test_client.bitcoind.params.cookie_file.clone(),
        };
        let factory = RpcBlockchainFactory {
            url,
            auth,
            network: Network::Regtest,
            wallet_name_prefix: Some(String::from("prefix-")),
            default_skip_blocks: 0,
            sync_params: None,
        };

        (test_client, factory)
    }

    /// Blockchains built by the factory must be bound to a wallet named `<prefix><name>`.
    #[test]
    fn test_rpc_blockchain_factory() {
        let (_test_client, factory) = get_factory();

        // asks the node for the name of the wallet the blockchain is bound to
        let wallet_name_of = |blockchain: &RpcBlockchain| {
            blockchain
                .client
                .get_wallet_info()
                .expect("Node connection isn't working")
                .wallet_name
        };

        let a = factory.build("aaaaaa", None).unwrap();
        assert_eq!(wallet_name_of(&a), "prefix-aaaaaa");

        let b = factory.build("bbbbbb", Some(100)).unwrap();
        assert_eq!(wallet_name_of(&b), "prefix-bbbbbb");
    }

    /// This test ensures that [list_transactions] always iterates through transactions in
    /// chronological order, independent of the `page_size`.
    #[test]
    fn test_list_transactions() {
        let _ = env_logger::builder()
            .filter_level(LevelFilter::Info)
            .default_format()
            .try_init();

        const DESC: &str = "wpkh(tpubD9zMNV59kgbWgKK55SHJugmKKSt6wQXczxpucGYqNKwGmJp1x7Ar2nrLUXYHDdCctXmyDoSCn2JVMzMUDfib3FaDhwxCEMUELoq19xLSx66/*)";
        const AMOUNT_PER_TX: u64 = 10_000;
        const TX_COUNT: u32 = 50;

        let network = Network::Regtest;
        let secp = SecpCtx::default();
        let (desc, ..) = into_wallet_descriptor_checked(DESC, &secp, network).unwrap();

        let (mut test_client, factory) = get_factory();
        let bc = factory.build("itertest", None).unwrap();

        // generate scripts (1 tx per script)
        let scripts: Vec<_> = (0..TX_COUNT)
            .map(|index| desc.at_derivation_index(index).unwrap().script_pubkey())
            .collect();

        // import scripts (with the call matching the wallet type) and wait for the scan
        if bc.is_descriptors {
            import_descriptors(&bc.client, 0, scripts.iter()).unwrap();
        } else {
            import_multi(&bc.client, 0, scripts.iter()).unwrap();
        }
        await_wallet_scan(&bc.client, 2, &NoopProgress).unwrap();

        // create and broadcast txs: one tx per script, each followed by one generated block
        let mut expected_txids = Vec::with_capacity(scripts.len());
        for script in &scripts {
            let addr = Address::from_script(script, network).unwrap();
            let txid = test_client.receive(testutils! { @tx ( (@addr addr) => AMOUNT_PER_TX ) });
            test_client.generate(1, None);
            expected_txids.push(txid);
        }

        // iterate through different page sizes - should always return txs in chronological order
        for &page_size in &[1000_usize, 1, 2, 6, 25, 49, 50] {
            println!("trying with page_size: {}", page_size);

            let txids: Vec<_> = list_transactions(&bc.client, page_size)
                .unwrap()
                .map(|res| res.info.txid)
                .collect();

            assert_eq!(txids.len(), expected_txids.len());
            assert_eq!(txids, expected_txids);
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc