• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitcoindevkit / bdk / 9165140102

20 May 2024 09:17PM UTC coverage: 83.139% (+0.7%) from 82.434%
9165140102

Pull #1128

github

web-flow
Merge addbc3b5d into 2f059a158
Pull Request #1128: feat: add bdk_sqlite_store crate implementing PersistBackend

660 of 688 new or added lines in 5 files covered. (95.93%)

206 existing lines in 1 file now uncovered.

11316 of 13611 relevant lines covered (83.14%)

16773.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.36
/crates/sqlite_store/src/store.rs
1
use bdk_chain::bitcoin::consensus::{deserialize, serialize};
2
use bdk_chain::bitcoin::hashes::Hash;
3
use bdk_chain::bitcoin::{Amount, Network, OutPoint, ScriptBuf, Transaction, TxOut};
4
use bdk_chain::bitcoin::{BlockHash, Txid};
5
use bdk_chain::miniscript::descriptor::{Descriptor, DescriptorPublicKey};
6
use rusqlite::{named_params, Connection};
7
use serde::{Deserialize, Serialize};
8
use std::collections::{BTreeMap, BTreeSet};
9
use std::marker::PhantomData;
10
use std::str::FromStr;
11
use std::sync::{Arc, Mutex};
12

13
use crate::schema::Schema;
14
use crate::{ChangeSet, Error};
15
use bdk_chain::{
16
    indexed_tx_graph, keychain, local_chain, tx_graph, Anchor, Append, DescriptorExt, DescriptorId,
17
};
18

19
/// Persists [`ChangeSet`] data in to a relational schema based SQLite database file.
20
///
21
/// The changesets loaded or stored represent changes to keychain and blockchain data.
22
#[derive(Debug)]
23
pub struct Store<K, A> {
24
    // A rusqlite connection to the SQLite database. Uses a Mutex for thread safety.
25
    conn: Mutex<Connection>,
26
    keychain_marker: PhantomData<K>,
27
    anchor_marker: PhantomData<A>,
28
}
29

30
impl<K, A> Store<K, A>
31
where
32
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
33
    A: Anchor + for<'de> Deserialize<'de> + Serialize + Send,
34
{
35
    /// Creates a new store from a [`Connection`].
36
    pub fn new(mut conn: Connection) -> Result<Self, rusqlite::Error> {
12✔
37
        Self::migrate(&mut conn)?;
12✔
38
        Ok(Self {
12✔
39
            conn: Mutex::new(conn),
12✔
40
            keychain_marker: Default::default(),
12✔
41
            anchor_marker: Default::default(),
12✔
42
        })
12✔
43
    }
12✔
44

45
    pub(crate) fn db_transaction(&mut self) -> Result<rusqlite::Transaction, Error> {
25✔
46
        let connection = self.conn.get_mut().expect("unlocked connection mutex");
25✔
47
        connection.transaction().map_err(Error::Sqlite)
25✔
48
    }
25✔
49
}
50

51
impl<K, A> Schema for Store<K, A>
52
where
53
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
54
    A: Anchor + Send,
55
{
56
}
57

58
/// Network table related functions.
59
pub(crate) trait NetworkStore {
60
    /// Insert [`Network`] for which all other tables data is valid.
61
    ///
62
    /// Error if trying to insert different network value.
63
    fn insert_network(
12✔
64
        current_network: &Option<Network>,
12✔
65
        db_transaction: &rusqlite::Transaction,
12✔
66
        network_changeset: &Option<Network>,
12✔
67
    ) -> Result<(), Error> {
12✔
68
        if let Some(network) = network_changeset {
12✔
NEW
69
            match current_network {
×
NEW
70
                // if no network change do nothing
×
NEW
71
                Some(current_network) if current_network == network => Ok(()),
×
72
                // if new network not the same as current, error
NEW
73
                Some(current_network) => Err(Error::Network {
×
NEW
74
                    expected: *current_network,
×
NEW
75
                    given: *network,
×
NEW
76
                }),
×
77
                // insert network if none exists
78
                None => {
79
                    let insert_network_stmt = &mut db_transaction
5✔
80
                        .prepare_cached("INSERT INTO network (name) VALUES (:name)")
5✔
81
                        .expect("insert network statement");
5✔
82
                    let name = network.to_string();
5✔
83
                    insert_network_stmt
5✔
84
                        .execute(named_params! {":name": name })
5✔
85
                        .map_err(Error::Sqlite)?;
5✔
86
                    Ok(())
5✔
87
                }
88
            }
89
        } else {
90
            Ok(())
7✔
91
        }
92
    }
12✔
93

94
    /// Select the valid [`Network`] for this database, or `None` if not set.
95
    fn select_network(db_transaction: &rusqlite::Transaction) -> Result<Option<Network>, Error> {
25✔
96
        let mut select_network_stmt = db_transaction
25✔
97
            .prepare_cached("SELECT name FROM network WHERE rowid = 1")
25✔
98
            .expect("select network statement");
25✔
99

25✔
100
        let network = select_network_stmt
25✔
101
            .query_row([], |row| {
25✔
102
                let network = row.get_unwrap::<usize, String>(0);
17✔
103
                let network = Network::from_str(network.as_str()).expect("valid network");
17✔
104
                Ok(network)
17✔
105
            })
25✔
106
            .map_err(Error::Sqlite);
25✔
107
        match network {
8✔
108
            Ok(network) => Ok(Some(network)),
17✔
109
            Err(Error::Sqlite(rusqlite::Error::QueryReturnedNoRows)) => Ok(None),
8✔
NEW
110
            Err(e) => Err(e),
×
111
        }
112
    }
25✔
113
}
114

115
impl<K, A> NetworkStore for Store<K, A>
116
where
117
    K: Send,
118
    A: Send,
119
{
120
}
121

122
/// Block table related functions.
123
pub(crate) trait BlockStore {
124
    /// Insert or delete local chain blocks.
125
    ///
126
    /// Error if trying to insert existing block hash.
127
    fn insert_or_delete_blocks(
12✔
128
        db_transaction: &rusqlite::Transaction,
12✔
129
        chain_changeset: &local_chain::ChangeSet,
12✔
130
    ) -> Result<(), Error> {
12✔
131
        for (height, hash) in chain_changeset.iter() {
12✔
132
            match hash {
11✔
133
                // add new hash at height
134
                Some(hash) => {
11✔
135
                    let insert_block_stmt = &mut db_transaction
11✔
136
                        .prepare_cached("INSERT INTO block (hash, height) VALUES (:hash, :height)")
11✔
137
                        .expect("insert block statement");
11✔
138
                    let hash = hash.to_string();
11✔
139
                    insert_block_stmt
11✔
140
                        .execute(named_params! {":hash": hash, ":height": height })
11✔
141
                        .map_err(Error::Sqlite)?;
11✔
142
                }
143
                // delete block at height
144
                None => {
NEW
145
                    let delete_block_stmt = &mut db_transaction
×
NEW
146
                        .prepare_cached("DELETE FROM block WHERE height IS :height")
×
NEW
147
                        .expect("delete block statement");
×
NEW
148
                    delete_block_stmt
×
NEW
149
                        .execute(named_params! {":height": height })
×
NEW
150
                        .map_err(Error::Sqlite)?;
×
151
                }
152
            }
153
        }
154

155
        Ok(())
12✔
156
    }
12✔
157

158
    /// Select all blocks.
159
    fn select_blocks(
13✔
160
        db_transaction: &rusqlite::Transaction,
13✔
161
    ) -> Result<BTreeMap<u32, Option<BlockHash>>, Error> {
13✔
162
        let mut select_blocks_stmt = db_transaction
13✔
163
            .prepare_cached("SELECT height, hash FROM block")
13✔
164
            .expect("select blocks statement");
13✔
165

166
        let blocks = select_blocks_stmt
13✔
167
            .query_map([], |row| {
19✔
168
                let height = row.get_unwrap::<usize, u32>(0);
16✔
169
                let hash = row.get_unwrap::<usize, String>(1);
16✔
170
                let hash = Some(BlockHash::from_str(hash.as_str()).expect("block hash"));
16✔
171
                Ok((height, hash))
16✔
172
            })
19✔
173
            .map_err(Error::Sqlite)?;
13✔
174
        blocks
13✔
175
            .into_iter()
13✔
176
            .map(|row| row.map_err(Error::Sqlite))
19✔
177
            .collect()
13✔
178
    }
13✔
179
}
180

181
impl<K, A> BlockStore for Store<K, A>
182
where
183
    K: Send,
184
    A: Send,
185
{
186
}
187

188
/// Keychain table related functions.
189
///
190
/// The keychain objects are stored as [`JSONB`] data.
191
/// [`JSONB`]: https://sqlite.org/json1.html#jsonb
192
pub(crate) trait KeychainStore<K: Send, A: Send>
193
where
194
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
195
    A: Anchor + Send,
196
{
197
    /// Insert keychain with descriptor and last active index.
198
    ///
199
    /// If keychain exists only update last active index.
200
    fn insert_keychains(
12✔
201
        db_transaction: &rusqlite::Transaction,
12✔
202
        tx_graph_changeset: &indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>>,
12✔
203
    ) -> Result<(), Error> {
12✔
204
        let keychain_changeset = &tx_graph_changeset.indexer;
12✔
205
        for (keychain, descriptor) in keychain_changeset.keychains_added.iter() {
12✔
206
            let insert_keychain_stmt = &mut db_transaction
8✔
207
                .prepare_cached("INSERT INTO keychain (keychain, descriptor, descriptor_id) VALUES (jsonb(:keychain), :descriptor, :descriptor_id)")
8✔
208
                .expect("insert keychain statement");
8✔
209
            let keychain_json = serde_json::to_string(keychain).expect("keychain json");
8✔
210
            let descriptor_id = descriptor.descriptor_id().to_byte_array();
8✔
211
            let descriptor = descriptor.to_string();
8✔
212
            insert_keychain_stmt.execute(named_params! {":keychain": keychain_json, ":descriptor": descriptor, ":descriptor_id": descriptor_id })
8✔
213
                .map_err(Error::Sqlite)?;
8✔
214
        }
215
        Ok(())
12✔
216
    }
12✔
217

218
    /// Update descriptor last revealed index.
219
    fn update_last_revealed(
12✔
220
        db_transaction: &rusqlite::Transaction,
12✔
221
        tx_graph_changeset: &indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>>,
12✔
222
    ) -> Result<(), Error> {
12✔
223
        let keychain_changeset = &tx_graph_changeset.indexer;
12✔
224
        for (descriptor_id, last_revealed) in keychain_changeset.last_revealed.iter() {
12✔
225
            let update_last_revealed_stmt = &mut db_transaction
7✔
226
                .prepare_cached(
7✔
227
                    "UPDATE keychain SET last_revealed = :last_revealed
7✔
228
                              WHERE descriptor_id = :descriptor_id",
7✔
229
                )
7✔
230
                .expect("update last revealed statement");
7✔
231
            let descriptor_id = descriptor_id.to_byte_array();
7✔
232
            update_last_revealed_stmt.execute(named_params! {":descriptor_id": descriptor_id, ":last_revealed": * last_revealed })
7✔
233
                .map_err(Error::Sqlite)?;
7✔
234
        }
235
        Ok(())
12✔
236
    }
12✔
237

238
    /// Select keychains added.
239
    fn select_keychains(
13✔
240
        db_transaction: &rusqlite::Transaction,
13✔
241
    ) -> Result<BTreeMap<K, Descriptor<DescriptorPublicKey>>, Error> {
13✔
242
        let mut select_keychains_added_stmt = db_transaction
13✔
243
            .prepare_cached("SELECT json(keychain), descriptor FROM keychain")
13✔
244
            .expect("select keychains statement");
13✔
245

246
        let keychains = select_keychains_added_stmt
13✔
247
            .query_map([], |row| {
16✔
248
                let keychain = row.get_unwrap::<usize, String>(0);
13✔
249
                let keychain = serde_json::from_str::<K>(keychain.as_str()).expect("keychain");
13✔
250
                let descriptor = row.get_unwrap::<usize, String>(1);
13✔
251
                let descriptor = Descriptor::from_str(descriptor.as_str()).expect("descriptor");
13✔
252
                Ok((keychain, descriptor))
13✔
253
            })
16✔
254
            .map_err(Error::Sqlite)?;
13✔
255
        keychains
13✔
256
            .into_iter()
13✔
257
            .map(|row| row.map_err(Error::Sqlite))
16✔
258
            .collect()
13✔
259
    }
13✔
260

261
    /// Select descriptor last revealed indexes.
262
    fn select_last_revealed(
13✔
263
        db_transaction: &rusqlite::Transaction,
13✔
264
    ) -> Result<BTreeMap<DescriptorId, u32>, Error> {
13✔
265
        let mut select_last_revealed_stmt = db_transaction
13✔
266
            .prepare_cached(
13✔
267
                "SELECT descriptor, last_revealed FROM keychain WHERE last_revealed IS NOT NULL",
13✔
268
            )
13✔
269
            .expect("select last revealed statement");
13✔
270

271
        let last_revealed = select_last_revealed_stmt
13✔
272
            .query_map([], |row| {
16✔
273
                let descriptor = row.get_unwrap::<usize, String>(0);
8✔
274
                let descriptor = Descriptor::from_str(descriptor.as_str()).expect("descriptor");
8✔
275
                let descriptor_id = descriptor.descriptor_id();
8✔
276
                let last_revealed = row.get_unwrap::<usize, u32>(1);
8✔
277
                Ok((descriptor_id, last_revealed))
8✔
278
            })
16✔
279
            .map_err(Error::Sqlite)?;
13✔
280
        last_revealed
13✔
281
            .into_iter()
13✔
282
            .map(|row| row.map_err(Error::Sqlite))
16✔
283
            .collect()
13✔
284
    }
13✔
285
}
286

287
impl<K, A> KeychainStore<K, A> for Store<K, A>
288
where
289
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
290
    A: Anchor + Send,
291
{
292
}
293

294
/// Tx (transaction) and txout (transaction output) table related functions.
295
pub(crate) trait TxStore<K: Send, A: Send> {
296
    /// Insert transactions.
297
    ///
298
    /// Error if trying to insert existing txid.
299
    fn insert_txs(
12✔
300
        db_transaction: &rusqlite::Transaction,
12✔
301
        tx_graph_changeset: &indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>>,
12✔
302
    ) -> Result<(), Error> {
12✔
303
        for tx in tx_graph_changeset.graph.txs.iter() {
12✔
304
            let insert_tx_stmt = &mut db_transaction
9✔
305
                .prepare_cached("INSERT INTO tx (txid, whole_tx) VALUES (:txid, :whole_tx) ON CONFLICT (txid) DO UPDATE SET whole_tx = :whole_tx WHERE txid = :txid")
9✔
306
                .expect("insert or update tx whole_tx statement");
9✔
307
            let txid = tx.txid().to_string();
9✔
308
            let whole_tx = serialize(&tx);
9✔
309
            insert_tx_stmt
9✔
310
                .execute(named_params! {":txid": txid, ":whole_tx": whole_tx })
9✔
311
                .map_err(Error::Sqlite)?;
9✔
312
        }
313
        Ok(())
12✔
314
    }
12✔
315

316
    /// Select all transactions.
317
    fn select_txs(
13✔
318
        db_transaction: &rusqlite::Transaction,
13✔
319
    ) -> Result<BTreeSet<Arc<Transaction>>, Error> {
13✔
320
        let mut select_tx_stmt = db_transaction
13✔
321
            .prepare_cached("SELECT whole_tx FROM tx WHERE whole_tx IS NOT NULL")
13✔
322
            .expect("select tx statement");
13✔
323

324
        let txs = select_tx_stmt
13✔
325
            .query_map([], |row| {
19✔
326
                let whole_tx = row.get_unwrap::<usize, Vec<u8>>(0);
9✔
327
                let whole_tx: Transaction = deserialize(&whole_tx).expect("transaction");
9✔
328
                Ok(Arc::new(whole_tx))
9✔
329
            })
19✔
330
            .map_err(Error::Sqlite)?;
13✔
331

332
        txs.into_iter()
13✔
333
            .map(|row| row.map_err(Error::Sqlite))
19✔
334
            .collect()
13✔
335
    }
13✔
336

337
    /// Select all transactions with last_seen values.
338
    fn select_last_seen(
13✔
339
        db_transaction: &rusqlite::Transaction,
13✔
340
    ) -> Result<BTreeMap<Txid, u64>, Error> {
13✔
341
        // load tx last_seen
13✔
342
        let mut select_last_seen_stmt = db_transaction
13✔
343
            .prepare_cached("SELECT txid, last_seen FROM tx WHERE last_seen IS NOT NULL")
13✔
344
            .expect("select tx last seen statement");
13✔
345

346
        let last_seen = select_last_seen_stmt
13✔
347
            .query_map([], |row| {
19✔
348
                let txid = row.get_unwrap::<usize, String>(0);
9✔
349
                let txid = Txid::from_str(&txid).expect("txid");
9✔
350
                let last_seen = row.get_unwrap::<usize, u64>(1);
9✔
351
                Ok((txid, last_seen))
9✔
352
            })
19✔
353
            .map_err(Error::Sqlite)?;
13✔
354
        last_seen
13✔
355
            .into_iter()
13✔
356
            .map(|row| row.map_err(Error::Sqlite))
19✔
357
            .collect()
13✔
358
    }
13✔
359

360
    /// Insert txouts.
361
    ///
362
    /// Error if trying to insert existing outpoint.
363
    fn insert_txouts(
12✔
364
        db_transaction: &rusqlite::Transaction,
12✔
365
        tx_graph_changeset: &indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>>,
12✔
366
    ) -> Result<(), Error> {
12✔
367
        for txout in tx_graph_changeset.graph.txouts.iter() {
12✔
368
            let insert_txout_stmt = &mut db_transaction
6✔
369
                .prepare_cached("INSERT INTO txout (txid, vout, value, script) VALUES (:txid, :vout, :value, :script)")
6✔
370
                .expect("insert txout statement");
6✔
371
            let txid = txout.0.txid.to_string();
6✔
372
            let vout = txout.0.vout;
6✔
373
            let value = txout.1.value.to_sat();
6✔
374
            let script = txout.1.script_pubkey.as_bytes();
6✔
375
            insert_txout_stmt.execute(named_params! {":txid": txid, ":vout": vout, ":value": value, ":script": script })
6✔
376
                .map_err(Error::Sqlite)?;
6✔
377
        }
378
        Ok(())
12✔
379
    }
12✔
380

381
    /// Select all transaction outputs.
382
    fn select_txouts(
13✔
383
        db_transaction: &rusqlite::Transaction,
13✔
384
    ) -> Result<BTreeMap<OutPoint, TxOut>, Error> {
13✔
385
        // load tx outs
13✔
386
        let mut select_txout_stmt = db_transaction
13✔
387
            .prepare_cached("SELECT txid, vout, value, script FROM txout")
13✔
388
            .expect("select txout statement");
13✔
389

390
        let txouts = select_txout_stmt
13✔
391
            .query_map([], |row| {
16✔
392
                let txid = row.get_unwrap::<usize, String>(0);
6✔
393
                let txid = Txid::from_str(&txid).expect("txid");
6✔
394
                let vout = row.get_unwrap::<usize, u32>(1);
6✔
395
                let outpoint = OutPoint::new(txid, vout);
6✔
396
                let value = row.get_unwrap::<usize, u64>(2);
6✔
397
                let script_pubkey = row.get_unwrap::<usize, Vec<u8>>(3);
6✔
398
                let script_pubkey = ScriptBuf::from_bytes(script_pubkey);
6✔
399
                let txout = TxOut {
6✔
400
                    value: Amount::from_sat(value),
6✔
401
                    script_pubkey,
6✔
402
                };
6✔
403
                Ok((outpoint, txout))
6✔
404
            })
16✔
405
            .map_err(Error::Sqlite)?;
13✔
406
        txouts
13✔
407
            .into_iter()
13✔
408
            .map(|row| row.map_err(Error::Sqlite))
16✔
409
            .collect()
13✔
410
    }
13✔
411

412
    /// Update transaction last seen times.
413
    fn update_last_seen(
12✔
414
        db_transaction: &rusqlite::Transaction,
12✔
415
        tx_graph_changeset: &indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>>,
12✔
416
    ) -> Result<(), Error> {
12✔
417
        for tx_last_seen in tx_graph_changeset.graph.last_seen.iter() {
15✔
418
            let insert_or_update_tx_stmt = &mut db_transaction
12✔
419
                .prepare_cached("INSERT INTO tx (txid, last_seen) VALUES (:txid, :last_seen) ON CONFLICT (txid) DO UPDATE SET last_seen = :last_seen WHERE txid = :txid")
12✔
420
                .expect("insert or update tx last_seen statement");
12✔
421
            let txid = tx_last_seen.0.to_string();
12✔
422
            let last_seen = *tx_last_seen.1;
12✔
423
            insert_or_update_tx_stmt
12✔
424
                .execute(named_params! {":txid": txid, ":last_seen": last_seen })
12✔
425
                .map_err(Error::Sqlite)?;
12✔
426
        }
427
        Ok(())
12✔
428
    }
12✔
429
}
430

431
impl<K, A> TxStore<K, A> for Store<K, A>
432
where
433
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
434
    A: Anchor + Send,
435
{
436
}
437

438
/// Anchor table related functions.
439
pub(crate) trait AnchorStore<K, A>
440
where
441
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
442
    A: Anchor + for<'de> Deserialize<'de> + Serialize + Send,
443
{
444
    /// Insert anchors.
445
    fn insert_anchors(
12✔
446
        db_transaction: &rusqlite::Transaction,
12✔
447
        tx_graph_changeset: &indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>>,
12✔
448
    ) -> Result<(), Error> {
12✔
449
        // serde_json::to_string
450
        for anchor in tx_graph_changeset.graph.anchors.iter() {
15✔
451
            let insert_anchor_stmt = &mut db_transaction
12✔
452
                .prepare_cached("INSERT INTO anchor_tx (block_hash, anchor, txid) VALUES (:block_hash, jsonb(:anchor), :txid)")
12✔
453
                .expect("insert anchor statement");
12✔
454
            let block_hash = anchor.0.anchor_block().hash.to_string();
12✔
455
            let anchor_json = serde_json::to_string(&anchor.0).expect("anchor json");
12✔
456
            let txid = anchor.1.to_string();
12✔
457
            insert_anchor_stmt.execute(named_params! {":block_hash": block_hash, ":anchor": anchor_json, ":txid": txid })
12✔
458
                .map_err(Error::Sqlite)?;
12✔
459
        }
460
        Ok(())
12✔
461
    }
12✔
462

463
    /// Select all anchors.
464
    fn select_anchors(
13✔
465
        db_transaction: &rusqlite::Transaction,
13✔
466
    ) -> Result<BTreeSet<(A, Txid)>, Error> {
13✔
467
        // serde_json::from_str
13✔
468
        let mut select_anchor_stmt = db_transaction
13✔
469
            .prepare_cached("SELECT block_hash, json(anchor), txid FROM anchor_tx")
13✔
470
            .expect("select anchor statement");
13✔
471
        let anchors = select_anchor_stmt
13✔
472
            .query_map([], |row| {
22✔
473
                let hash = row.get_unwrap::<usize, String>(0);
12✔
474
                let hash = BlockHash::from_str(hash.as_str()).expect("block hash");
12✔
475
                let anchor = row.get_unwrap::<usize, String>(1);
12✔
476
                let anchor: A = serde_json::from_str(anchor.as_str()).expect("anchor");
12✔
477
                // double check anchor blob block hash matches
12✔
478
                assert_eq!(hash, anchor.anchor_block().hash);
12✔
479
                let txid = row.get_unwrap::<usize, String>(2);
12✔
480
                let txid = Txid::from_str(&txid).expect("txid");
12✔
481
                Ok((anchor, txid))
12✔
482
            })
22✔
483
            .map_err(Error::Sqlite)?;
13✔
484
        anchors
13✔
485
            .into_iter()
13✔
486
            .map(|row| row.map_err(Error::Sqlite))
22✔
487
            .collect()
13✔
488
    }
13✔
489
}
490

491
impl<K, A> AnchorStore<K, A> for Store<K, A>
492
where
493
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
494
    A: Anchor + for<'de> Deserialize<'de> + Serialize + Send,
495
{
496
}
497

498
/// Functions to read and write all [`ChangeSet`] data.
499
pub(crate) trait ReadWrite<K, A>:
500
    NetworkStore + KeychainStore<K, A> + BlockStore + TxStore<K, A> + AnchorStore<K, A>
501
where
502
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
503
    A: Anchor + for<'de> Deserialize<'de> + Serialize + Send,
504
{
505
    fn db_transaction(&mut self) -> Result<rusqlite::Transaction, Error>;
506

507
    fn write(&mut self, changeset: &ChangeSet<K, A>) -> Result<(), Error> {
12✔
508
        // no need to write anything if changeset is empty
12✔
509
        if changeset.is_empty() {
12✔
NEW
510
            return Ok(());
×
511
        }
12✔
512

513
        let db_transaction = self.db_transaction()?;
12✔
514

515
        let network_changeset = &changeset.network;
12✔
516
        let current_network = Self::select_network(&db_transaction)?;
12✔
517
        Self::insert_network(&current_network, &db_transaction, network_changeset)?;
12✔
518

519
        let chain_changeset = &changeset.chain;
12✔
520
        Self::insert_or_delete_blocks(&db_transaction, chain_changeset)?;
12✔
521

522
        let tx_graph_changeset = &changeset.tx_graph;
12✔
523
        Self::insert_keychains(&db_transaction, tx_graph_changeset)?;
12✔
524
        Self::update_last_revealed(&db_transaction, tx_graph_changeset)?;
12✔
525
        Self::insert_txs(&db_transaction, tx_graph_changeset)?;
12✔
526
        Self::insert_txouts(&db_transaction, tx_graph_changeset)?;
12✔
527
        Self::insert_anchors(&db_transaction, tx_graph_changeset)?;
12✔
528
        Self::update_last_seen(&db_transaction, tx_graph_changeset)?;
12✔
529
        db_transaction.commit().map_err(Error::Sqlite)
12✔
530
    }
12✔
531

532
    fn read(&mut self) -> Result<Option<ChangeSet<K, A>>, Error> {
13✔
533
        let db_transaction = self.db_transaction()?;
13✔
534

535
        let network = Self::select_network(&db_transaction)?;
13✔
536
        let chain = Self::select_blocks(&db_transaction)?;
13✔
537
        let keychains_added = Self::select_keychains(&db_transaction)?;
13✔
538
        let last_revealed = Self::select_last_revealed(&db_transaction)?;
13✔
539
        let txs = Self::select_txs(&db_transaction)?;
13✔
540
        let last_seen = Self::select_last_seen(&db_transaction)?;
13✔
541
        let txouts = Self::select_txouts(&db_transaction)?;
13✔
542
        let anchors = Self::select_anchors(&db_transaction)?;
13✔
543

544
        let graph: tx_graph::ChangeSet<A> = tx_graph::ChangeSet {
13✔
545
            txs,
13✔
546
            txouts,
13✔
547
            anchors,
13✔
548
            last_seen,
13✔
549
        };
13✔
550

13✔
551
        let indexer: keychain::ChangeSet<K> = keychain::ChangeSet {
13✔
552
            keychains_added,
13✔
553
            last_revealed,
13✔
554
        };
13✔
555

13✔
556
        let tx_graph: indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<K>> =
13✔
557
            indexed_tx_graph::ChangeSet { graph, indexer };
13✔
558

13✔
559
        if network.is_none() && chain.is_empty() && tx_graph.is_empty() {
13✔
560
            Ok(None)
3✔
561
        } else {
562
            Ok(Some(ChangeSet {
10✔
563
                network,
10✔
564
                chain,
10✔
565
                tx_graph,
10✔
566
            }))
10✔
567
        }
568
    }
13✔
569
}
570

571
impl<K, A> ReadWrite<K, A> for Store<K, A>
572
where
573
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
574
    A: Anchor + for<'de> Deserialize<'de> + Serialize + Send,
575
{
576
    fn db_transaction(&mut self) -> Result<rusqlite::Transaction, Error> {
25✔
577
        self.db_transaction()
25✔
578
    }
25✔
579
}
580

581
#[cfg(test)]
582
mod test {
583
    use super::*;
584
    use crate::Append;
585
    use crate::ChangeSet;
586
    use bdk_chain::bitcoin::consensus::encode::deserialize;
587
    use bdk_chain::bitcoin::constants::genesis_block;
588
    use bdk_chain::bitcoin::hashes::hex::FromHex;
589
    use bdk_chain::bitcoin::transaction::Transaction;
590
    use bdk_chain::bitcoin::Network::Testnet;
591
    use bdk_chain::bitcoin::{secp256k1, BlockHash, OutPoint};
592
    use bdk_chain::miniscript::Descriptor;
593
    use bdk_chain::{
594
        indexed_tx_graph, keychain, tx_graph, BlockId, ConfirmationHeightAnchor,
595
        ConfirmationTimeHeightAnchor, DescriptorExt,
596
    };
597
    use bdk_persist::PersistBackend;
598
    use std::str::FromStr;
599
    use std::sync::Arc;
600

601
    #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Hash, Debug, Serialize, Deserialize)]
30✔
602
    enum Keychain {
603
        External { account: u32, name: String },
604
        Internal { account: u32, name: String },
605
    }
606

607
    #[test]
608
    fn insert_and_load_aggregate_changesets_with_confirmation_time_height_anchor(
1✔
609
    ) -> anyhow::Result<()> {
1✔
610
        let (test_changesets, agg_test_changesets) =
1✔
611
            create_test_changesets(&|height, time, hash| ConfirmationTimeHeightAnchor {
2✔
612
                confirmation_height: height,
2✔
613
                confirmation_time: time,
2✔
614
                anchor_block: (height, hash).into(),
2✔
615
            });
2✔
616

1✔
617
        let conn = Connection::open_in_memory().expect("in memory connection");
1✔
618
        let mut store = Store::<Keychain, ConfirmationTimeHeightAnchor>::new(conn)
1✔
619
            .expect("create new memory db store");
1✔
620

1✔
621
        test_changesets.iter().for_each(|changeset| {
3✔
622
            store.write_changes(changeset).expect("write changeset");
3✔
623
        });
3✔
624

1✔
625
        let agg_changeset = store.load_from_persistence().expect("aggregated changeset");
1✔
626

1✔
627
        assert_eq!(agg_changeset, Some(agg_test_changesets));
1✔
628
        Ok(())
1✔
629
    }
1✔
630

631
    #[test]
632
    fn insert_and_load_aggregate_changesets_with_confirmation_height_anchor() -> anyhow::Result<()>
1✔
633
    {
1✔
634
        let (test_changesets, agg_test_changesets) =
1✔
635
            create_test_changesets(&|height, _time, hash| ConfirmationHeightAnchor {
2✔
636
                confirmation_height: height,
2✔
637
                anchor_block: (height, hash).into(),
2✔
638
            });
2✔
639

1✔
640
        let conn = Connection::open_in_memory().expect("in memory connection");
1✔
641
        let mut store = Store::<Keychain, ConfirmationHeightAnchor>::new(conn)
1✔
642
            .expect("create new memory db store");
1✔
643

1✔
644
        test_changesets.iter().for_each(|changeset| {
3✔
645
            store.write_changes(changeset).expect("write changeset");
3✔
646
        });
3✔
647

1✔
648
        let agg_changeset = store.load_from_persistence().expect("aggregated changeset");
1✔
649

1✔
650
        assert_eq!(agg_changeset, Some(agg_test_changesets));
1✔
651
        Ok(())
1✔
652
    }
1✔
653

654
    #[test]
655
    fn insert_and_load_aggregate_changesets_with_blockid_anchor() -> anyhow::Result<()> {
1✔
656
        let (test_changesets, agg_test_changesets) =
1✔
657
            create_test_changesets(&|height, _time, hash| BlockId { height, hash });
2✔
658

1✔
659
        let conn = Connection::open_in_memory().expect("in memory connection");
1✔
660
        let mut store = Store::<Keychain, BlockId>::new(conn).expect("create new memory db store");
1✔
661

1✔
662
        test_changesets.iter().for_each(|changeset| {
3✔
663
            store.write_changes(changeset).expect("write changeset");
3✔
664
        });
3✔
665

1✔
666
        let agg_changeset = store.load_from_persistence().expect("aggregated changeset");
1✔
667

1✔
668
        assert_eq!(agg_changeset, Some(agg_test_changesets));
1✔
669
        Ok(())
1✔
670
    }
1✔
671

672
    fn create_test_changesets<A: Anchor + Copy>(
3✔
673
        anchor_fn: &dyn Fn(u32, u64, BlockHash) -> A,
3✔
674
    ) -> (Vec<ChangeSet<Keychain, A>>, ChangeSet<Keychain, A>) {
3✔
675
        let secp = &secp256k1::Secp256k1::signing_only();
3✔
676

3✔
677
        let network_changeset = Some(Testnet);
3✔
678

3✔
679
        let block_hash_0: BlockHash = genesis_block(Testnet).block_hash();
3✔
680
        let block_hash_1 =
3✔
681
            BlockHash::from_str("00000000b873e79784647a6c82962c70d228557d24a747ea4d1b8bbe878e1206")
3✔
682
                .unwrap();
3✔
683
        let block_hash_2 =
3✔
684
            BlockHash::from_str("000000006c02c8ea6e4ff69651f7fcde348fb9d557a06e6957b65552002a7820")
3✔
685
                .unwrap();
3✔
686

3✔
687
        let block_changeset = [
3✔
688
            (0, Some(block_hash_0)),
3✔
689
            (1, Some(block_hash_1)),
3✔
690
            (2, Some(block_hash_2)),
3✔
691
        ]
3✔
692
        .into();
3✔
693

3✔
694
        let ext_keychain = Keychain::External {
3✔
695
            account: 0,
3✔
696
            name: "ext test".to_string(),
3✔
697
        };
3✔
698
        let (ext_desc, _ext_keymap) = Descriptor::parse_descriptor(secp, "wpkh(tprv8ZgxMBicQKsPcx5nBGsR63Pe8KnRUqmbJNENAfGftF3yuXoMMoVJJcYeUw5eVkm9WBPjWYt6HMWYJNesB5HaNVBaFc1M6dRjWSYnmewUMYy/0/*)").unwrap();
3✔
699
        let ext_desc_id = ext_desc.descriptor_id();
3✔
700
        let int_keychain = Keychain::Internal {
3✔
701
            account: 0,
3✔
702
            name: "int test".to_string(),
3✔
703
        };
3✔
704
        let (int_desc, _int_keymap) = Descriptor::parse_descriptor(secp, "wpkh(tprv8ZgxMBicQKsPcx5nBGsR63Pe8KnRUqmbJNENAfGftF3yuXoMMoVJJcYeUw5eVkm9WBPjWYt6HMWYJNesB5HaNVBaFc1M6dRjWSYnmewUMYy/1/*)").unwrap();
3✔
705
        let int_desc_id = int_desc.descriptor_id();
3✔
706

3✔
707
        let tx0_hex = Vec::<u8>::from_hex("01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000").unwrap();
3✔
708
        let tx0: Arc<Transaction> = Arc::new(deserialize(tx0_hex.as_slice()).unwrap());
3✔
709
        let tx1_hex = Vec::<u8>::from_hex("010000000001010000000000000000000000000000000000000000000000000000000000000000ffffffff025151feffffff0200f2052a010000001600149243f727dd5343293eb83174324019ec16c2630f0000000000000000776a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf94c4fecc7daa2490047304402205e423a8754336ca99dbe16509b877ef1bf98d008836c725005b3c787c41ebe46022047246e4467ad7cc7f1ad98662afcaf14c115e0095a227c7b05c5182591c23e7e01000120000000000000000000000000000000000000000000000000000000000000000000000000").unwrap();
3✔
710
        let tx1: Arc<Transaction> = Arc::new(deserialize(tx1_hex.as_slice()).unwrap());
3✔
711
        let tx2_hex = Vec::<u8>::from_hex("01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0e0432e7494d010e062f503253482fffffffff0100f2052a010000002321038a7f6ef1c8ca0c588aa53fa860128077c9e6c11e6830f4d7ee4e763a56b7718fac00000000").unwrap();
3✔
712
        let tx2: Arc<Transaction> = Arc::new(deserialize(tx2_hex.as_slice()).unwrap());
3✔
713

3✔
714
        let outpoint0_0 = OutPoint::new(tx0.txid(), 0);
3✔
715
        let txout0_0 = tx0.output.first().unwrap().clone();
3✔
716
        let outpoint1_0 = OutPoint::new(tx1.txid(), 0);
3✔
717
        let txout1_0 = tx1.output.first().unwrap().clone();
3✔
718

3✔
719
        let anchor1 = anchor_fn(1, 1296667328, block_hash_1);
3✔
720
        let anchor2 = anchor_fn(2, 1296688946, block_hash_2);
3✔
721

3✔
722
        let tx_graph_changeset = tx_graph::ChangeSet::<A> {
3✔
723
            txs: [tx0.clone(), tx1.clone()].into(),
3✔
724
            txouts: [(outpoint0_0, txout0_0), (outpoint1_0, txout1_0)].into(),
3✔
725
            anchors: [(anchor1, tx0.txid()), (anchor1, tx1.txid())].into(),
3✔
726
            last_seen: [
3✔
727
                (tx0.txid(), 1598918400),
3✔
728
                (tx1.txid(), 1598919121),
3✔
729
                (tx2.txid(), 1608919121),
3✔
730
            ]
3✔
731
            .into(),
3✔
732
        };
3✔
733

3✔
734
        let keychain_changeset = keychain::ChangeSet {
3✔
735
            keychains_added: [(ext_keychain, ext_desc), (int_keychain, int_desc)].into(),
3✔
736
            last_revealed: [(ext_desc_id, 124), (int_desc_id, 421)].into(),
3✔
737
        };
3✔
738

3✔
739
        let graph_changeset: indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<Keychain>> =
3✔
740
            indexed_tx_graph::ChangeSet {
3✔
741
                graph: tx_graph_changeset,
3✔
742
                indexer: keychain_changeset,
3✔
743
            };
3✔
744

3✔
745
        // test changesets to write to db
3✔
746
        let mut changesets = Vec::new();
3✔
747

3✔
748
        changesets.push(ChangeSet {
3✔
749
            network: network_changeset,
3✔
750
            chain: block_changeset,
3✔
751
            tx_graph: graph_changeset,
3✔
752
        });
3✔
753

3✔
754
        // create changeset that sets the whole tx2 and updates it's lastseen where before there was only the txid and last_seen
3✔
755
        let tx_graph_changeset2 = tx_graph::ChangeSet::<A> {
3✔
756
            txs: [tx2.clone()].into(),
3✔
757
            txouts: BTreeMap::default(),
3✔
758
            anchors: BTreeSet::default(),
3✔
759
            last_seen: [(tx2.txid(), 1708919121)].into(),
3✔
760
        };
3✔
761

3✔
762
        let graph_changeset2: indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<Keychain>> =
3✔
763
            indexed_tx_graph::ChangeSet {
3✔
764
                graph: tx_graph_changeset2,
3✔
765
                indexer: keychain::ChangeSet::default(),
3✔
766
            };
3✔
767

3✔
768
        changesets.push(ChangeSet {
3✔
769
            network: None,
3✔
770
            chain: local_chain::ChangeSet::default(),
3✔
771
            tx_graph: graph_changeset2,
3✔
772
        });
3✔
773

3✔
774
        // create changeset that adds a new anchor2 for tx0 and tx1
3✔
775
        let tx_graph_changeset3 = tx_graph::ChangeSet::<A> {
3✔
776
            txs: BTreeSet::default(),
3✔
777
            txouts: BTreeMap::default(),
3✔
778
            anchors: [(anchor2, tx0.txid()), (anchor2, tx1.txid())].into(),
3✔
779
            last_seen: BTreeMap::default(),
3✔
780
        };
3✔
781

3✔
782
        let graph_changeset3: indexed_tx_graph::ChangeSet<A, keychain::ChangeSet<Keychain>> =
3✔
783
            indexed_tx_graph::ChangeSet {
3✔
784
                graph: tx_graph_changeset3,
3✔
785
                indexer: keychain::ChangeSet::default(),
3✔
786
            };
3✔
787

3✔
788
        changesets.push(ChangeSet {
3✔
789
            network: None,
3✔
790
            chain: local_chain::ChangeSet::default(),
3✔
791
            tx_graph: graph_changeset3,
3✔
792
        });
3✔
793

3✔
794
        // aggregated test changesets
3✔
795
        let agg_test_changesets =
3✔
796
            changesets
3✔
797
                .iter()
3✔
798
                .fold(ChangeSet::<Keychain, A>::default(), |mut i, cs| {
9✔
799
                    i.append(cs.clone());
9✔
800
                    i
9✔
801
                });
9✔
802

3✔
803
        (changesets, agg_test_changesets)
3✔
804
    }
3✔
805
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc