bitcoindevkit / bdk / 9507819776

13 Jun 2024 10:52PM UTC coverage: 83.064% (-0.05%) from 83.114%
push

github

notmandatory
Merge bitcoindevkit/bdk#1454: Refactor wallet and persist mod, remove bdk_persist crate

ec36c7ecc feat(example): use changeset staging with rpc polling example (志宇)
19328d499 feat(wallet)!: change persist API to use `StageExt` and `StageExtAsync` (志宇)
2e40b0118 feat(chain): reintroduce a way to stage changesets before persisting (志宇)
36e82ec68 chore(chain): relax `miniscript` feature flag scope (志宇)
9e97ac033 refactor(persist): update file_store, sqlite, wallet to use bdk_chain::persist (Steve Myers)
54b0c11cb feat(persist): add PersistAsync trait and StagedPersistAsync struct (Steve Myers)
aa640ab27 refactor(persist): rename PersistBackend to Persist, move to chain crate (Steve Myers)

Pull request description:

  ### Description

  Sorry to submit another refactor PR for the persist related stuff, but I think it's worth revisiting. My primary motivations are:

  1. remove `db` from `Wallet` so users have the ability to use `async` storage crates, for example using `sqlx`. I updated docs and examples to let users know they are responsible for persisting changes.
  2. remove the `anyhow` dependency everywhere (except as a dev test dependency). It really doesn't belong in a lib and by removing persistence from `Wallet` it isn't needed.
  3. remove the `bdk_persist` crate and revert to the original design with generic error types. I kept the `Debug` and `Display` constraints on persist errors so they can still be used with the `anyhow!` macro.
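
  To illustrate point 3, here is a minimal, self-contained sketch of a persistence trait whose error types are generic but bounded by `Debug` and `Display`. The trait below only mirrors the shape of the `PersistBackend` impl visible in `store.rs`; it is not the actual `bdk_chain` definition, and `MemoryStore`, `MemoryError`, and `write_or_message` are hypothetical names invented for this example.

```rust
use std::collections::BTreeSet;
use std::fmt::{self, Debug, Display};

/// Hypothetical stand-in for a persistence trait with generic error types.
/// The `Debug + Display` bounds let callers format any backend error.
pub trait PersistBackend<C> {
    type WriteError: Debug + Display;
    type LoadError: Debug + Display;

    fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError>;
    fn load_changes(&mut self) -> Result<Option<C>, Self::LoadError>;
}

/// Hypothetical backend-specific error (no `anyhow` needed in the library).
#[derive(Debug, PartialEq)]
pub enum MemoryError {
    Full { capacity: usize },
}

impl Display for MemoryError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            MemoryError::Full { capacity } => {
                write!(f, "store is full (capacity {})", capacity)
            }
        }
    }
}

/// Toy in-memory backend; the "changeset" is just a set of integers.
pub struct MemoryStore {
    capacity: usize,
    items: BTreeSet<u32>,
}

impl MemoryStore {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, items: BTreeSet::new() }
    }
}

impl PersistBackend<BTreeSet<u32>> for MemoryStore {
    type WriteError = MemoryError;
    type LoadError = MemoryError;

    fn write_changes(&mut self, changeset: &BTreeSet<u32>) -> Result<(), MemoryError> {
        // Merge first, then reject the write if it would exceed the capacity.
        let merged: BTreeSet<u32> = self.items.union(changeset).copied().collect();
        if merged.len() > self.capacity {
            return Err(MemoryError::Full { capacity: self.capacity });
        }
        self.items = merged;
        Ok(())
    }

    fn load_changes(&mut self) -> Result<Option<BTreeSet<u32>>, MemoryError> {
        // Mirror the "None when nothing is stored" convention.
        if self.items.is_empty() {
            Ok(None)
        } else {
            Ok(Some(self.items.clone()))
        }
    }
}

/// Generic caller: works with any backend because the error is `Display`.
pub fn write_or_message<C, B: PersistBackend<C>>(
    backend: &mut B,
    changeset: &C,
) -> Result<(), String> {
    backend.write_changes(changeset).map_err(|e| e.to_string())
}
```

  Because the bounds live on the associated types, an application is free to wrap these errors with `anyhow!` or any other error-handling crate, while the library itself stays free of that dependency.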

  ### Notes to the reviewers

  I also replaced/renamed the old `Persist` with a `StagedPersist` struct, inspired by #1453; it is only used in examples. The `Wallet` handles its own staging.
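
  The staging idea can be sketched roughly as follows: changesets are merged into a buffer and handed to a writer later, and the buffer is cleared only if the write succeeds. This is an illustrative assumption about the pattern, not the `StagedPersist` code from this PR; `Stage`, `ChangeSet`, and the `Append` trait below are hypothetical, simplified stand-ins.

```rust
use std::collections::BTreeSet;

/// Hypothetical merge trait, loosely modeled on the idea of appending
/// one changeset onto another.
pub trait Append {
    fn append(&mut self, other: Self);
    fn is_empty(&self) -> bool;
}

/// Toy changeset: a set of block heights.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ChangeSet {
    pub heights: BTreeSet<u32>,
}

impl Append for ChangeSet {
    fn append(&mut self, other: Self) {
        self.heights.extend(other.heights);
    }

    fn is_empty(&self) -> bool {
        self.heights.is_empty()
    }
}

/// Buffers changes in memory until the caller decides to persist them.
pub struct Stage<C> {
    staged: C,
}

impl<C: Append + Default> Stage<C> {
    pub fn new() -> Self {
        Self { staged: C::default() }
    }

    /// Merge a new changeset into the staged one.
    pub fn stage(&mut self, changeset: C) {
        self.staged.append(changeset);
    }

    /// Pass the staged changes to `write`. Returns `Ok(false)` if there was
    /// nothing to write. On error the staged changes are kept for a retry.
    pub fn commit<E>(
        &mut self,
        write: impl FnOnce(&C) -> Result<(), E>,
    ) -> Result<bool, E> {
        if self.staged.is_empty() {
            return Ok(false);
        }
        write(&self.staged)?;
        self.staged = C::default();
        Ok(true)
    }
}
```

  Taking the writer as a closure keeps the buffer independent of any particular storage backend; with the persist call left to the caller, nothing here depends on whether the storage is synchronous or `async`.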

  ### Changelog notice

  Changed

  - Removed `db` from `Wallet`, users are now responsible for persisting ... (continued)

176 of 248 new or added lines in 6 files covered. (70.97%)

3 existing lines in 2 files now uncovered.

11153 of 13427 relevant lines covered (83.06%)

16771.13 hits per line

Source File

96.7
/crates/sqlite/src/store.rs
1
use bdk_chain::bitcoin::consensus::{deserialize, serialize};
2
use bdk_chain::bitcoin::hashes::Hash;
3
use bdk_chain::bitcoin::{Amount, Network, OutPoint, ScriptBuf, Transaction, TxOut};
4
use bdk_chain::bitcoin::{BlockHash, Txid};
5
use bdk_chain::miniscript::descriptor::{Descriptor, DescriptorPublicKey};
6
use rusqlite::{named_params, Connection};
7
use serde::{Deserialize, Serialize};
8
use std::collections::{BTreeMap, BTreeSet};
9
use std::fmt::Debug;
10
use std::marker::PhantomData;
11
use std::str::FromStr;
12
use std::sync::{Arc, Mutex};
13

14
use crate::Error;
15
use bdk_chain::persist::{CombinedChangeSet, PersistBackend};
16
use bdk_chain::{
17
    indexed_tx_graph, keychain, local_chain, tx_graph, Anchor, Append, DescriptorExt, DescriptorId,
18
};
19

20
/// Persists data into a relational schema based [SQLite] database file.
21
///
22
/// The changesets loaded or stored represent changes to keychain and blockchain data.
23
///
24
/// [SQLite]: https://www.sqlite.org/index.html
25
pub struct Store<K, A> {
26
    // A rusqlite connection to the SQLite database. Uses a Mutex for thread safety.
27
    conn: Mutex<Connection>,
28
    keychain_marker: PhantomData<K>,
29
    anchor_marker: PhantomData<A>,
30
}
31

32
impl<K, A> Debug for Store<K, A> {
33
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
34
        Debug::fmt(&self.conn, f)
×
35
    }
×
36
}
37

38
impl<K, A> Store<K, A>
39
where
40
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
41
    A: Anchor + for<'de> Deserialize<'de> + Serialize + Send,
42
{
43
    /// Creates a new store from a [`Connection`].
44
    pub fn new(mut conn: Connection) -> Result<Self, rusqlite::Error> {
11✔
45
        Self::migrate(&mut conn)?;
11✔
46

47
        Ok(Self {
11✔
48
            conn: Mutex::new(conn),
11✔
49
            keychain_marker: Default::default(),
11✔
50
            anchor_marker: Default::default(),
11✔
51
        })
11✔
52
    }
11✔
53

54
    pub(crate) fn db_transaction(&mut self) -> Result<rusqlite::Transaction, Error> {
20✔
55
        let connection = self.conn.get_mut().expect("unlocked connection mutex");
20✔
56
        connection.transaction().map_err(Error::Sqlite)
20✔
57
    }
20✔
58
}
59

60
impl<K, A> PersistBackend<CombinedChangeSet<K, A>> for Store<K, A>
61
where
62
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
63
    A: Anchor + for<'de> Deserialize<'de> + Serialize + Send,
64
{
65
    type WriteError = Error;
66
    type LoadError = Error;
67

68
    fn write_changes(
11✔
69
        &mut self,
11✔
70
        changeset: &CombinedChangeSet<K, A>,
11✔
71
    ) -> Result<(), Self::WriteError> {
11✔
72
        self.write(changeset)
11✔
73
    }
11✔
74

75
    fn load_changes(&mut self) -> Result<Option<CombinedChangeSet<K, A>>, Self::LoadError> {
9✔
76
        self.read()
9✔
77
    }
9✔
78
}
79

80
/// Network table related functions.
81
impl<K, A> Store<K, A> {
82
    /// Insert [`Network`] for which all other tables data is valid.
83
    ///
84
    /// Error if trying to insert different network value.
85
    fn insert_network(
11✔
86
        current_network: &Option<Network>,
11✔
87
        db_transaction: &rusqlite::Transaction,
11✔
88
        network_changeset: &Option<Network>,
11✔
89
    ) -> Result<(), Error> {
11✔
90
        if let Some(network) = network_changeset {
11✔
91
            match current_network {
×
92
                // if no network change do nothing
×
93
                Some(current_network) if current_network == network => Ok(()),
×
94
                // if new network not the same as current, error
95
                Some(current_network) => Err(Error::Network {
×
96
                    expected: *current_network,
×
97
                    given: *network,
×
98
                }),
×
99
                // insert network if none exists
100
                None => {
101
                    let insert_network_stmt = &mut db_transaction
5✔
102
                        .prepare_cached("INSERT INTO network (name) VALUES (:name)")
5✔
103
                        .expect("insert network statement");
5✔
104
                    let name = network.to_string();
5✔
105
                    insert_network_stmt
5✔
106
                        .execute(named_params! {":name": name })
5✔
107
                        .map_err(Error::Sqlite)?;
5✔
108
                    Ok(())
5✔
109
                }
110
            }
111
        } else {
112
            Ok(())
6✔
113
        }
114
    }
11✔
115

116
    /// Select the valid [`Network`] for this database, or `None` if not set.
117
    fn select_network(db_transaction: &rusqlite::Transaction) -> Result<Option<Network>, Error> {
20✔
118
        let mut select_network_stmt = db_transaction
20✔
119
            .prepare_cached("SELECT name FROM network WHERE rowid = 1")
20✔
120
            .expect("select network statement");
20✔
121

20✔
122
        let network = select_network_stmt
20✔
123
            .query_row([], |row| {
20✔
124
                let network = row.get_unwrap::<usize, String>(0);
15✔
125
                let network = Network::from_str(network.as_str()).expect("valid network");
15✔
126
                Ok(network)
15✔
127
            })
20✔
128
            .map_err(Error::Sqlite);
20✔
129
        match network {
5✔
130
            Ok(network) => Ok(Some(network)),
15✔
131
            Err(Error::Sqlite(rusqlite::Error::QueryReturnedNoRows)) => Ok(None),
5✔
132
            Err(e) => Err(e),
×
133
        }
134
    }
20✔
135
}
136

137
/// Block table related functions.
138
impl<K, A> Store<K, A> {
139
    /// Insert or delete local chain blocks.
140
    ///
141
    /// Error if trying to insert existing block hash.
142
    fn insert_or_delete_blocks(
11✔
143
        db_transaction: &rusqlite::Transaction,
11✔
144
        chain_changeset: &local_chain::ChangeSet,
11✔
145
    ) -> Result<(), Error> {
11✔
146
        for (height, hash) in chain_changeset.iter() {
11✔
147
            match hash {
11✔
148
                // add new hash at height
149
                Some(hash) => {
11✔
150
                    let insert_block_stmt = &mut db_transaction
11✔
151
                        .prepare_cached("INSERT INTO block (hash, height) VALUES (:hash, :height)")
11✔
152
                        .expect("insert block statement");
11✔
153
                    let hash = hash.to_string();
11✔
154
                    insert_block_stmt
11✔
155
                        .execute(named_params! {":hash": hash, ":height": height })
11✔
156
                        .map_err(Error::Sqlite)?;
11✔
157
                }
158
                // delete block at height
159
                None => {
160
                    let delete_block_stmt = &mut db_transaction
×
161
                        .prepare_cached("DELETE FROM block WHERE height IS :height")
×
162
                        .expect("delete block statement");
×
163
                    delete_block_stmt
×
164
                        .execute(named_params! {":height": height })
×
165
                        .map_err(Error::Sqlite)?;
×
166
                }
167
            }
168
        }
169

170
        Ok(())
11✔
171
    }
11✔
172

173
    /// Select all blocks.
174
    fn select_blocks(
9✔
175
        db_transaction: &rusqlite::Transaction,
9✔
176
    ) -> Result<BTreeMap<u32, Option<BlockHash>>, Error> {
9✔
177
        let mut select_blocks_stmt = db_transaction
9✔
178
            .prepare_cached("SELECT height, hash FROM block")
9✔
179
            .expect("select blocks statement");
9✔
180

181
        let blocks = select_blocks_stmt
9✔
182
            .query_map([], |row| {
15✔
183
                let height = row.get_unwrap::<usize, u32>(0);
15✔
184
                let hash = row.get_unwrap::<usize, String>(1);
15✔
185
                let hash = Some(BlockHash::from_str(hash.as_str()).expect("block hash"));
15✔
186
                Ok((height, hash))
15✔
187
            })
15✔
188
            .map_err(Error::Sqlite)?;
9✔
189
        blocks
9✔
190
            .into_iter()
9✔
191
            .map(|row| row.map_err(Error::Sqlite))
15✔
192
            .collect()
9✔
193
    }
9✔
194
}
195

196
/// Keychain table related functions.
197
///
198
/// The keychain objects are stored as [`JSONB`] data.
199
/// [`JSONB`]: https://sqlite.org/json1.html#jsonb
200
impl<K, A> Store<K, A>
201
where
202
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
203
    A: Anchor + Send,
204
{
205
    /// Insert keychain with descriptor and last active index.
206
    ///
207
    /// If keychain exists only update last active index.
208
    fn insert_keychains(
11✔
209
        db_transaction: &rusqlite::Transaction,
11✔
210
        tx_graph_changeset: &indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>>,
11✔
211
    ) -> Result<(), Error> {
11✔
212
        let keychain_changeset = &tx_graph_changeset.indexer;
11✔
213
        for (keychain, descriptor) in keychain_changeset.keychains_added.iter() {
13✔
214
            let insert_keychain_stmt = &mut db_transaction
10✔
215
                .prepare_cached("INSERT INTO keychain (keychain, descriptor, descriptor_id) VALUES (jsonb(:keychain), :descriptor, :descriptor_id)")
10✔
216
                .expect("insert keychain statement");
10✔
217
            let keychain_json = serde_json::to_string(keychain).expect("keychain json");
10✔
218
            let descriptor_id = descriptor.descriptor_id().to_byte_array();
10✔
219
            let descriptor = descriptor.to_string();
10✔
220
            insert_keychain_stmt.execute(named_params! {":keychain": keychain_json, ":descriptor": descriptor, ":descriptor_id": descriptor_id })
10✔
221
                .map_err(Error::Sqlite)?;
10✔
222
        }
223
        Ok(())
11✔
224
    }
11✔
225

226
    /// Update descriptor last revealed index.
227
    fn update_last_revealed(
11✔
228
        db_transaction: &rusqlite::Transaction,
11✔
229
        tx_graph_changeset: &indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>>,
11✔
230
    ) -> Result<(), Error> {
11✔
231
        let keychain_changeset = &tx_graph_changeset.indexer;
11✔
232
        for (descriptor_id, last_revealed) in keychain_changeset.last_revealed.iter() {
11✔
233
            let update_last_revealed_stmt = &mut db_transaction
7✔
234
                .prepare_cached(
7✔
235
                    "UPDATE keychain SET last_revealed = :last_revealed
7✔
236
                              WHERE descriptor_id = :descriptor_id",
7✔
237
                )
7✔
238
                .expect("update last revealed statement");
7✔
239
            let descriptor_id = descriptor_id.to_byte_array();
7✔
240
            update_last_revealed_stmt.execute(named_params! {":descriptor_id": descriptor_id, ":last_revealed": * last_revealed })
7✔
241
                .map_err(Error::Sqlite)?;
7✔
242
        }
243
        Ok(())
11✔
244
    }
11✔
245

246
    /// Select keychains added.
247
    fn select_keychains(
9✔
248
        db_transaction: &rusqlite::Transaction,
9✔
249
    ) -> Result<BTreeMap<K, Descriptor<DescriptorPublicKey>>, Error> {
9✔
250
        let mut select_keychains_added_stmt = db_transaction
9✔
251
            .prepare_cached("SELECT json(keychain), descriptor FROM keychain")
9✔
252
            .expect("select keychains statement");
9✔
253

254
        let keychains = select_keychains_added_stmt
9✔
255
            .query_map([], |row| {
18✔
256
                let keychain = row.get_unwrap::<usize, String>(0);
18✔
257
                let keychain = serde_json::from_str::<K>(keychain.as_str()).expect("keychain");
18✔
258
                let descriptor = row.get_unwrap::<usize, String>(1);
18✔
259
                let descriptor = Descriptor::from_str(descriptor.as_str()).expect("descriptor");
18✔
260
                Ok((keychain, descriptor))
18✔
261
            })
18✔
262
            .map_err(Error::Sqlite)?;
9✔
263
        keychains
9✔
264
            .into_iter()
9✔
265
            .map(|row| row.map_err(Error::Sqlite))
18✔
266
            .collect()
9✔
267
    }
9✔
268

269
    /// Select descriptor last revealed indexes.
270
    fn select_last_revealed(
9✔
271
        db_transaction: &rusqlite::Transaction,
9✔
272
    ) -> Result<BTreeMap<DescriptorId, u32>, Error> {
9✔
273
        let mut select_last_revealed_stmt = db_transaction
9✔
274
            .prepare_cached(
9✔
275
                "SELECT descriptor, last_revealed FROM keychain WHERE last_revealed IS NOT NULL",
9✔
276
            )
9✔
277
            .expect("select last revealed statement");
9✔
278

279
        let last_revealed = select_last_revealed_stmt
9✔
280
            .query_map([], |row| {
12✔
281
                let descriptor = row.get_unwrap::<usize, String>(0);
7✔
282
                let descriptor = Descriptor::from_str(descriptor.as_str()).expect("descriptor");
7✔
283
                let descriptor_id = descriptor.descriptor_id();
7✔
284
                let last_revealed = row.get_unwrap::<usize, u32>(1);
7✔
285
                Ok((descriptor_id, last_revealed))
7✔
286
            })
12✔
287
            .map_err(Error::Sqlite)?;
9✔
288
        last_revealed
9✔
289
            .into_iter()
9✔
290
            .map(|row| row.map_err(Error::Sqlite))
12✔
291
            .collect()
9✔
292
    }
9✔
293
}
294

295
/// Tx (transaction) and txout (transaction output) table related functions.
296
impl<K, A> Store<K, A> {
297
    /// Insert transactions.
298
    ///
299
    /// If the txid already exists, its `whole_tx` is updated instead.
300
    fn insert_txs(
11✔
301
        db_transaction: &rusqlite::Transaction,
11✔
302
        tx_graph_changeset: &indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>>,
11✔
303
    ) -> Result<(), Error> {
11✔
304
        for tx in tx_graph_changeset.graph.txs.iter() {
11✔
305
            let insert_tx_stmt = &mut db_transaction
9✔
306
                .prepare_cached("INSERT INTO tx (txid, whole_tx) VALUES (:txid, :whole_tx) ON CONFLICT (txid) DO UPDATE SET whole_tx = :whole_tx WHERE txid = :txid")
9✔
307
                .expect("insert or update tx whole_tx statement");
9✔
308
            let txid = tx.compute_txid().to_string();
9✔
309
            let whole_tx = serialize(&tx);
9✔
310
            insert_tx_stmt
9✔
311
                .execute(named_params! {":txid": txid, ":whole_tx": whole_tx })
9✔
312
                .map_err(Error::Sqlite)?;
9✔
313
        }
314
        Ok(())
11✔
315
    }
11✔
316

317
    /// Select all transactions.
318
    fn select_txs(
9✔
319
        db_transaction: &rusqlite::Transaction,
9✔
320
    ) -> Result<BTreeSet<Arc<Transaction>>, Error> {
9✔
321
        let mut select_tx_stmt = db_transaction
9✔
322
            .prepare_cached("SELECT whole_tx FROM tx WHERE whole_tx IS NOT NULL")
9✔
323
            .expect("select tx statement");
9✔
324

325
        let txs = select_tx_stmt
9✔
326
            .query_map([], |row| {
15✔
327
                let whole_tx = row.get_unwrap::<usize, Vec<u8>>(0);
9✔
328
                let whole_tx: Transaction = deserialize(&whole_tx).expect("transaction");
9✔
329
                Ok(Arc::new(whole_tx))
9✔
330
            })
15✔
331
            .map_err(Error::Sqlite)?;
9✔
332

333
        txs.into_iter()
9✔
334
            .map(|row| row.map_err(Error::Sqlite))
15✔
335
            .collect()
9✔
336
    }
9✔
337

338
    /// Select all transactions with last_seen values.
339
    fn select_last_seen(
9✔
340
        db_transaction: &rusqlite::Transaction,
9✔
341
    ) -> Result<BTreeMap<Txid, u64>, Error> {
9✔
342
        // load tx last_seen
9✔
343
        let mut select_last_seen_stmt = db_transaction
9✔
344
            .prepare_cached("SELECT txid, last_seen FROM tx WHERE last_seen IS NOT NULL")
9✔
345
            .expect("select tx last seen statement");
9✔
346

347
        let last_seen = select_last_seen_stmt
9✔
348
            .query_map([], |row| {
15✔
349
                let txid = row.get_unwrap::<usize, String>(0);
9✔
350
                let txid = Txid::from_str(&txid).expect("txid");
9✔
351
                let last_seen = row.get_unwrap::<usize, u64>(1);
9✔
352
                Ok((txid, last_seen))
9✔
353
            })
15✔
354
            .map_err(Error::Sqlite)?;
9✔
355
        last_seen
9✔
356
            .into_iter()
9✔
357
            .map(|row| row.map_err(Error::Sqlite))
15✔
358
            .collect()
9✔
359
    }
9✔
360

361
    /// Insert txouts.
362
    ///
363
    /// Error if trying to insert existing outpoint.
364
    fn insert_txouts(
11✔
365
        db_transaction: &rusqlite::Transaction,
11✔
366
        tx_graph_changeset: &indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>>,
11✔
367
    ) -> Result<(), Error> {
11✔
368
        for txout in tx_graph_changeset.graph.txouts.iter() {
11✔
369
            let insert_txout_stmt = &mut db_transaction
6✔
370
                .prepare_cached("INSERT INTO txout (txid, vout, value, script) VALUES (:txid, :vout, :value, :script)")
6✔
371
                .expect("insert txout statement");
6✔
372
            let txid = txout.0.txid.to_string();
6✔
373
            let vout = txout.0.vout;
6✔
374
            let value = txout.1.value.to_sat();
6✔
375
            let script = txout.1.script_pubkey.as_bytes();
6✔
376
            insert_txout_stmt.execute(named_params! {":txid": txid, ":vout": vout, ":value": value, ":script": script })
6✔
377
                .map_err(Error::Sqlite)?;
6✔
378
        }
379
        Ok(())
11✔
380
    }
11✔
381

382
    /// Select all transaction outputs.
383
    fn select_txouts(
9✔
384
        db_transaction: &rusqlite::Transaction,
9✔
385
    ) -> Result<BTreeMap<OutPoint, TxOut>, Error> {
9✔
386
        // load tx outs
9✔
387
        let mut select_txout_stmt = db_transaction
9✔
388
            .prepare_cached("SELECT txid, vout, value, script FROM txout")
9✔
389
            .expect("select txout statement");
9✔
390

391
        let txouts = select_txout_stmt
9✔
392
            .query_map([], |row| {
12✔
393
                let txid = row.get_unwrap::<usize, String>(0);
6✔
394
                let txid = Txid::from_str(&txid).expect("txid");
6✔
395
                let vout = row.get_unwrap::<usize, u32>(1);
6✔
396
                let outpoint = OutPoint::new(txid, vout);
6✔
397
                let value = row.get_unwrap::<usize, u64>(2);
6✔
398
                let script_pubkey = row.get_unwrap::<usize, Vec<u8>>(3);
6✔
399
                let script_pubkey = ScriptBuf::from_bytes(script_pubkey);
6✔
400
                let txout = TxOut {
6✔
401
                    value: Amount::from_sat(value),
6✔
402
                    script_pubkey,
6✔
403
                };
6✔
404
                Ok((outpoint, txout))
6✔
405
            })
12✔
406
            .map_err(Error::Sqlite)?;
9✔
407
        txouts
9✔
408
            .into_iter()
9✔
409
            .map(|row| row.map_err(Error::Sqlite))
12✔
410
            .collect()
9✔
411
    }
9✔
412

413
    /// Update transaction last seen times.
414
    fn update_last_seen(
11✔
415
        db_transaction: &rusqlite::Transaction,
11✔
416
        tx_graph_changeset: &indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>>,
11✔
417
    ) -> Result<(), Error> {
11✔
418
        for tx_last_seen in tx_graph_changeset.graph.last_seen.iter() {
14✔
419
            let insert_or_update_tx_stmt = &mut db_transaction
12✔
420
                .prepare_cached("INSERT INTO tx (txid, last_seen) VALUES (:txid, :last_seen) ON CONFLICT (txid) DO UPDATE SET last_seen = :last_seen WHERE txid = :txid")
12✔
421
                .expect("insert or update tx last_seen statement");
12✔
422
            let txid = tx_last_seen.0.to_string();
12✔
423
            let last_seen = *tx_last_seen.1;
12✔
424
            insert_or_update_tx_stmt
12✔
425
                .execute(named_params! {":txid": txid, ":last_seen": last_seen })
12✔
426
                .map_err(Error::Sqlite)?;
12✔
427
        }
428
        Ok(())
11✔
429
    }
11✔
430
}
431

432
/// Anchor table related functions.
433
impl<K, A> Store<K, A>
434
where
435
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
436
    A: Anchor + for<'de> Deserialize<'de> + Serialize + Send,
437
{
438
    /// Insert anchors.
439
    fn insert_anchors(
11✔
440
        db_transaction: &rusqlite::Transaction,
11✔
441
        tx_graph_changeset: &indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>>,
11✔
442
    ) -> Result<(), Error> {
11✔
443
        // serde_json::to_string
444
        for anchor in tx_graph_changeset.graph.anchors.iter() {
14✔
445
            let insert_anchor_stmt = &mut db_transaction
12✔
446
                .prepare_cached("INSERT INTO anchor_tx (block_hash, anchor, txid) VALUES (:block_hash, jsonb(:anchor), :txid)")
12✔
447
                .expect("insert anchor statement");
12✔
448
            let block_hash = anchor.0.anchor_block().hash.to_string();
12✔
449
            let anchor_json = serde_json::to_string(&anchor.0).expect("anchor json");
12✔
450
            let txid = anchor.1.to_string();
12✔
451
            insert_anchor_stmt.execute(named_params! {":block_hash": block_hash, ":anchor": anchor_json, ":txid": txid })
12✔
452
                .map_err(Error::Sqlite)?;
12✔
453
        }
454
        Ok(())
11✔
455
    }
11✔
456

457
    /// Select all anchors.
458
    fn select_anchors(
9✔
459
        db_transaction: &rusqlite::Transaction,
9✔
460
    ) -> Result<BTreeSet<(A, Txid)>, Error> {
9✔
461
        // serde_json::from_str
9✔
462
        let mut select_anchor_stmt = db_transaction
9✔
463
            .prepare_cached("SELECT block_hash, json(anchor), txid FROM anchor_tx")
9✔
464
            .expect("select anchor statement");
9✔
465
        let anchors = select_anchor_stmt
9✔
466
            .query_map([], |row| {
18✔
467
                let hash = row.get_unwrap::<usize, String>(0);
12✔
468
                let hash = BlockHash::from_str(hash.as_str()).expect("block hash");
12✔
469
                let anchor = row.get_unwrap::<usize, String>(1);
12✔
470
                let anchor: A = serde_json::from_str(anchor.as_str()).expect("anchor");
12✔
471
                // double check anchor blob block hash matches
12✔
472
                assert_eq!(hash, anchor.anchor_block().hash);
12✔
473
                let txid = row.get_unwrap::<usize, String>(2);
12✔
474
                let txid = Txid::from_str(&txid).expect("txid");
12✔
475
                Ok((anchor, txid))
12✔
476
            })
18✔
477
            .map_err(Error::Sqlite)?;
9✔
478
        anchors
9✔
479
            .into_iter()
9✔
480
            .map(|row| row.map_err(Error::Sqlite))
18✔
481
            .collect()
9✔
482
    }
9✔
483
}
484

485
/// Functions to read and write all [`ChangeSet`] data.
486
impl<K, A> Store<K, A>
487
where
488
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
489
    A: Anchor + for<'de> Deserialize<'de> + Serialize + Send,
490
{
491
    fn write(&mut self, changeset: &CombinedChangeSet<K, A>) -> Result<(), Error> {
11✔
492
        // no need to write anything if changeset is empty
11✔
493
        if changeset.is_empty() {
11✔
494
            return Ok(());
×
495
        }
11✔
496

497
        let db_transaction = self.db_transaction()?;
11✔
498

499
        let network_changeset = &changeset.network;
11✔
500
        let current_network = Self::select_network(&db_transaction)?;
11✔
501
        Self::insert_network(&current_network, &db_transaction, network_changeset)?;
11✔
502

503
        let chain_changeset = &changeset.chain;
11✔
504
        Self::insert_or_delete_blocks(&db_transaction, chain_changeset)?;
11✔
505

506
        let tx_graph_changeset = &changeset.indexed_tx_graph;
11✔
507
        Self::insert_keychains(&db_transaction, tx_graph_changeset)?;
11✔
508
        Self::update_last_revealed(&db_transaction, tx_graph_changeset)?;
11✔
509
        Self::insert_txs(&db_transaction, tx_graph_changeset)?;
11✔
510
        Self::insert_txouts(&db_transaction, tx_graph_changeset)?;
11✔
511
        Self::insert_anchors(&db_transaction, tx_graph_changeset)?;
11✔
512
        Self::update_last_seen(&db_transaction, tx_graph_changeset)?;
11✔
513
        db_transaction.commit().map_err(Error::Sqlite)
11✔
514
    }
11✔
515

516
    fn read(&mut self) -> Result<Option<CombinedChangeSet<K, A>>, Error> {
9✔
517
        let db_transaction = self.db_transaction()?;
9✔
518

519
        let network = Self::select_network(&db_transaction)?;
9✔
520
        let chain = Self::select_blocks(&db_transaction)?;
9✔
521
        let keychains_added = Self::select_keychains(&db_transaction)?;
9✔
522
        let last_revealed = Self::select_last_revealed(&db_transaction)?;
9✔
523
        let txs = Self::select_txs(&db_transaction)?;
9✔
524
        let last_seen = Self::select_last_seen(&db_transaction)?;
9✔
525
        let txouts = Self::select_txouts(&db_transaction)?;
9✔
526
        let anchors = Self::select_anchors(&db_transaction)?;
9✔
527

528
        let graph: tx_graph::ChangeSet<A> = tx_graph::ChangeSet {
9✔
529
            txs,
9✔
530
            txouts,
9✔
531
            anchors,
9✔
532
            last_seen,
9✔
533
        };
9✔
534

9✔
535
        let indexer: keychain::ChangeSet<K> = keychain::ChangeSet {
9✔
536
            keychains_added,
9✔
537
            last_revealed,
9✔
538
        };
9✔
539

9✔
540
        let indexed_tx_graph: indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>> =
9✔
541
            indexed_tx_graph::ChangeSet { graph, indexer };
9✔
542

9✔
543
        if network.is_none() && chain.is_empty() && indexed_tx_graph.is_empty() {
9✔
UNCOV
544
            Ok(None)
×
545
        } else {
546
            Ok(Some(CombinedChangeSet {
9✔
547
                chain,
9✔
548
                indexed_tx_graph,
9✔
549
                network,
9✔
550
            }))
9✔
551
        }
552
    }
9✔
553
}
554

555
#[cfg(test)]
556
mod test {
557
    use super::*;
558
    use crate::store::Append;
559
    use bdk_chain::bitcoin::consensus::encode::deserialize;
560
    use bdk_chain::bitcoin::constants::genesis_block;
561
    use bdk_chain::bitcoin::hashes::hex::FromHex;
562
    use bdk_chain::bitcoin::transaction::Transaction;
563
    use bdk_chain::bitcoin::Network::Testnet;
564
    use bdk_chain::bitcoin::{secp256k1, BlockHash, OutPoint};
565
    use bdk_chain::miniscript::Descriptor;
566
    use bdk_chain::persist::{CombinedChangeSet, PersistBackend};
567
    use bdk_chain::{
568
        indexed_tx_graph, keychain, tx_graph, BlockId, ConfirmationHeightAnchor,
569
        ConfirmationTimeHeightAnchor, DescriptorExt,
570
    };
571
    use std::str::FromStr;
572
    use std::sync::Arc;
573

574
    #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Hash, Debug, Serialize, Deserialize)]
30✔
575
    enum Keychain {
576
        External { account: u32, name: String },
577
        Internal { account: u32, name: String },
578
    }
579

580
    #[test]
581
    fn insert_and_load_aggregate_changesets_with_confirmation_time_height_anchor() {
1✔
582
        let (test_changesets, agg_test_changesets) =
1✔
583
            create_test_changesets(&|height, time, hash| ConfirmationTimeHeightAnchor {
2✔
584
                confirmation_height: height,
2✔
585
                confirmation_time: time,
2✔
586
                anchor_block: (height, hash).into(),
2✔
587
            });
2✔
588

1✔
589
        let conn = Connection::open_in_memory().expect("in memory connection");
1✔
590
        let mut store = Store::<Keychain, ConfirmationTimeHeightAnchor>::new(conn)
1✔
591
            .expect("create new memory db store");
1✔
592

1✔
593
        test_changesets.iter().for_each(|changeset| {
3✔
594
            store.write_changes(changeset).expect("write changeset");
3✔
595
        });
3✔
596

1✔
597
        let agg_changeset = store.load_changes().expect("aggregated changeset");
1✔
598

1✔
599
        assert_eq!(agg_changeset, Some(agg_test_changesets));
1✔
600
    }
1✔
601

602
    #[test]
603
    fn insert_and_load_aggregate_changesets_with_confirmation_height_anchor() {
1✔
604
        let (test_changesets, agg_test_changesets) =
1✔
605
            create_test_changesets(&|height, _time, hash| ConfirmationHeightAnchor {
2✔
606
                confirmation_height: height,
2✔
607
                anchor_block: (height, hash).into(),
2✔
608
            });
2✔
609

1✔
610
        let conn = Connection::open_in_memory().expect("in memory connection");
1✔
611
        let mut store = Store::<Keychain, ConfirmationHeightAnchor>::new(conn)
1✔
612
            .expect("create new memory db store");
1✔
613

1✔
614
        test_changesets.iter().for_each(|changeset| {
3✔
615
            store.write_changes(changeset).expect("write changeset");
3✔
616
        });
3✔
617

1✔
618
        let agg_changeset = store.load_changes().expect("aggregated changeset");
1✔
619

1✔
620
        assert_eq!(agg_changeset, Some(agg_test_changesets));
1✔
621
    }
1✔
622

623
    #[test]
624
    fn insert_and_load_aggregate_changesets_with_blockid_anchor() {
1✔
625
        let (test_changesets, agg_test_changesets) =
1✔
626
            create_test_changesets(&|height, _time, hash| BlockId { height, hash });
2✔
627

1✔
628
        let conn = Connection::open_in_memory().expect("in memory connection");
1✔
629
        let mut store = Store::<Keychain, BlockId>::new(conn).expect("create new memory db store");
1✔
630

1✔
631
        test_changesets.iter().for_each(|changeset| {
3✔
632
            store.write_changes(changeset).expect("write changeset");
3✔
633
        });
3✔
634

1✔
635
        let agg_changeset = store.load_changes().expect("aggregated changeset");
1✔
636

1✔
637
        assert_eq!(agg_changeset, Some(agg_test_changesets));
1✔
638
    }
1✔
639

    fn create_test_changesets<A: Anchor + Copy>(
        anchor_fn: &dyn Fn(u32, u64, BlockHash) -> A,
    ) -> (
        Vec<CombinedChangeSet<Keychain, A>>,
        CombinedChangeSet<Keychain, A>,
    ) {
        let secp = &secp256k1::Secp256k1::signing_only();

        let network_changeset = Some(Testnet);

        let block_hash_0: BlockHash = genesis_block(Testnet).block_hash();
        let block_hash_1 =
            BlockHash::from_str("00000000b873e79784647a6c82962c70d228557d24a747ea4d1b8bbe878e1206")
                .unwrap();
        let block_hash_2 =
            BlockHash::from_str("000000006c02c8ea6e4ff69651f7fcde348fb9d557a06e6957b65552002a7820")
                .unwrap();

        let block_changeset = [
            (0, Some(block_hash_0)),
            (1, Some(block_hash_1)),
            (2, Some(block_hash_2)),
        ]
        .into();

        let ext_keychain = Keychain::External {
            account: 0,
            name: "ext test".to_string(),
        };
        let (ext_desc, _ext_keymap) = Descriptor::parse_descriptor(secp, "wpkh(tprv8ZgxMBicQKsPcx5nBGsR63Pe8KnRUqmbJNENAfGftF3yuXoMMoVJJcYeUw5eVkm9WBPjWYt6HMWYJNesB5HaNVBaFc1M6dRjWSYnmewUMYy/0/*)").unwrap();
        let ext_desc_id = ext_desc.descriptor_id();
        let int_keychain = Keychain::Internal {
            account: 0,
            name: "int test".to_string(),
        };
        let (int_desc, _int_keymap) = Descriptor::parse_descriptor(secp, "wpkh(tprv8ZgxMBicQKsPcx5nBGsR63Pe8KnRUqmbJNENAfGftF3yuXoMMoVJJcYeUw5eVkm9WBPjWYt6HMWYJNesB5HaNVBaFc1M6dRjWSYnmewUMYy/1/*)").unwrap();
        let int_desc_id = int_desc.descriptor_id();

        let tx0_hex = Vec::<u8>::from_hex("01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000").unwrap();
        let tx0: Arc<Transaction> = Arc::new(deserialize(tx0_hex.as_slice()).unwrap());
        let tx1_hex = Vec::<u8>::from_hex("010000000001010000000000000000000000000000000000000000000000000000000000000000ffffffff025151feffffff0200f2052a010000001600149243f727dd5343293eb83174324019ec16c2630f0000000000000000776a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf94c4fecc7daa2490047304402205e423a8754336ca99dbe16509b877ef1bf98d008836c725005b3c787c41ebe46022047246e4467ad7cc7f1ad98662afcaf14c115e0095a227c7b05c5182591c23e7e01000120000000000000000000000000000000000000000000000000000000000000000000000000").unwrap();
        let tx1: Arc<Transaction> = Arc::new(deserialize(tx1_hex.as_slice()).unwrap());
        let tx2_hex = Vec::<u8>::from_hex("01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0e0432e7494d010e062f503253482fffffffff0100f2052a010000002321038a7f6ef1c8ca0c588aa53fa860128077c9e6c11e6830f4d7ee4e763a56b7718fac00000000").unwrap();
        let tx2: Arc<Transaction> = Arc::new(deserialize(tx2_hex.as_slice()).unwrap());

        let outpoint0_0 = OutPoint::new(tx0.compute_txid(), 0);
        let txout0_0 = tx0.output.first().unwrap().clone();
        let outpoint1_0 = OutPoint::new(tx1.compute_txid(), 0);
        let txout1_0 = tx1.output.first().unwrap().clone();

        let anchor1 = anchor_fn(1, 1296667328, block_hash_1);
        let anchor2 = anchor_fn(2, 1296688946, block_hash_2);

        let tx_graph_changeset = tx_graph::ChangeSet::<A> {
            txs: [tx0.clone(), tx1.clone()].into(),
            txouts: [(outpoint0_0, txout0_0), (outpoint1_0, txout1_0)].into(),
            anchors: [(anchor1, tx0.compute_txid()), (anchor1, tx1.compute_txid())].into(),
            last_seen: [
                (tx0.compute_txid(), 1598918400),
                (tx1.compute_txid(), 1598919121),
                (tx2.compute_txid(), 1608919121),
            ]
            .into(),
        };

        let keychain_changeset = keychain::ChangeSet {
            keychains_added: [(ext_keychain, ext_desc), (int_keychain, int_desc)].into(),
            last_revealed: [(ext_desc_id, 124), (int_desc_id, 421)].into(),
        };

        let graph_changeset: indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<Keychain>> =
            indexed_tx_graph::ChangeSet {
                graph: tx_graph_changeset,
                indexer: keychain_changeset,
            };

        // test changesets to write to db
        let mut changesets = Vec::new();

        changesets.push(CombinedChangeSet {
            chain: block_changeset,
            indexed_tx_graph: graph_changeset,
            network: network_changeset,
        });

        // create a changeset that sets the whole tx2 and updates its last_seen,
        // where before there was only the txid and last_seen
        let tx_graph_changeset2 = tx_graph::ChangeSet::<A> {
            txs: [tx2.clone()].into(),
            txouts: BTreeMap::default(),
            anchors: BTreeSet::default(),
            last_seen: [(tx2.compute_txid(), 1708919121)].into(),
        };

        let graph_changeset2: indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<Keychain>> =
            indexed_tx_graph::ChangeSet {
                graph: tx_graph_changeset2,
                indexer: keychain::ChangeSet::default(),
            };

        changesets.push(CombinedChangeSet {
            chain: local_chain::ChangeSet::default(),
            indexed_tx_graph: graph_changeset2,
            network: None,
        });

        // create changeset that adds a new anchor2 for tx0 and tx1
        let tx_graph_changeset3 = tx_graph::ChangeSet::<A> {
            txs: BTreeSet::default(),
            txouts: BTreeMap::default(),
            anchors: [(anchor2, tx0.compute_txid()), (anchor2, tx1.compute_txid())].into(),
            last_seen: BTreeMap::default(),
        };

        let graph_changeset3: indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<Keychain>> =
            indexed_tx_graph::ChangeSet {
                graph: tx_graph_changeset3,
                indexer: keychain::ChangeSet::default(),
            };

        changesets.push(CombinedChangeSet {
            chain: local_chain::ChangeSet::default(),
            indexed_tx_graph: graph_changeset3,
            network: None,
        });

        // aggregated test changesets
        let agg_test_changesets =
            changesets
                .iter()
                .fold(CombinedChangeSet::<Keychain, A>::default(), |mut i, cs| {
                    i.append(cs.clone());
                    i
                });

        (changesets, agg_test_changesets)
    }
}