TyRoXx / NonlocalityOS / 22054005403

16 Feb 2026 07:35AM UTC coverage: 78.306% (+0.3%) from 78.043%

Pull Request #418 (merge 957d17204 into e3367509b): Fix: Sometimes the storage garbage collector appears to collect new trees that are still needed

735 of 837 new or added lines in 29 files covered. (87.81%)

53 existing lines in 4 files now uncovered.

7324 of 9353 relevant lines covered (78.31%)

26490.4 hits per line
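Pull Request #418 is about the invariant that a tree handed back by store_tree must be treated as a garbage-collection root for as long as the returned StrongReference is alive. A minimal sketch of that expectation, using only the types and trait methods visible in the listing below (the helper name and the construction of the HashedTree are hypothetical and not part of the file):

// Illustrative only: the storage must not collect the stored tree while `strong` is alive.
async fn store_and_keep_alive(
    storage: &SQLiteStorage,
    tree: &HashedTree,
) -> Result<StrongReference, StoreError> {
    // store_tree registers the new tree as an additional GC root via the returned StrongReference.
    let strong = storage.store_tree(tree).await?;
    // Even an explicit collection in between must not delete the freshly stored tree,
    // because the StrongReference is still held.
    let _ = storage.collect_some_garbage().await;
    Ok(strong)
}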

Source File

/astraea/src/sqlite_storage.rs: 86.36% covered
use crate::{
    delayed_hashed_tree::DelayedHashedTree,
    storage::{
        CollectGarbage, CommitChanges, GarbageCollectionStats, LoadError, LoadRoot, LoadStoreTree,
        LoadTree, StoreError, StoreTree, StrongDelayedHashedTree, StrongReference,
        StrongReferenceTrait, UpdateRoot,
    },
    tree::{BlobDigest, HashedTree, Tree, TreeBlob, TreeChildren, TREE_BLOB_MAX_LENGTH},
};
use async_trait::async_trait;
use pretty_assertions::assert_eq;
use rusqlite::OptionalExtension;
use std::{
    collections::BTreeMap,
    sync::{Arc, Weak},
};
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument};

#[derive(Debug)]
struct TransactionStats {
    writes: u64,
}

#[derive(Debug)]
struct SQLiteStrongReferenceImpl {}

impl StrongReferenceTrait for SQLiteStrongReferenceImpl {}

#[derive(Debug)]
struct GarbageCollector {
    additional_roots: BTreeMap<BlobDigest, (i64, Weak<SQLiteStrongReferenceImpl>)>,
    last_gc_additional_roots_len: usize,
    has_gc_new_tree_table: bool,
}

impl GarbageCollector {
    fn new() -> Self {
        Self {
            additional_roots: BTreeMap::new(),
            last_gc_additional_roots_len: 0,
            has_gc_new_tree_table: false,
        }
    }

    fn require_additional_root(
        &mut self,
        root: &BlobDigest,
        root_tree_id: i64,
        connection: &rusqlite::Connection,
    ) -> rusqlite::Result<StrongReference> {
        let result = self.require_additional_root_entry(root, root_tree_id)?;
        self.check_automatic_collection(connection)?;
        Ok(result)
    }

    fn require_additional_root_entry(
        &mut self,
        root: &BlobDigest,
        root_tree_id: i64,
    ) -> rusqlite::Result<StrongReference> {
        match self.additional_roots.entry(*root) {
            std::collections::btree_map::Entry::Vacant(vacant_entry) => {
                let reference_counter = Arc::new(SQLiteStrongReferenceImpl {});
                vacant_entry.insert((root_tree_id, Arc::downgrade(&reference_counter)));
                Ok(StrongReference::new(Some(reference_counter), *root))
            }
            std::collections::btree_map::Entry::Occupied(mut occupied_entry) => {
                match occupied_entry.get().1.upgrade() {
                    Some(reference_counter) => {
                        let existing_tree_id = occupied_entry.get().0;
                        if existing_tree_id != root_tree_id {
                            unreachable!("Inconsistency detected: The same root digest {} is associated with multiple tree IDs: existing tree ID {}, new tree ID {}", root, existing_tree_id, root_tree_id);
                        }
                        Ok(StrongReference::new(Some(reference_counter), *root))
                    }
                    None => {
                        let reference_counter = Arc::new(SQLiteStrongReferenceImpl {});
                        occupied_entry.insert((root_tree_id, Arc::downgrade(&reference_counter)));
                        Ok(StrongReference::new(Some(reference_counter), *root))
                    }
                }
            }
        }
    }

    fn check_automatic_collection(
        &mut self,
        connection: &rusqlite::Connection,
    ) -> rusqlite::Result<()> {
        let additional_roots_len = self.additional_roots.len();
        // Not sure what's a good minimum here.
        let minimum_additional_roots_len_for_gc = 100;
        if (additional_roots_len >= minimum_additional_roots_len_for_gc)
            && (additional_roots_len > self.last_gc_additional_roots_len * 2)
        {
            info!("Automatic garbage collection triggered because the additional root count {} exceeded a threshold", additional_roots_len);
            let stats = self.collect_garbage(connection)?;
            info!(
                "Automatic garbage collection collected {} trees",
                stats.trees_collected
            );
            self.last_gc_additional_roots_len = self.additional_roots.len();
        }
        Ok(())
    }

    fn require_gc_new_tree_table(
        &mut self,
        connection: &rusqlite::Connection,
    ) -> std::result::Result<(), rusqlite::Error> {
        if self.has_gc_new_tree_table {
            Ok(())
        } else {
            connection.execute(
                // unfortunately, we cannot have a foreign key in a temp table
                "CREATE TEMP TABLE gc_new_tree (
                    id INTEGER PRIMARY KEY NOT NULL,
                    tree_id INTEGER UNIQUE NOT NULL
                ) STRICT",
                (),
            )?;
            self.has_gc_new_tree_table = true;
            Ok(())
        }
    }

    #[instrument(skip_all)]
    fn collect_garbage(
        &mut self,
        connection: &rusqlite::Connection,
    ) -> rusqlite::Result<GarbageCollectionStats> {
        self.require_gc_new_tree_table(connection)?;
        connection.execute("DELETE FROM gc_new_tree", ())?;
        {
            let mut statement = connection
                .prepare_cached("INSERT OR IGNORE INTO gc_new_tree (tree_id) VALUES (?1)")?;
            let mut sql_error: Option<rusqlite::Error> = None;
            self.additional_roots
                .retain(|_, (tree_id, reference_counter)| {
                    if reference_counter.upgrade().is_none() {
                        // All StrongReferences have been dropped, so we can remove this additional root
                        // and not consider the tree it pointed to as a root for GC purposes anymore
                        return false;
                    }
                    if let Err(err) = statement.execute((*tree_id,)) {
                        sql_error = Some(err);
                    }
                    true
                });
            if let Some(err) = sql_error {
                return Err(err);
            }
        }
        let deleted_trees = connection.execute(
            "DELETE FROM tree
        WHERE NOT EXISTS (
            SELECT 1 FROM reference
            WHERE reference.target = tree.digest
        )
        AND NOT EXISTS (
            SELECT 1 FROM gc_new_tree
            WHERE gc_new_tree.tree_id = tree.id
        )
        AND NOT EXISTS (
            SELECT 1 FROM root
            WHERE root.target = tree.digest
        );",
            (),
        )?;
        debug!(
            "Garbage collection deleted {} unreferenced trees",
            deleted_trees
        );
        self.last_gc_additional_roots_len = self.additional_roots.len();
        Ok(GarbageCollectionStats {
            trees_collected: deleted_trees as u64,
        })
    }
}

#[derive(Debug)]
struct SQLiteState {
    connection: rusqlite::Connection,
    transaction: Option<TransactionStats>,
    garbage_collector: GarbageCollector,
}

impl SQLiteState {
    fn require_transaction(&mut self, add_writes: u64) -> std::result::Result<(), rusqlite::Error> {
        match self.transaction {
            Some(ref mut stats) => {
                stats.writes += add_writes;
                Ok(())
            }
            None => {
                debug!("BEGIN TRANSACTION");
                self.connection.execute("BEGIN TRANSACTION;", ())?;
                self.transaction = Some(TransactionStats { writes: add_writes });
                Ok(())
            }
        }
    }
}

#[derive(Debug)]
pub struct SQLiteStorage {
    state: tokio::sync::Mutex<SQLiteState>,
}

impl SQLiteStorage {
    pub fn from(connection: rusqlite::Connection) -> rusqlite::Result<Self> {
        Self::configure_connection(&connection)?;
        Ok(Self {
            state: Mutex::new(SQLiteState {
                connection,
                transaction: None,
                garbage_collector: GarbageCollector::new(),
            }),
        })
    }

    pub fn configure_connection(connection: &rusqlite::Connection) -> rusqlite::Result<()> {
        connection.pragma_update(None, "foreign_keys", "on")?;
        // "The default suggested cache size is -2000, which means the cache size is limited to 2048000 bytes of memory."
        // https://www.sqlite.org/pragma.html#pragma_cache_size
        connection.pragma_update(None, "cache_size", "-200000")?;
        // "The WAL journaling mode uses a write-ahead log instead of a rollback journal to implement transactions. The WAL journaling mode is persistent; after being set it stays in effect across multiple database connections and after closing and reopening the database. A database in WAL journaling mode can only be accessed by SQLite version 3.7.0 (2010-07-21) or later."
        // https://www.sqlite.org/wal.html
        connection.pragma_update(None, "journal_mode", "WAL")?;
        // CREATE TEMP TABLE shall not create a file (https://sqlite.org/tempfiles.html)
        connection.pragma_update(None, "temp_store", "MEMORY")?;
        Ok(())
    }

    pub fn create_schema(connection: &rusqlite::Connection) -> rusqlite::Result<()> {
        {
            // Why are we using format! instead of an SQL parameter here?
            // Answer is the SQLite error: "parameters prohibited in CHECK constraints" (because why should anything ever work)
            let query = format!(
                "CREATE TABLE tree (
                    id INTEGER PRIMARY KEY NOT NULL,
                    digest BLOB UNIQUE NOT NULL,
                    tree_blob BLOB NOT NULL,
                    is_compressed INTEGER NOT NULL,
                    CONSTRAINT digest_length_matches_sha3_512 CHECK (LENGTH(digest) == 64),
                    CONSTRAINT tree_blob_max_length CHECK (LENGTH(tree_blob) <= {TREE_BLOB_MAX_LENGTH}),
                    CONSTRAINT is_compressed_boolean CHECK (is_compressed IN (0, 1))
                ) STRICT"
            );
            connection
                .execute(&query, ())
                .map(|size| assert_eq!(0, size))?;
        }
        connection
            .execute(
                "CREATE TABLE reference (
                    id INTEGER PRIMARY KEY NOT NULL,
                    origin INTEGER NOT NULL REFERENCES tree ON DELETE CASCADE,
                    zero_based_index INTEGER NOT NULL,
                    target BLOB NOT NULL,
                    UNIQUE (origin, zero_based_index),
                    CONSTRAINT digest_length_matches_sha3_512 CHECK (LENGTH(target) == 64)
                ) STRICT",
                (),
            )
            .map(|size| assert_eq!(0, size))?;
        connection
            .execute("CREATE INDEX reference_origin ON reference (origin)", ())
            .map(|size| assert_eq!(0, size))?;
        connection
            .execute("CREATE INDEX reference_target ON reference (target)", ())
            .map(|size| assert_eq!(0, size))?;
        connection
            .execute(
                "CREATE TABLE root (
                    id INTEGER PRIMARY KEY NOT NULL,
                    name TEXT UNIQUE NOT NULL,
                    target BLOB NOT NULL,
                    CONSTRAINT target_length_matches_sha3_512 CHECK (LENGTH(target) == 64)
                ) STRICT",
                (),
            )
            .map(|size| assert_eq!(0, size))?;
        Ok(())
    }
}

#[async_trait]
impl StoreTree for SQLiteStorage {
    //#[instrument(skip_all)]
    async fn store_tree(
        &self,
        tree: &HashedTree,
    ) -> std::result::Result<StrongReference, StoreError> {
        let mut state_locked = self.state.lock().await;
        let digest = *tree.digest();
        let origin_digest: [u8; 64] = digest.into();
        {
            let tree_id: Option<i64> = {
                let connection_locked = &state_locked.connection;
                let mut statement = connection_locked
                    .prepare_cached("SELECT id FROM tree WHERE digest = ?")
                    .map_err(|error| StoreError::Rusqlite(format!("{}", &error)))?;
                match statement.query_row(
                    (&origin_digest,),
                    |row| -> rusqlite::Result<_, rusqlite::Error> { row.get(0) },
                ) {
                    Ok(id) => Some(id),
                    Err(rusqlite::Error::QueryReturnedNoRows) => None,
                    Err(error) => {
                        return Err(StoreError::Rusqlite(format!("{}", &error)));
                    }
                }
            };
            if let Some(id) = tree_id {
                let (connection_locked, garbage_collector) = {
                    let state = &mut *state_locked;
                    (&state.connection, &mut state.garbage_collector)
                };
                return garbage_collector
                    .require_additional_root(tree.digest(), id, connection_locked)
                    .map_err(|error| StoreError::Rusqlite(error.to_string()));
            }
        }

        state_locked
            .require_transaction(1 + tree.tree().children().references().len() as u64)
            .map_err(|err| StoreError::Rusqlite(format!("{}", &err)))?;

        let connection_locked = &state_locked.connection;

        // Try to compress the blob, but only store compressed if it's beneficial
        let original_blob = tree.tree().blob().as_slice();
        let compressed = lz4_flex::compress_prepend_size(original_blob);

        let (blob_to_store, is_compressed): (&[u8], i32) = if compressed.len() < original_blob.len()
        {
            // Compression is beneficial, store compressed
            (&compressed, 1)
        } else {
            // Compression doesn't help, store uncompressed to save CPU time on loading
            (original_blob, 0)
        };

        {
            let mut statement = connection_locked
                .prepare_cached(
                    "INSERT INTO tree (digest, tree_blob, is_compressed) VALUES (?1, ?2, ?3)",
                )
                .map_err(|error| StoreError::Rusqlite(format!("{}", &error)))?;
            let rows_inserted = statement
                .execute((&origin_digest, blob_to_store, &is_compressed))
                .map_err(|err| StoreError::Rusqlite(format!("{}", &err)))?;
            assert_eq!(1, rows_inserted);
        }

        let tree_id: i64 = {
            let mut statement = connection_locked
                .prepare_cached("SELECT id FROM tree WHERE digest = ?1")
                .map_err(|err| StoreError::Rusqlite(format!("{}", &err)))?;
            statement
                .query_row(
                    (&origin_digest,),
                    |row| -> rusqlite::Result<_, rusqlite::Error> { row.get(0) },
                )
                .map_err(|err| StoreError::Rusqlite(format!("{}", &err)))?
        };

        if !tree.tree().children().references().is_empty() {
            let inserted_tree_rowid = connection_locked.last_insert_rowid();
            let mut statement = connection_locked
                .prepare_cached(
                    "INSERT INTO reference (origin, zero_based_index, target) SELECT ?1, ?2, ?3 FROM tree WHERE tree.digest = ?3",
                )
                .map_err(|err| StoreError::Rusqlite(format!("{}", &err)))?;
            for (index, reference) in tree.tree().children().references().iter().enumerate() {
                let target_digest: [u8; 64] = (*reference.digest()).into();
                let rows_inserted = statement
                    .execute((
                        &inserted_tree_rowid,
                        u32::try_from(index).expect("A child index won't be too large"),
                        &target_digest,
                    ))
                    .map_err(|err| StoreError::Rusqlite(format!("{}", &err)))?;
                match rows_inserted {
                    0 => {
                        return Err(StoreError::ChildMissing(LoadError::TreeNotFound(
                            *reference.digest(),
                        )))
                    }
                    1 => {}
                    _ =>
                        return Err(StoreError::CorruptedStorage(
                            "Multiple rows inserted into reference table for a single child reference, which should be impossible due to the UNIQUE constraint".to_string())),
                }
            }
        }

        let (connection_locked, garbage_collector) = {
            let state = &mut *state_locked;
            (&state.connection, &mut state.garbage_collector)
        };
        garbage_collector
            .require_additional_root(&digest, tree_id, connection_locked)
            .map_err(|error| StoreError::Rusqlite(error.to_string()))
    }
}

async fn load_tree_impl(
    state: &tokio::sync::Mutex<SQLiteState>,
    reference: &BlobDigest,
) -> std::result::Result<StrongDelayedHashedTree, LoadError> {
    let mut state_locked = state.lock().await;
    let (tree_blob, child_digests, tree_id) = {
        let connection_locked = &state_locked.connection;
        let digest: [u8; 64] = (*reference).into();
        let mut statement = connection_locked
            .prepare_cached("SELECT id, tree_blob, is_compressed FROM tree WHERE digest = ?1")
            .map_err(|error| LoadError::Rusqlite(format!("{}", &error)))?;
        let (tree_id, decompressed_data) =
            match statement.query_row((&digest,), |row| -> rusqlite::Result<_> {
                let id: i64 = row.get(0)?;
                let tree_blob_raw: Vec<u8> = row.get(1)?;
                let is_compressed: i32 = row.get(2)?;
                // Decompress if needed
                let decompressed_data = match is_compressed {
                    1 => match lz4_flex::decompress_size_prepended(&tree_blob_raw) {
                        Ok(data) => data,
                        Err(error) => {
                            let message =
                                format!("Failed to decompress tree blob using lz4: {error:?}");
                            return Ok(Err(LoadError::Inconsistency(*reference, message)));
                        }
                    },
                    0 => tree_blob_raw,
                    _ => {
                        let message = format!(
                            "Invalid is_compressed value: {is_compressed}, expected 0 or 1"
                        );
                        return Ok(Err(LoadError::Inconsistency(*reference, message)));
                    }
                };
                Ok(Ok((id, decompressed_data)))
            }) {
                Ok(maybe_tuple) => maybe_tuple?,
                Err(rusqlite::Error::QueryReturnedNoRows) => {
                    error!("No tree found for digest {reference} in the database.");
                    return Err(LoadError::TreeNotFound(*reference));
                }
                Err(sql_error) => {
                    error!("Error loading tree from the database: {sql_error:?}");
                    return Err(LoadError::Rusqlite(format!("{}", &sql_error)));
                }
            };
        let tree_blob = TreeBlob::try_from(decompressed_data.into())
            .map_err(|error| LoadError::Deserialization(*reference, error))?;
        let mut statement = connection_locked
        .prepare_cached(concat!(
            "SELECT reference.zero_based_index, reference.target, tree.id FROM reference, tree",
            " WHERE reference.origin = ? AND reference.target = tree.digest ORDER BY reference.zero_based_index ASC"
        ))
        .map_err(|error| LoadError::Rusqlite(format!("{}", &error)))?;
        let child_results = statement
            .query_map([&tree_id], |row| {
                let index: i64 = row.get(0)?;
                let target: [u8; 64] = row.get(1)?;
                let child_tree_id: i64 = row.get(2)?;
                Ok((index, BlobDigest::new(&target), child_tree_id))
            })
            .map_err(|error| LoadError::Rusqlite(format!("{}", &error)))?;
        let child_digests: Vec<(BlobDigest, i64)> = child_results
            .enumerate()
            .map(|(expected_index, maybe_tuple)| {
                let tuple =
                    maybe_tuple.map_err(|error| LoadError::Rusqlite(format!("{}", &error)))?;
                let target = tuple.1;
                let actual_index = tuple.0;
                let child_tree_id = tuple.2;
                if expected_index as i64 != actual_index {
                    return Err(LoadError::Inconsistency(
                        *reference,
                        format!(
                            "Expected index {}, but got {}",
                            expected_index, actual_index
                        ),
                    ));
                }
                Ok((target, child_tree_id))
            })
            .try_collect()?;
        (tree_blob, child_digests, tree_id)
    };
    let mut child_references = Vec::new();
    for (child_digest, child_tree_id) in child_digests {
        let (connection_locked, garbage_collector) = {
            let state = &mut *state_locked;
            (&state.connection, &mut state.garbage_collector)
        };
        let child_reference = garbage_collector
            .require_additional_root(&child_digest, child_tree_id, connection_locked)
            .map_err(|error| LoadError::Rusqlite(error.to_string()))?;
        child_references.push(child_reference);
    }
    let child_count = child_references.len();
    let children = match TreeChildren::try_from(child_references) {
        Some(children) => children,
        None => {
            let message = format!("Tree has too many children: {}", child_count);
            error!("{}", message);
            return Err(LoadError::Inconsistency(*reference, message));
        }
    };
    let tree = DelayedHashedTree::delayed(Arc::new(Tree::new(tree_blob, children)), *reference);
    let (connection_locked, garbage_collector) = {
        let state = &mut *state_locked;
        (&state.connection, &mut state.garbage_collector)
    };
    let root_reference = garbage_collector
        .require_additional_root(reference, tree_id, connection_locked)
        .map_err(|error| LoadError::Rusqlite(error.to_string()))?;
    Ok(StrongDelayedHashedTree::new(root_reference, tree))
}

#[async_trait]
impl LoadTree for SQLiteStorage {
    async fn load_tree(
        &self,
        reference: &BlobDigest,
    ) -> std::result::Result<StrongDelayedHashedTree, LoadError> {
        load_tree_impl(&self.state, reference).await
    }

    async fn approximate_tree_count(&self) -> std::result::Result<u64, StoreError> {
        let state_locked = self.state.lock().await;
        let connection_locked = &state_locked.connection;
        match connection_locked
            .query_row_and_then(
                "SELECT COUNT(*) FROM tree",
                (),
                |row| -> rusqlite::Result<_> {
                    let count: i64 = row.get(0)?;
                    Ok(count)
                },
            )
            .map_err(|error| StoreError::Rusqlite(format!("{}", &error)))
        {
            Ok(count) => Ok(u64::try_from(count).expect("COUNT(*) won't be negative")),
            Err(err) => Err(err),
        }
    }
}

impl LoadStoreTree for SQLiteStorage {}

#[async_trait]
impl UpdateRoot for SQLiteStorage {
    //#[instrument(skip_all)]
    async fn update_root(
        &self,
        name: &str,
        target: &StrongReference,
    ) -> std::result::Result<(), StoreError> {
        info!("Update root {} to {}", name, target);
        let mut state_locked = self.state.lock().await;
        state_locked
            .require_transaction(1)
            .map_err(|err| StoreError::Rusqlite(format!("{}", &err)))?;
        let connection_locked = &state_locked.connection;
        let target_array: [u8; 64] = (*target.digest()).into();
        // TODO: verify that the tree exists
        connection_locked.execute(
            "INSERT INTO root (name, target) VALUES (?1, ?2) ON CONFLICT(name) DO UPDATE SET target = ?2;",
            (&name, &target_array),
        )
        .map_err(|err| StoreError::Rusqlite(format!("{}", &err)))?;
        Ok(())
    }
}

#[async_trait]
impl CollectGarbage for SQLiteStorage {
    async fn collect_some_garbage(
        &self,
    ) -> std::result::Result<GarbageCollectionStats, StoreError> {
        let mut state_locked = self.state.lock().await;
        // TODO: why do we need a transaction?
        state_locked
            .require_transaction(/*the number doesn't really matter here*/ 1)
            .map_err(|err| StoreError::Rusqlite(format!("{}", &err)))?;
        let state_borrowed: &mut SQLiteState = &mut state_locked;
        let stats = state_borrowed
            .garbage_collector
            .collect_garbage(&state_borrowed.connection)
            .map_err(|err| StoreError::Rusqlite(format!("{}", &err)))?;
        // TODO: why do we need a transaction?
        state_locked
            .require_transaction(stats.trees_collected)
            .map_err(|err| StoreError::Rusqlite(format!("{}", &err)))?;
        Ok(stats)
    }
}

#[async_trait]
impl LoadRoot for SQLiteStorage {
    //#[instrument(skip_all)]
    async fn load_root(
        &self,
        name: &str,
    ) -> std::result::Result<Option<StrongReference>, LoadError> {
        let mut state_locked = self.state.lock().await;
        let connection_locked = &state_locked.connection;
        let target: Option<(BlobDigest, i64)> = connection_locked
            .query_row(
                "SELECT root.target, tree.id FROM root, tree WHERE root.name = ?1 AND root.target = tree.digest",
                (&name,),
                |row| -> rusqlite::Result<_> {
                    let target = row.get(0)?;
                    let tree_id: i64 = row.get(1)?;
                    Ok((BlobDigest::new(&target), tree_id))
                },
            )
            .optional()
            .map_err(|err| LoadError::Rusqlite(format!("{}", &err)))?;
        match target {
            Some((digest, tree_id)) => {
                let (connection_locked, garbage_collector) = {
                    let state = &mut *state_locked;
                    (&state.connection, &mut state.garbage_collector)
                };
                let reference = garbage_collector
                    .require_additional_root(&digest, tree_id, connection_locked)
                    .map_err(|error| LoadError::Rusqlite(error.to_string()))?;
                Ok(Some(reference))
            }
            None => Ok(None),
        }
    }
}

#[async_trait]
impl CommitChanges for SQLiteStorage {
    #[instrument(skip_all)]
    async fn commit_changes(&self) -> Result<(), StoreError> {
        let mut state_locked = self.state.lock().await;
        match state_locked.transaction {
            Some(ref stats) => {
                info!("COMMITting transaction with {} writes", stats.writes);
                state_locked
                    .connection
                    .execute("COMMIT;", ())
                    .map_err(|err| StoreError::Rusqlite(format!("{}", &err)))?;
                state_locked.transaction = None;
                Ok(())
            }
            None => Ok(()),
        }
    }
}
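
For orientation, a minimal setup sketch (not part of the covered file) showing how the constructors above fit together; the helper name and the in-memory database are only for illustration, and error handling is left to the caller:

// Hypothetical helper: open an in-memory database, create the schema once,
// and hand the connection to SQLiteStorage::from, which configures the pragmas itself.
fn open_in_memory_storage() -> rusqlite::Result<SQLiteStorage> {
    let connection = rusqlite::Connection::open_in_memory()?;
    SQLiteStorage::create_schema(&connection)?;
    SQLiteStorage::from(connection)
}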