• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitcoindevkit / bdk / 9886495690

11 Jul 2024 06:19AM UTC coverage: 83.314% (-0.1%) from 83.434%
9886495690

Pull #1510

github

web-flow
Merge 041f4e341 into d99b3ef4b
Pull Request #1510: refactor(chain)! removed IndexedTxGraph 🗑

356 of 376 new or added lines in 6 files covered. (94.68%)

69 existing lines in 3 files now uncovered.

10965 of 13161 relevant lines covered (83.31%)

17008.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.38
/crates/sqlite/src/store.rs
1
use bdk_chain::bitcoin::consensus::{deserialize, serialize};
2
use bdk_chain::bitcoin::hashes::Hash;
3
use bdk_chain::bitcoin::{Amount, Network, OutPoint, ScriptBuf, Transaction, TxOut};
4
use bdk_chain::bitcoin::{BlockHash, Txid};
5
use bdk_chain::miniscript::descriptor::{Descriptor, DescriptorPublicKey};
6
use rusqlite::{named_params, Connection};
7
use serde::{Deserialize, Serialize};
8
use std::collections::{BTreeMap, BTreeSet};
9
use std::fmt::Debug;
10
use std::marker::PhantomData;
11
use std::str::FromStr;
12
use std::sync::{Arc, Mutex};
13

14
use crate::Error;
15
use bdk_chain::CombinedChangeSet;
16
use bdk_chain::{
17
    indexer::keychain_txout, local_chain, tx_graph, Anchor, DescriptorExt, DescriptorId, Merge,
18
};
19

20
/// Persists data in to a relational schema based [SQLite] database file.
21
///
22
/// The changesets loaded or stored represent changes to keychain and blockchain data.
23
///
24
/// [SQLite]: https://www.sqlite.org/index.html
25
pub struct Store<K, A> {
26
    // A rusqlite connection to the SQLite database. Uses a Mutex for thread safety.
27
    conn: Mutex<Connection>,
28
    keychain_marker: PhantomData<K>,
29
    anchor_marker: PhantomData<A>,
30
}
31

32
impl<K, A> Debug for Store<K, A> {
33
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
34
        Debug::fmt(&self.conn, f)
×
35
    }
×
36
}
37

38
impl<K, A> Store<K, A>
39
where
40
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
41
    A: Anchor + for<'de> Deserialize<'de> + Serialize + Send,
42
{
43
    /// Creates a new store from a [`Connection`].
44
    pub fn new(mut conn: Connection) -> Result<Self, rusqlite::Error> {
10✔
45
        Self::migrate(&mut conn)?;
10✔
46

47
        Ok(Self {
10✔
48
            conn: Mutex::new(conn),
10✔
49
            keychain_marker: Default::default(),
10✔
50
            anchor_marker: Default::default(),
10✔
51
        })
10✔
52
    }
10✔
53

54
    pub(crate) fn db_transaction(&mut self) -> Result<rusqlite::Transaction, Error> {
16✔
55
        let connection = self.conn.get_mut().expect("unlocked connection mutex");
16✔
56
        connection.transaction().map_err(Error::Sqlite)
16✔
57
    }
16✔
58
}
59

60
/// Network table related functions.
61
impl<K, A> Store<K, A> {
62
    /// Insert [`Network`] for which all other tables data is valid.
63
    ///
64
    /// Error if trying to insert different network value.
65
    fn insert_network(
8✔
66
        current_network: &Option<Network>,
8✔
67
        db_transaction: &rusqlite::Transaction,
8✔
68
        network_changeset: &Option<Network>,
8✔
69
    ) -> Result<(), Error> {
8✔
70
        if let Some(network) = network_changeset {
8✔
71
            match current_network {
×
72
                // if no network change do nothing
×
73
                Some(current_network) if current_network == network => Ok(()),
×
74
                // if new network not the same as current, error
75
                Some(current_network) => Err(Error::Network {
×
76
                    expected: *current_network,
×
77
                    given: *network,
×
78
                }),
×
79
                // insert network if none exists
80
                None => {
81
                    let insert_network_stmt = &mut db_transaction
4✔
82
                        .prepare_cached("INSERT INTO network (name) VALUES (:name)")
4✔
83
                        .expect("insert network statement");
4✔
84
                    let name = network.to_string();
4✔
85
                    insert_network_stmt
4✔
86
                        .execute(named_params! {":name": name })
4✔
87
                        .map_err(Error::Sqlite)?;
4✔
88
                    Ok(())
4✔
89
                }
90
            }
91
        } else {
92
            Ok(())
4✔
93
        }
94
    }
8✔
95

96
    /// Select the valid [`Network`] for this database, or `None` if not set.
97
    fn select_network(db_transaction: &rusqlite::Transaction) -> Result<Option<Network>, Error> {
16✔
98
        let mut select_network_stmt = db_transaction
16✔
99
            .prepare_cached("SELECT name FROM network WHERE rowid = 1")
16✔
100
            .expect("select network statement");
16✔
101

16✔
102
        let network = select_network_stmt
16✔
103
            .query_row([], |row| {
16✔
104
                let network = row.get_unwrap::<usize, String>(0);
12✔
105
                let network = Network::from_str(network.as_str()).expect("valid network");
12✔
106
                Ok(network)
12✔
107
            })
16✔
108
            .map_err(Error::Sqlite);
16✔
109
        match network {
4✔
110
            Ok(network) => Ok(Some(network)),
12✔
111
            Err(Error::Sqlite(rusqlite::Error::QueryReturnedNoRows)) => Ok(None),
4✔
112
            Err(e) => Err(e),
×
113
        }
114
    }
16✔
115
}
116

117
/// Block table related functions.
118
impl<K, A> Store<K, A> {
119
    /// Insert or delete local chain blocks.
120
    ///
121
    /// Error if trying to insert existing block hash.
122
    fn insert_or_delete_blocks(
8✔
123
        db_transaction: &rusqlite::Transaction,
8✔
124
        chain_changeset: &local_chain::ChangeSet,
8✔
125
    ) -> Result<(), Error> {
8✔
126
        for (height, hash) in chain_changeset.iter() {
8✔
127
            match hash {
8✔
128
                // add new hash at height
129
                Some(hash) => {
8✔
130
                    let insert_block_stmt = &mut db_transaction
8✔
131
                        .prepare_cached("INSERT INTO block (hash, height) VALUES (:hash, :height)")
8✔
132
                        .expect("insert block statement");
8✔
133
                    let hash = hash.to_string();
8✔
134
                    insert_block_stmt
8✔
135
                        .execute(named_params! {":hash": hash, ":height": height })
8✔
136
                        .map_err(Error::Sqlite)?;
8✔
137
                }
138
                // delete block at height
139
                None => {
140
                    let delete_block_stmt = &mut db_transaction
×
141
                        .prepare_cached("DELETE FROM block WHERE height IS :height")
×
142
                        .expect("delete block statement");
×
143
                    delete_block_stmt
×
144
                        .execute(named_params! {":height": height })
×
145
                        .map_err(Error::Sqlite)?;
×
146
                }
147
            }
148
        }
149

150
        Ok(())
8✔
151
    }
8✔
152

153
    /// Select all blocks.
154
    fn select_blocks(
8✔
155
        db_transaction: &rusqlite::Transaction,
8✔
156
    ) -> Result<BTreeMap<u32, Option<BlockHash>>, Error> {
8✔
157
        let mut select_blocks_stmt = db_transaction
8✔
158
            .prepare_cached("SELECT height, hash FROM block")
8✔
159
            .expect("select blocks statement");
8✔
160

161
        let blocks = select_blocks_stmt
8✔
162
            .query_map([], |row| {
12✔
163
                let height = row.get_unwrap::<usize, u32>(0);
12✔
164
                let hash = row.get_unwrap::<usize, String>(1);
12✔
165
                let hash = Some(BlockHash::from_str(hash.as_str()).expect("block hash"));
12✔
166
                Ok((height, hash))
12✔
167
            })
12✔
168
            .map_err(Error::Sqlite)?;
8✔
169
        blocks
8✔
170
            .into_iter()
8✔
171
            .map(|row| row.map_err(Error::Sqlite))
12✔
172
            .collect()
8✔
173
    }
8✔
174
}
175

176
/// Keychain table related functions.
177
///
178
/// The keychain objects are stored as [`JSONB`] data.
179
/// [`JSONB`]: https://sqlite.org/json1.html#jsonb
180
impl<K, A> Store<K, A>
181
where
182
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
183
    A: Anchor + Send,
184
{
185
    /// Insert keychain with descriptor and last active index.
186
    ///
187
    /// If keychain exists only update last active index.
188
    fn insert_keychains(
8✔
189
        db_transaction: &rusqlite::Transaction,
8✔
190
        tx_graph_changeset: &tx_graph::ChangeSet<A, keychain_txout::ChangeSet<K>>,
8✔
191
    ) -> Result<(), Error> {
8✔
192
        let keychain_changeset = &tx_graph_changeset.indexer;
8✔
193
        for (keychain, descriptor) in keychain_changeset.keychains_added.iter() {
10✔
194
            let insert_keychain_stmt = &mut db_transaction
8✔
195
                .prepare_cached("INSERT INTO keychain (keychain, descriptor, descriptor_id) VALUES (jsonb(:keychain), :descriptor, :descriptor_id)")
8✔
196
                .expect("insert keychain statement");
8✔
197
            let keychain_json = serde_json::to_string(keychain).expect("keychain json");
8✔
198
            let descriptor_id = descriptor.descriptor_id().to_byte_array();
8✔
199
            let descriptor = descriptor.to_string();
8✔
200
            insert_keychain_stmt.execute(named_params! {":keychain": keychain_json, ":descriptor": descriptor, ":descriptor_id": descriptor_id })
8✔
201
                .map_err(Error::Sqlite)?;
8✔
202
        }
203
        Ok(())
8✔
204
    }
8✔
205

206
    /// Update descriptor last revealed index.
207
    fn update_last_revealed(
8✔
208
        db_transaction: &rusqlite::Transaction,
8✔
209
        tx_graph_changeset: &tx_graph::ChangeSet<A, keychain_txout::ChangeSet<K>>,
8✔
210
    ) -> Result<(), Error> {
8✔
211
        let keychain_changeset = &tx_graph_changeset.indexer;
8✔
212
        for (descriptor_id, last_revealed) in keychain_changeset.last_revealed.iter() {
8✔
213
            let update_last_revealed_stmt = &mut db_transaction
5✔
214
                .prepare_cached(
5✔
215
                    "UPDATE keychain SET last_revealed = :last_revealed
5✔
216
                              WHERE descriptor_id = :descriptor_id",
5✔
217
                )
5✔
218
                .expect("update last revealed statement");
5✔
219
            let descriptor_id = descriptor_id.to_byte_array();
5✔
220
            update_last_revealed_stmt.execute(named_params! {":descriptor_id": descriptor_id, ":last_revealed": * last_revealed })
5✔
221
                .map_err(Error::Sqlite)?;
5✔
222
        }
223
        Ok(())
8✔
224
    }
8✔
225

226
    /// Select keychains added.
227
    fn select_keychains(
8✔
228
        db_transaction: &rusqlite::Transaction,
8✔
229
    ) -> Result<BTreeMap<K, Descriptor<DescriptorPublicKey>>, Error> {
8✔
230
        let mut select_keychains_added_stmt = db_transaction
8✔
231
            .prepare_cached("SELECT json(keychain), descriptor FROM keychain")
8✔
232
            .expect("select keychains statement");
8✔
233

234
        let keychains = select_keychains_added_stmt
8✔
235
            .query_map([], |row| {
16✔
236
                let keychain = row.get_unwrap::<usize, String>(0);
16✔
237
                let keychain = serde_json::from_str::<K>(keychain.as_str()).expect("keychain");
16✔
238
                let descriptor = row.get_unwrap::<usize, String>(1);
16✔
239
                let descriptor = Descriptor::from_str(descriptor.as_str()).expect("descriptor");
16✔
240
                Ok((keychain, descriptor))
16✔
241
            })
16✔
242
            .map_err(Error::Sqlite)?;
8✔
243
        keychains
8✔
244
            .into_iter()
8✔
245
            .map(|row| row.map_err(Error::Sqlite))
16✔
246
            .collect()
8✔
247
    }
8✔
248

249
    /// Select descriptor last revealed indexes.
250
    fn select_last_revealed(
8✔
251
        db_transaction: &rusqlite::Transaction,
8✔
252
    ) -> Result<BTreeMap<DescriptorId, u32>, Error> {
8✔
253
        let mut select_last_revealed_stmt = db_transaction
8✔
254
            .prepare_cached(
8✔
255
                "SELECT descriptor, last_revealed FROM keychain WHERE last_revealed IS NOT NULL",
8✔
256
            )
8✔
257
            .expect("select last revealed statement");
8✔
258

259
        let last_revealed = select_last_revealed_stmt
8✔
260
            .query_map([], |row| {
10✔
261
                let descriptor = row.get_unwrap::<usize, String>(0);
5✔
262
                let descriptor = Descriptor::from_str(descriptor.as_str()).expect("descriptor");
5✔
263
                let descriptor_id = descriptor.descriptor_id();
5✔
264
                let last_revealed = row.get_unwrap::<usize, u32>(1);
5✔
265
                Ok((descriptor_id, last_revealed))
5✔
266
            })
10✔
267
            .map_err(Error::Sqlite)?;
8✔
268
        last_revealed
8✔
269
            .into_iter()
8✔
270
            .map(|row| row.map_err(Error::Sqlite))
10✔
271
            .collect()
8✔
272
    }
8✔
273
}
274

275
/// Tx (transaction) and txout (transaction output) table related functions.
276
impl<K, A> Store<K, A> {
277
    /// Insert transactions.
278
    ///
279
    /// Error if trying to insert existing txid.
280
    fn insert_txs(
8✔
281
        db_transaction: &rusqlite::Transaction,
8✔
282
        tx_graph_changeset: &tx_graph::ChangeSet<A, keychain_txout::ChangeSet<K>>,
8✔
283
    ) -> Result<(), Error> {
8✔
284
        for tx in tx_graph_changeset.txs.iter() {
8✔
285
            let insert_tx_stmt = &mut db_transaction
6✔
286
                .prepare_cached("INSERT INTO tx (txid, whole_tx) VALUES (:txid, :whole_tx) ON CONFLICT (txid) DO UPDATE SET whole_tx = :whole_tx WHERE txid = :txid")
6✔
287
                .expect("insert or update tx whole_tx statement");
6✔
288
            let txid = tx.compute_txid().to_string();
6✔
289
            let whole_tx = serialize(&tx);
6✔
290
            insert_tx_stmt
6✔
291
                .execute(named_params! {":txid": txid, ":whole_tx": whole_tx })
6✔
292
                .map_err(Error::Sqlite)?;
6✔
293
        }
294
        Ok(())
8✔
295
    }
8✔
296

297
    /// Select all transactions.
298
    fn select_txs(
8✔
299
        db_transaction: &rusqlite::Transaction,
8✔
300
    ) -> Result<BTreeSet<Arc<Transaction>>, Error> {
8✔
301
        let mut select_tx_stmt = db_transaction
8✔
302
            .prepare_cached("SELECT whole_tx FROM tx WHERE whole_tx IS NOT NULL")
8✔
303
            .expect("select tx statement");
8✔
304

305
        let txs = select_tx_stmt
8✔
306
            .query_map([], |row| {
12✔
307
                let whole_tx = row.get_unwrap::<usize, Vec<u8>>(0);
6✔
308
                let whole_tx: Transaction = deserialize(&whole_tx).expect("transaction");
6✔
309
                Ok(Arc::new(whole_tx))
6✔
310
            })
12✔
311
            .map_err(Error::Sqlite)?;
8✔
312

313
        txs.into_iter()
8✔
314
            .map(|row| row.map_err(Error::Sqlite))
12✔
315
            .collect()
8✔
316
    }
8✔
317

318
    /// Select all transactions with last_seen values.
319
    fn select_last_seen(
8✔
320
        db_transaction: &rusqlite::Transaction,
8✔
321
    ) -> Result<BTreeMap<Txid, u64>, Error> {
8✔
322
        // load tx last_seen
8✔
323
        let mut select_last_seen_stmt = db_transaction
8✔
324
            .prepare_cached("SELECT txid, last_seen FROM tx WHERE last_seen IS NOT NULL")
8✔
325
            .expect("select tx last seen statement");
8✔
326

327
        let last_seen = select_last_seen_stmt
8✔
328
            .query_map([], |row| {
12✔
329
                let txid = row.get_unwrap::<usize, String>(0);
6✔
330
                let txid = Txid::from_str(&txid).expect("txid");
6✔
331
                let last_seen = row.get_unwrap::<usize, u64>(1);
6✔
332
                Ok((txid, last_seen))
6✔
333
            })
12✔
334
            .map_err(Error::Sqlite)?;
8✔
335
        last_seen
8✔
336
            .into_iter()
8✔
337
            .map(|row| row.map_err(Error::Sqlite))
12✔
338
            .collect()
8✔
339
    }
8✔
340

341
    /// Insert txouts.
342
    ///
343
    /// Error if trying to insert existing outpoint.
344
    fn insert_txouts(
8✔
345
        db_transaction: &rusqlite::Transaction,
8✔
346
        tx_graph_changeset: &tx_graph::ChangeSet<A, keychain_txout::ChangeSet<K>>,
8✔
347
    ) -> Result<(), Error> {
8✔
348
        for txout in tx_graph_changeset.txouts.iter() {
8✔
349
            let insert_txout_stmt = &mut db_transaction
4✔
350
                .prepare_cached("INSERT INTO txout (txid, vout, value, script) VALUES (:txid, :vout, :value, :script)")
4✔
351
                .expect("insert txout statement");
4✔
352
            let txid = txout.0.txid.to_string();
4✔
353
            let vout = txout.0.vout;
4✔
354
            let value = txout.1.value.to_sat();
4✔
355
            let script = txout.1.script_pubkey.as_bytes();
4✔
356
            insert_txout_stmt.execute(named_params! {":txid": txid, ":vout": vout, ":value": value, ":script": script })
4✔
357
                .map_err(Error::Sqlite)?;
4✔
358
        }
359
        Ok(())
8✔
360
    }
8✔
361

362
    /// Select all transaction outputs.
363
    fn select_txouts(
8✔
364
        db_transaction: &rusqlite::Transaction,
8✔
365
    ) -> Result<BTreeMap<OutPoint, TxOut>, Error> {
8✔
366
        // load tx outs
8✔
367
        let mut select_txout_stmt = db_transaction
8✔
368
            .prepare_cached("SELECT txid, vout, value, script FROM txout")
8✔
369
            .expect("select txout statement");
8✔
370

371
        let txouts = select_txout_stmt
8✔
372
            .query_map([], |row| {
10✔
373
                let txid = row.get_unwrap::<usize, String>(0);
4✔
374
                let txid = Txid::from_str(&txid).expect("txid");
4✔
375
                let vout = row.get_unwrap::<usize, u32>(1);
4✔
376
                let outpoint = OutPoint::new(txid, vout);
4✔
377
                let value = row.get_unwrap::<usize, u64>(2);
4✔
378
                let script_pubkey = row.get_unwrap::<usize, Vec<u8>>(3);
4✔
379
                let script_pubkey = ScriptBuf::from_bytes(script_pubkey);
4✔
380
                let txout = TxOut {
4✔
381
                    value: Amount::from_sat(value),
4✔
382
                    script_pubkey,
4✔
383
                };
4✔
384
                Ok((outpoint, txout))
4✔
385
            })
10✔
386
            .map_err(Error::Sqlite)?;
8✔
387
        txouts
8✔
388
            .into_iter()
8✔
389
            .map(|row| row.map_err(Error::Sqlite))
10✔
390
            .collect()
8✔
391
    }
8✔
392

393
    /// Update transaction last seen times.
394
    fn update_last_seen(
8✔
395
        db_transaction: &rusqlite::Transaction,
8✔
396
        tx_graph_changeset: &tx_graph::ChangeSet<A, keychain_txout::ChangeSet<K>>,
8✔
397
    ) -> Result<(), Error> {
8✔
398
        for tx_last_seen in tx_graph_changeset.last_seen.iter() {
10✔
399
            let insert_or_update_tx_stmt = &mut db_transaction
8✔
400
                .prepare_cached("INSERT INTO tx (txid, last_seen) VALUES (:txid, :last_seen) ON CONFLICT (txid) DO UPDATE SET last_seen = :last_seen WHERE txid = :txid")
8✔
401
                .expect("insert or update tx last_seen statement");
8✔
402
            let txid = tx_last_seen.0.to_string();
8✔
403
            let last_seen = *tx_last_seen.1;
8✔
404
            insert_or_update_tx_stmt
8✔
405
                .execute(named_params! {":txid": txid, ":last_seen": last_seen })
8✔
406
                .map_err(Error::Sqlite)?;
8✔
407
        }
408
        Ok(())
8✔
409
    }
8✔
410
}
411

412
/// Anchor table related functions.
413
impl<K, A> Store<K, A>
414
where
415
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
416
    A: Anchor + for<'de> Deserialize<'de> + Serialize + Send,
417
{
418
    /// Insert anchors.
419
    fn insert_anchors(
8✔
420
        db_transaction: &rusqlite::Transaction,
8✔
421
        tx_graph_changeset: &tx_graph::ChangeSet<A, keychain_txout::ChangeSet<K>>,
8✔
422
    ) -> Result<(), Error> {
8✔
423
        // serde_json::to_string
424
        for anchor in tx_graph_changeset.anchors.iter() {
10✔
425
            let insert_anchor_stmt = &mut db_transaction
8✔
426
                .prepare_cached("INSERT INTO anchor_tx (block_hash, anchor, txid) VALUES (:block_hash, jsonb(:anchor), :txid)")
8✔
427
                .expect("insert anchor statement");
8✔
428
            let block_hash = anchor.0.anchor_block().hash.to_string();
8✔
429
            let anchor_json = serde_json::to_string(&anchor.0).expect("anchor json");
8✔
430
            let txid = anchor.1.to_string();
8✔
431
            insert_anchor_stmt.execute(named_params! {":block_hash": block_hash, ":anchor": anchor_json, ":txid": txid })
8✔
432
                .map_err(Error::Sqlite)?;
8✔
433
        }
434
        Ok(())
8✔
435
    }
8✔
436

437
    /// Select all anchors.
438
    fn select_anchors(
8✔
439
        db_transaction: &rusqlite::Transaction,
8✔
440
    ) -> Result<BTreeSet<(A, Txid)>, Error> {
8✔
441
        // serde_json::from_str
8✔
442
        let mut select_anchor_stmt = db_transaction
8✔
443
            .prepare_cached("SELECT block_hash, json(anchor), txid FROM anchor_tx")
8✔
444
            .expect("select anchor statement");
8✔
445
        let anchors = select_anchor_stmt
8✔
446
            .query_map([], |row| {
14✔
447
                let hash = row.get_unwrap::<usize, String>(0);
8✔
448
                let hash = BlockHash::from_str(hash.as_str()).expect("block hash");
8✔
449
                let anchor = row.get_unwrap::<usize, String>(1);
8✔
450
                let anchor: A = serde_json::from_str(anchor.as_str()).expect("anchor");
8✔
451
                // double check anchor blob block hash matches
8✔
452
                assert_eq!(hash, anchor.anchor_block().hash);
8✔
453
                let txid = row.get_unwrap::<usize, String>(2);
8✔
454
                let txid = Txid::from_str(&txid).expect("txid");
8✔
455
                Ok((anchor, txid))
8✔
456
            })
14✔
457
            .map_err(Error::Sqlite)?;
8✔
458
        anchors
8✔
459
            .into_iter()
8✔
460
            .map(|row| row.map_err(Error::Sqlite))
14✔
461
            .collect()
8✔
462
    }
8✔
463
}
464

465
/// Functions to read and write all [`CombinedChangeSet`] data.
466
impl<K, A> Store<K, A>
467
where
468
    K: Ord + for<'de> Deserialize<'de> + Serialize + Send,
469
    A: Anchor + for<'de> Deserialize<'de> + Serialize + Send,
470
{
471
    /// Write the given `changeset` atomically.
472
    pub fn write(&mut self, changeset: &CombinedChangeSet<K, A>) -> Result<(), Error> {
8✔
473
        // no need to write anything if changeset is empty
8✔
474
        if changeset.is_empty() {
8✔
475
            return Ok(());
×
476
        }
8✔
477

478
        let db_transaction = self.db_transaction()?;
8✔
479

480
        let network_changeset = &changeset.network;
8✔
481
        let current_network = Self::select_network(&db_transaction)?;
8✔
482
        Self::insert_network(&current_network, &db_transaction, network_changeset)?;
8✔
483

484
        let chain_changeset = &changeset.chain;
8✔
485
        Self::insert_or_delete_blocks(&db_transaction, chain_changeset)?;
8✔
486

487
        let tx_graph_changeset = &changeset.tx_graph;
8✔
488
        Self::insert_keychains(&db_transaction, tx_graph_changeset)?;
8✔
489
        Self::update_last_revealed(&db_transaction, tx_graph_changeset)?;
8✔
490
        Self::insert_txs(&db_transaction, tx_graph_changeset)?;
8✔
491
        Self::insert_txouts(&db_transaction, tx_graph_changeset)?;
8✔
492
        Self::insert_anchors(&db_transaction, tx_graph_changeset)?;
8✔
493
        Self::update_last_seen(&db_transaction, tx_graph_changeset)?;
8✔
494
        db_transaction.commit().map_err(Error::Sqlite)
8✔
495
    }
8✔
496

497
    /// Read the entire database and return the aggregate [`CombinedChangeSet`].
498
    pub fn read(&mut self) -> Result<Option<CombinedChangeSet<K, A>>, Error> {
8✔
499
        let db_transaction = self.db_transaction()?;
8✔
500

501
        let network = Self::select_network(&db_transaction)?;
8✔
502
        let chain = Self::select_blocks(&db_transaction)?;
8✔
503
        let keychains_added = Self::select_keychains(&db_transaction)?;
8✔
504
        let last_revealed = Self::select_last_revealed(&db_transaction)?;
8✔
505
        let txs = Self::select_txs(&db_transaction)?;
8✔
506
        let last_seen = Self::select_last_seen(&db_transaction)?;
8✔
507
        let txouts = Self::select_txouts(&db_transaction)?;
8✔
508
        let anchors = Self::select_anchors(&db_transaction)?;
8✔
509

510
        let tx_graph: tx_graph::ChangeSet<A, _> = tx_graph::ChangeSet {
8✔
511
            txs,
8✔
512
            txouts,
8✔
513
            anchors,
8✔
514
            last_seen,
8✔
515
            indexer: keychain_txout::ChangeSet {
8✔
516
                keychains_added,
8✔
517
                last_revealed,
8✔
518
            },
8✔
519
        };
8✔
520

8✔
521
        if network.is_none() && chain.is_empty() && tx_graph.is_empty() {
8✔
UNCOV
522
            Ok(None)
×
523
        } else {
524
            Ok(Some(CombinedChangeSet {
8✔
525
                chain,
8✔
526
                tx_graph,
8✔
527
                network,
8✔
528
            }))
8✔
529
        }
530
    }
8✔
531
}
532

533
#[cfg(test)]
534
mod test {
535
    use super::*;
536
    use crate::store::Merge;
537
    use bdk_chain::bitcoin::consensus::encode::deserialize;
538
    use bdk_chain::bitcoin::constants::genesis_block;
539
    use bdk_chain::bitcoin::hashes::hex::FromHex;
540
    use bdk_chain::bitcoin::transaction::Transaction;
541
    use bdk_chain::bitcoin::Network::Testnet;
542
    use bdk_chain::bitcoin::{secp256k1, BlockHash, OutPoint};
543
    use bdk_chain::miniscript::Descriptor;
544
    use bdk_chain::CombinedChangeSet;
545
    use bdk_chain::{tx_graph, BlockId, ConfirmationBlockTime, DescriptorExt};
546
    use std::str::FromStr;
547
    use std::sync::Arc;
548

549
    #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Hash, Debug, Serialize, Deserialize)]
20✔
550
    enum Keychain {
551
        External { account: u32, name: String },
552
        Internal { account: u32, name: String },
553
    }
554

555
    #[test]
556
    fn insert_and_load_aggregate_changesets_with_confirmation_block_time_anchor() {
1✔
557
        let (test_changesets, agg_test_changesets) =
1✔
558
            create_test_changesets(&|height, time, hash| ConfirmationBlockTime {
2✔
559
                confirmation_time: time,
2✔
560
                block_id: (height, hash).into(),
2✔
561
            });
2✔
562

1✔
563
        let conn = Connection::open_in_memory().expect("in memory connection");
1✔
564
        let mut store = Store::<Keychain, ConfirmationBlockTime>::new(conn)
1✔
565
            .expect("create new memory db store");
1✔
566

1✔
567
        test_changesets.iter().for_each(|changeset| {
3✔
568
            store.write(changeset).expect("write changeset");
3✔
569
        });
3✔
570

1✔
571
        let agg_changeset = store.read().expect("aggregated changeset");
1✔
572

1✔
573
        assert_eq!(agg_changeset, Some(agg_test_changesets));
1✔
574
    }
1✔
575

576
    #[test]
577
    fn insert_and_load_aggregate_changesets_with_blockid_anchor() {
1✔
578
        let (test_changesets, agg_test_changesets) =
1✔
579
            create_test_changesets(&|height, _time, hash| BlockId { height, hash });
2✔
580

1✔
581
        let conn = Connection::open_in_memory().expect("in memory connection");
1✔
582
        let mut store = Store::<Keychain, BlockId>::new(conn).expect("create new memory db store");
1✔
583

1✔
584
        test_changesets.iter().for_each(|changeset| {
3✔
585
            store.write(changeset).expect("write changeset");
3✔
586
        });
3✔
587

1✔
588
        let agg_changeset = store.read().expect("aggregated changeset");
1✔
589

1✔
590
        assert_eq!(agg_changeset, Some(agg_test_changesets));
1✔
591
    }
1✔
592

593
    fn create_test_changesets<A: Anchor + Copy>(
2✔
594
        anchor_fn: &dyn Fn(u32, u64, BlockHash) -> A,
2✔
595
    ) -> (
2✔
596
        Vec<CombinedChangeSet<Keychain, A>>,
2✔
597
        CombinedChangeSet<Keychain, A>,
2✔
598
    ) {
2✔
599
        let secp = &secp256k1::Secp256k1::signing_only();
2✔
600

2✔
601
        let network_changeset = Some(Testnet);
2✔
602

2✔
603
        let block_hash_0: BlockHash = genesis_block(Testnet).block_hash();
2✔
604
        let block_hash_1 =
2✔
605
            BlockHash::from_str("00000000b873e79784647a6c82962c70d228557d24a747ea4d1b8bbe878e1206")
2✔
606
                .unwrap();
2✔
607
        let block_hash_2 =
2✔
608
            BlockHash::from_str("000000006c02c8ea6e4ff69651f7fcde348fb9d557a06e6957b65552002a7820")
2✔
609
                .unwrap();
2✔
610

2✔
611
        let block_changeset = [
2✔
612
            (0, Some(block_hash_0)),
2✔
613
            (1, Some(block_hash_1)),
2✔
614
            (2, Some(block_hash_2)),
2✔
615
        ]
2✔
616
        .into();
2✔
617

2✔
618
        let ext_keychain = Keychain::External {
2✔
619
            account: 0,
2✔
620
            name: "ext test".to_string(),
2✔
621
        };
2✔
622
        let (ext_desc, _ext_keymap) = Descriptor::parse_descriptor(secp, "wpkh(tprv8ZgxMBicQKsPcx5nBGsR63Pe8KnRUqmbJNENAfGftF3yuXoMMoVJJcYeUw5eVkm9WBPjWYt6HMWYJNesB5HaNVBaFc1M6dRjWSYnmewUMYy/0/*)").unwrap();
2✔
623
        let ext_desc_id = ext_desc.descriptor_id();
2✔
624
        let int_keychain = Keychain::Internal {
2✔
625
            account: 0,
2✔
626
            name: "int test".to_string(),
2✔
627
        };
2✔
628
        let (int_desc, _int_keymap) = Descriptor::parse_descriptor(secp, "wpkh(tprv8ZgxMBicQKsPcx5nBGsR63Pe8KnRUqmbJNENAfGftF3yuXoMMoVJJcYeUw5eVkm9WBPjWYt6HMWYJNesB5HaNVBaFc1M6dRjWSYnmewUMYy/1/*)").unwrap();
2✔
629
        let int_desc_id = int_desc.descriptor_id();
2✔
630

2✔
631
        let tx0_hex = Vec::<u8>::from_hex("01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000").unwrap();
2✔
632
        let tx0: Arc<Transaction> = Arc::new(deserialize(tx0_hex.as_slice()).unwrap());
2✔
633
        let tx1_hex = Vec::<u8>::from_hex("010000000001010000000000000000000000000000000000000000000000000000000000000000ffffffff025151feffffff0200f2052a010000001600149243f727dd5343293eb83174324019ec16c2630f0000000000000000776a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf94c4fecc7daa2490047304402205e423a8754336ca99dbe16509b877ef1bf98d008836c725005b3c787c41ebe46022047246e4467ad7cc7f1ad98662afcaf14c115e0095a227c7b05c5182591c23e7e01000120000000000000000000000000000000000000000000000000000000000000000000000000").unwrap();
2✔
634
        let tx1: Arc<Transaction> = Arc::new(deserialize(tx1_hex.as_slice()).unwrap());
2✔
635
        let tx2_hex = Vec::<u8>::from_hex("01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0e0432e7494d010e062f503253482fffffffff0100f2052a010000002321038a7f6ef1c8ca0c588aa53fa860128077c9e6c11e6830f4d7ee4e763a56b7718fac00000000").unwrap();
2✔
636
        let tx2: Arc<Transaction> = Arc::new(deserialize(tx2_hex.as_slice()).unwrap());
2✔
637

2✔
638
        let outpoint0_0 = OutPoint::new(tx0.compute_txid(), 0);
2✔
639
        let txout0_0 = tx0.output.first().unwrap().clone();
2✔
640
        let outpoint1_0 = OutPoint::new(tx1.compute_txid(), 0);
2✔
641
        let txout1_0 = tx1.output.first().unwrap().clone();
2✔
642

2✔
643
        let anchor1 = anchor_fn(1, 1296667328, block_hash_1);
2✔
644
        let anchor2 = anchor_fn(2, 1296688946, block_hash_2);
2✔
645

2✔
646
        let tx_graph_changeset = tx_graph::ChangeSet {
2✔
647
            txs: [tx0.clone(), tx1.clone()].into(),
2✔
648
            txouts: [(outpoint0_0, txout0_0), (outpoint1_0, txout1_0)].into(),
2✔
649
            anchors: [(anchor1, tx0.compute_txid()), (anchor1, tx1.compute_txid())].into(),
2✔
650
            last_seen: [
2✔
651
                (tx0.compute_txid(), 1598918400),
2✔
652
                (tx1.compute_txid(), 1598919121),
2✔
653
                (tx2.compute_txid(), 1608919121),
2✔
654
            ]
2✔
655
            .into(),
2✔
656
            indexer: keychain_txout::ChangeSet {
2✔
657
                keychains_added: [(ext_keychain, ext_desc), (int_keychain, int_desc)].into(),
2✔
658
                last_revealed: [(ext_desc_id, 124), (int_desc_id, 421)].into(),
2✔
659
            },
2✔
660
        };
2✔
661

2✔
662
        // test changesets to write to db
2✔
663
        let mut changesets = Vec::new();
2✔
664

2✔
665
        changesets.push(CombinedChangeSet {
2✔
666
            chain: block_changeset,
2✔
667
            tx_graph: tx_graph_changeset,
2✔
668
            network: network_changeset,
2✔
669
        });
2✔
670

2✔
671
        // create changeset that sets the whole tx2 and updates it's lastseen where before there was only the txid and last_seen
2✔
672
        let tx_graph_changeset2 = tx_graph::ChangeSet {
2✔
673
            txs: [tx2.clone()].into(),
2✔
674
            txouts: BTreeMap::default(),
2✔
675
            anchors: BTreeSet::default(),
2✔
676
            last_seen: [(tx2.compute_txid(), 1708919121)].into(),
2✔
677
            ..Default::default()
2✔
678
        };
2✔
679

2✔
680
        changesets.push(CombinedChangeSet {
2✔
681
            chain: local_chain::ChangeSet::default(),
2✔
682
            tx_graph: tx_graph_changeset2,
2✔
683
            network: None,
2✔
684
        });
2✔
685

2✔
686
        // create changeset that adds a new anchor2 for tx0 and tx1
2✔
687
        let tx_graph_changeset3 = tx_graph::ChangeSet {
2✔
688
            txs: BTreeSet::default(),
2✔
689
            txouts: BTreeMap::default(),
2✔
690
            anchors: [(anchor2, tx0.compute_txid()), (anchor2, tx1.compute_txid())].into(),
2✔
691
            last_seen: BTreeMap::default(),
2✔
692
            ..Default::default()
2✔
693
        };
2✔
694

2✔
695
        changesets.push(CombinedChangeSet {
2✔
696
            chain: local_chain::ChangeSet::default(),
2✔
697
            tx_graph: tx_graph_changeset3,
2✔
698
            network: None,
2✔
699
        });
2✔
700

2✔
701
        // aggregated test changesets
2✔
702
        let agg_test_changesets =
2✔
703
            changesets
2✔
704
                .iter()
2✔
705
                .fold(CombinedChangeSet::<Keychain, A>::default(), |mut i, cs| {
6✔
706
                    i.merge(cs.clone());
6✔
707
                    i
6✔
708
                });
6✔
709

2✔
710
        (changesets, agg_test_changesets)
2✔
711
    }
2✔
712
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc