• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TyRoXx / NonlocalityOS / 22054314229

16 Feb 2026 07:48AM UTC coverage: 78.283% (+0.2%) from 78.043%
22054314229

Pull #418

github

web-flow
Merge 973223b37 into e3367509b
Pull Request #418: Fix: Sometimes the storage garbage collector appears to collect new trees that are still needed

736 of 838 new or added lines in 29 files covered. (87.83%)

55 existing lines in 6 files now uncovered.

7321 of 9352 relevant lines covered (78.28%)

26493.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.99
/sorted_tree/src/prolly_tree_editable_node.rs
1
use crate::sorted_tree::{self, NodeValue, TreeReference};
2
use astraea::{
3
    storage::{load_children, LoadError, LoadTree, StoreError, StoreTree, StrongReference},
4
    tree::{BlobDigest, TREE_BLOB_MAX_LENGTH},
5
};
6
use serde::{de::DeserializeOwned, Deserialize, Serialize};
7
use std::collections::BTreeMap;
8
use std::fmt::Debug;
9
use std::hash::{BuildHasher, Hash};
10

11
/// Outcome of an integrity check on a node or subtree.
#[derive(Debug, PartialEq)]
pub enum IntegrityCheckResult {
    /// The checked subtree is consistent. Leaf nodes report a `depth` of 0; internal nodes
    /// report the depth of their children plus one.
    Valid { depth: usize },
    /// The check failed; the string describes the problem that was found.
    Corrupted(String),
}
16

17
// TODO: Why does this only return the first byte of the hash? Is 256 buckets enough for good distribution?
18
// Would using more bytes improve the split point selection quality?
19
pub fn hash_key<Key: Serialize>(key: &Key) -> u8 {
204,445✔
20
    // TODO: use a better hash function (https://docs.dolthub.com/architecture/storage-engine/prolly-tree#controlling-chunk-size)
21
    let key_serialized = postcard::to_stdvec(key).expect("serializing key should succeed");
1,022,225✔
22
    let hasher = rapidhash::quality::SeedableState::fixed();
408,890✔
23
    let result: [u8; 8] = hasher.hash_one(&key_serialized).to_le_bytes();
1,226,670✔
24
    result[0]
204,445✔
25
}
26

27
// TODO: Why does this function determine split points based on key hash + size but not based on the actual tree balance?
28
// Should this be using a rolling hash or Rabin fingerprinting for content-defined chunking?
29
// The threshold value of 10 (out of 256 possible hash values) gives ~4% split probability - is this optimal?
30
// What happens if keys are not uniformly distributed? Could this lead to unbalanced trees?
31
pub fn is_split_after_key<Key: Serialize>(key: &Key, chunk_size_in_bytes: usize) -> bool {
1,162,521✔
32
    if chunk_size_in_bytes < 1000 {
1,162,521✔
33
        // No point in splitting small chunks.
34
        // TODO: use Tree efficiently
35
        return false;
958,064✔
36
    }
37
    if chunk_size_in_bytes >= TREE_BLOB_MAX_LENGTH / 2 {
204,457✔
38
        // TODO: try to pack more elements in a chunk before splitting
39
        return true;
14✔
40
    }
41
    let hash = hash_key(key);
613,329✔
42
    // TODO: Why is chunk_boundary_threshold hardcoded to 10? Should this be configurable or derived from target chunk size?
43
    // Is there a mathematical relationship between this threshold and the desired average chunk size?
44
    let chunk_boundary_threshold = 10;
408,886✔
45
    if hash < chunk_boundary_threshold {
204,443✔
46
        // written with an if expression so that we can see whether the tests cover both branches
47
        true
22,582✔
48
    } else {
49
        false
181,861✔
50
    }
51
}
52

53
pub struct SizeTracker {
54
    element_count: usize,
55
    total_element_size: usize,
56
}
57

58
impl Default for SizeTracker {
59
    fn default() -> Self {
×
60
        Self::new()
×
61
    }
62
}
63

64
impl SizeTracker {
65
    pub fn new() -> Self {
38,410✔
66
        SizeTracker {
67
            element_count: 0,
68
            total_element_size: 0,
69
        }
70
    }
71

72
    pub fn add_entry<Key: Serialize, Value: Serialize>(&mut self, key: &Key, value: &Value) {
3,769,832✔
73
        let entry_serialized: Vec<u8> =
7,539,664✔
74
            postcard::to_stdvec(&(key, value)).expect("serializing entry should succeed");
15,079,328✔
75
        self.element_count += 1;
3,769,832✔
76
        self.total_element_size += entry_serialized.len();
3,769,832✔
77
    }
78

79
    pub fn size(&self) -> usize {
1,162,522✔
80
        // TODO: optimize size calculation
81
        let metadata_serialized: Vec<u8> =
2,325,044✔
82
            postcard::to_stdvec(&Metadata { is_leaf: true }).unwrap();
3,487,566✔
83
        let element_count_serialized: Vec<u8> = postcard::to_stdvec(&self.element_count).unwrap();
5,812,610✔
84
        metadata_serialized.len() + element_count_serialized.len() + self.total_element_size
3,487,566✔
85
    }
86
}
87

88
#[derive(Serialize, Deserialize, Clone, Hash)]
89
pub struct Metadata {
90
    pub is_leaf: bool,
91
}
92

93
pub async fn store_node<Key: Serialize + Ord, Value: NodeValue>(
415✔
94
    store_tree: &(dyn StoreTree + Send + Sync),
95
    node: &sorted_tree::Node<Key, Value>,
96
    metadata: &Metadata,
97
) -> Result<StrongReference, StoreError> {
98
    let metadata_serialized =
415✔
99
        postcard::to_stdvec(metadata).expect("serializing metadata should always succeed");
1,660✔
100
    crate::sorted_tree::store_node(store_tree, node, &bytes::Bytes::from(metadata_serialized)).await
1,660✔
101
}
102

103
#[derive(Debug, PartialEq)]
104
pub enum EitherNodeType<Key: Serialize + Ord, Value: NodeValue> {
105
    Leaf(sorted_tree::Node<Key, Value>),
106
    Internal(sorted_tree::Node<Key, TreeReference>),
107
}
108

109
#[derive(Debug, PartialEq, Eq, Clone)]
110
pub enum DeserializationError {
111
    Load(LoadError),
112
    TreeHashMismatch(BlobDigest),
113
}
114

115
impl std::fmt::Display for DeserializationError {
116
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
×
117
        write!(f, "{self:?}")
×
118
    }
119
}
120

121
impl std::error::Error for DeserializationError {}
122

123
pub async fn load_node<
186✔
124
    Key: Serialize + DeserializeOwned + PartialEq + Ord,
125
    Value: NodeValue + Clone,
126
>(
127
    load_tree: &(dyn LoadTree + Send + Sync),
128
    root: &BlobDigest,
129
) -> Result<EitherNodeType<Key, Value>, Box<dyn std::error::Error>> {
130
    let loaded = match load_tree.load_tree(root).await {
930✔
131
        Ok(loaded) => loaded,
372✔
132
        Err(error) => return Err(DeserializationError::Load(error).into()),
×
133
    };
134
    let hashed = match loaded.hash() {
372✔
135
        Some(hashed) => hashed,
372✔
136
        None => return Err(DeserializationError::TreeHashMismatch(*root).into()),
×
137
    };
138
    let tree = hashed.hashed_tree().tree();
558✔
139
    let (metadata, sorted_tree_data) =
372✔
140
        postcard::take_from_bytes::<Metadata>(tree.blob().as_slice())?;
372✔
141
    let children = load_children(load_tree, tree)
744✔
142
        .await?
186✔
143
        .into_iter()
144
        .map(|child| child.reference().clone())
2,374✔
145
        .collect::<Vec<_>>();
146
    if metadata.is_leaf {
186✔
147
        let node = sorted_tree::node_from_tree::<Key, Value>(
148
            tree.blob(),
185✔
149
            &children,
185✔
150
            tree.blob().as_slice().len() - sorted_tree_data.len(),
555✔
151
        );
152
        Ok(EitherNodeType::Leaf(node))
185✔
153
    } else {
154
        let node = sorted_tree::node_from_tree::<Key, TreeReference>(
155
            tree.blob(),
1✔
156
            &children,
1✔
157
            tree.blob().as_slice().len() - sorted_tree_data.len(),
3✔
158
        );
159
        Ok(EitherNodeType::Internal(node))
1✔
160
    }
161
}
162

163
#[derive(Debug, Clone)]
164
pub enum EditableNode<Key: std::cmp::Ord + Clone, Value: Clone> {
165
    Reference(StrongReference),
166
    Loaded(EditableLoadedNode<Key, Value>),
167
}
168

169
impl<
170
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
171
        Value: NodeValue + Clone,
172
    > Default for EditableNode<Key, Value>
173
{
174
    fn default() -> Self {
1✔
175
        Self::new()
1✔
176
    }
177
}
178

179
impl<
180
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
181
        Value: NodeValue + Clone,
182
    > EditableNode<Key, Value>
183
{
184
    pub fn new() -> Self {
291✔
185
        EditableNode::Loaded(EditableLoadedNode::Leaf(EditableLeafNode {
291✔
186
            entries: BTreeMap::new(),
291✔
187
        }))
188
    }
189

190
    pub async fn require_loaded(
3,119,165✔
191
        &mut self,
192
        load_tree: &(dyn LoadTree + Send + Sync),
193
    ) -> Result<&mut EditableLoadedNode<Key, Value>, Box<dyn std::error::Error>> {
194
        match self {
3,119,165✔
195
            EditableNode::Reference(strong_reference) => {
17✔
196
                let loaded: EitherNodeType<Key, Value> =
34✔
197
                    load_node(load_tree, strong_reference.digest()).await?;
51✔
198
                *self = EditableNode::Loaded(EditableLoadedNode::new(loaded));
34✔
199
            }
200
            EditableNode::Loaded(_loaded_node) => {}
6,238,296✔
201
        };
202
        let loaded = match self {
6,238,330✔
203
            EditableNode::Loaded(loaded_node) => loaded_node,
6,238,330✔
204
            _ => unreachable!(),
205
        };
206
        Ok(loaded)
3,119,165✔
207
    }
208

209
    pub async fn insert(
5,606✔
210
        &mut self,
211
        key: Key,
212
        value: Value,
213
        load_tree: &(dyn LoadTree + Send + Sync),
214
    ) -> Result<(), Box<dyn std::error::Error>> {
215
        let (self_top_key, nodes_split) = self.insert_impl(key, value, load_tree).await?;
39,242✔
216
        if nodes_split.is_empty() {
11,212✔
217
            return Ok(());
5,598✔
218
        }
219
        let mut entries = BTreeMap::new();
16✔
220
        entries.insert(self_top_key, self.clone());
40✔
221
        for node in nodes_split {
24✔
222
            entries.insert(
24✔
223
                node.top_key().expect("Node cannot be empty here").clone(),
32✔
224
                EditableNode::Loaded(node),
8✔
225
            );
226
        }
227
        *self = EditableNode::Loaded(EditableLoadedNode::Internal(EditableInternalNode {
16✔
228
            entries,
8✔
229
        }));
230
        Ok(())
8✔
231
    }
232

233
    pub async fn insert_impl(
9,444✔
234
        &mut self,
235
        key: Key,
236
        value: Value,
237
        load_tree: &(dyn LoadTree + Send + Sync),
238
    ) -> Result<(Key, Vec<EditableLoadedNode<Key, Value>>), Box<dyn std::error::Error>> {
239
        let loaded = self.require_loaded(load_tree).await?;
37,776✔
240
        let nodes_split = Box::pin(loaded.insert(key, value, load_tree)).await?;
66,108✔
241
        Ok((
9,444✔
242
            loaded.top_key().expect("Node cannot be empty here").clone(),
37,776✔
243
            nodes_split,
9,444✔
244
        ))
245
    }
246

247
    pub async fn remove(
1,014✔
248
        &mut self,
249
        key: &Key,
250
        load_tree: &(dyn LoadTree + Send + Sync),
251
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
252
        let (maybe_top_key, maybe_removed) = self.remove_impl(key, load_tree).await?;
6,084✔
253
        if maybe_top_key.is_none() {
2,029✔
254
            *self = EditableNode::Loaded(EditableLoadedNode::Leaf(EditableLeafNode {
1✔
255
                entries: BTreeMap::new(),
1✔
256
            }));
257
        } else {
258
            let loaded = self.require_loaded(load_tree).await?;
4,052✔
259
            if let Some(simplified) = loaded.simplify() {
1,015✔
260
                *self = simplified;
1✔
261
            }
262
        }
263
        Ok(maybe_removed)
1,014✔
264
    }
265

266
    pub async fn remove_impl(
1,901✔
267
        &mut self,
268
        key: &Key,
269
        load_tree: &(dyn LoadTree + Send + Sync),
270
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
271
        let loaded = self.require_loaded(load_tree).await?;
7,604✔
272
        let result = loaded.remove(key, load_tree).await?;
9,505✔
273
        Ok(result)
1,901✔
274
    }
275

276
    pub async fn find(
3,057,003✔
277
        &mut self,
278
        key: &Key,
279
        load_tree: &(dyn LoadTree + Send + Sync),
280
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
281
        let loaded = self.require_loaded(load_tree).await?;
12,228,012✔
282
        loaded.find(key, load_tree).await
12,228,012✔
283
    }
284

285
    pub async fn count(
21,928✔
286
        &mut self,
287
        load_tree: &(dyn LoadTree + Send + Sync),
288
    ) -> Result<u64, Box<dyn std::error::Error>> {
289
        let loaded = self.require_loaded(load_tree).await?;
87,712✔
290
        Box::pin(loaded.count(load_tree)).await
87,712✔
291
    }
292

293
    pub async fn save(
428✔
294
        &mut self,
295
        store_tree: &(dyn StoreTree + Send + Sync),
296
    ) -> Result<StrongReference, Box<dyn std::error::Error>> {
297
        match self {
428✔
298
            EditableNode::Reference(strong_reference) => Ok(strong_reference.clone()),
28✔
299
            EditableNode::Loaded(loaded_node) => loaded_node.save(store_tree).await,
1,656✔
300
        }
301
    }
302

303
    pub async fn load(
169✔
304
        digest: &BlobDigest,
305
        load_tree: &(dyn LoadTree + Send + Sync),
306
    ) -> Result<Self, Box<dyn std::error::Error>> {
307
        let loaded: EitherNodeType<Key, Value> = load_node(load_tree, digest).await?;
845✔
308
        Ok(EditableNode::Loaded(EditableLoadedNode::new(loaded)))
169✔
309
    }
310

311
    pub async fn verify_integrity(
5,684✔
312
        &mut self,
313
        expected_top_key: Option<&Key>,
314
        load_tree: &(dyn LoadTree + Send + Sync),
315
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
316
        let loaded = self.require_loaded(load_tree).await?;
22,736✔
317
        if loaded.top_key() != expected_top_key {
5,684✔
318
            return Ok(IntegrityCheckResult::Corrupted(
×
319
                "Top key mismatch".to_string(),
×
320
            ));
321
        }
322
        Box::pin(loaded.verify_integrity(load_tree)).await
22,736✔
323
    }
324

325
    pub async fn merge(
109✔
326
        &mut self,
327
        other: Self,
328
        load_tree: &(dyn LoadTree + Send + Sync),
329
    ) -> Result<(Key, Vec<EditableLoadedNode<Key, Value>>), Box<dyn std::error::Error>> {
330
        let loaded = self.require_loaded(load_tree).await?;
436✔
331
        let other_loaded = match other {
218✔
NEW
332
            EditableNode::Reference(strong_reference) => {
×
333
                let loaded: EitherNodeType<Key, Value> =
×
NEW
334
                    load_node(load_tree, strong_reference.digest()).await?;
×
335
                EditableLoadedNode::new(loaded)
×
336
            }
337
            EditableNode::Loaded(loaded_node) => loaded_node,
218✔
338
        };
339
        match (loaded, other_loaded) {
218✔
340
            (EditableLoadedNode::Leaf(self_leaf), EditableLoadedNode::Leaf(other_leaf)) => {
218✔
341
                for (key, value) in other_leaf.entries {
27,172✔
342
                    self_leaf.entries.insert(key, value);
27,063✔
343
                }
344
                let split_nodes = self_leaf.check_split();
327✔
345
                Ok((
×
346
                    self_leaf
218✔
347
                        .top_key()
218✔
348
                        .expect("Leaf cannot be empty here")
109✔
349
                        .clone(),
109✔
350
                    split_nodes
109✔
351
                        .into_iter()
109✔
352
                        .map(|n| EditableLoadedNode::Leaf(n))
166✔
353
                        .collect(),
109✔
354
                ))
355
            }
356
            (
357
                EditableLoadedNode::Internal(self_internal),
×
358
                EditableLoadedNode::Internal(other_internal),
×
359
            ) => {
×
360
                for (key, child_node) in other_internal.entries {
×
361
                    let previous_entry = self_internal.entries.insert(key, child_node);
×
362
                    if let Some(_existing_child) = previous_entry {
×
363
                        return Err(Box::new(std::io::Error::other("Merge node key collision")));
×
364
                    }
365
                }
366
                let split_nodes = self_internal.check_split();
×
367
                Ok((
×
368
                    self_internal
×
369
                        .top_key()
×
370
                        .expect("Internal node cannot be empty here")
×
371
                        .clone(),
×
372
                    split_nodes
×
373
                        .into_iter()
×
374
                        .map(|n| EditableLoadedNode::Internal(n))
×
375
                        .collect(),
×
376
                ))
377
            }
378
            _ => unreachable!(),
379
        }
380
    }
381

382
    pub async fn is_naturally_split(
21,961✔
383
        &mut self,
384
        load_tree: &(dyn LoadTree + Send + Sync),
385
    ) -> Result<bool, Box<dyn std::error::Error>> {
386
        let loaded = self.require_loaded(load_tree).await?;
87,844✔
387
        Ok(loaded.is_naturally_split())
21,961✔
388
    }
389
}
390

391
/// An in-memory leaf node holding key/value entries in key order.
#[derive(Debug, Clone)]
pub struct EditableLeafNode<Key, Value> {
    /// The entries of this leaf, sorted by key.
    entries: BTreeMap<Key, Value>,
}
395

396
impl<Key: std::cmp::Ord + Clone + Serialize, Value: Clone + NodeValue>
397
    EditableLeafNode<Key, Value>
398
{
399
    pub fn create(entries: BTreeMap<Key, Value>) -> Option<Self> {
5,856✔
400
        if entries.is_empty() {
11,712✔
401
            None
2✔
402
        } else {
403
            Some(EditableLeafNode { entries })
5,854✔
404
        }
405
    }
406

407
    /// Inserts or overwrites `key` and re-splits the leaf.
    ///
    /// Returns the leaves that were split off; this leaf keeps the first chunk.
    pub async fn insert(&mut self, key: Key, value: Value) -> Vec<EditableLeafNode<Key, Value>> {
        // A previous value for the same key is intentionally discarded.
        let _previous_value = self.entries.insert(key, value);
        self.check_split()
    }
411

412
    pub async fn remove(
1,014✔
413
        &mut self,
414
        key: &Key,
415
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
416
        let removed = self.entries.remove(key);
4,056✔
417
        let top_key = self.top_key().cloned();
4,056✔
418
        Ok((top_key, removed))
1,014✔
419
    }
420

421
    fn check_split(&mut self) -> Vec<EditableLeafNode<Key, Value>> {
5,715✔
422
        let mut result = Vec::new();
11,430✔
423
        let mut current_node = BTreeMap::new();
11,430✔
424
        let mut current_node_size_tracker = SizeTracker::new();
11,430✔
425
        for entry in self.entries.iter() {
590,074✔
426
            current_node_size_tracker.add_entry(entry.0, &entry.1.to_content());
2,314,576✔
427
            current_node.insert(entry.0.clone(), entry.1.clone());
3,471,864✔
428
            if is_split_after_key(entry.0, current_node_size_tracker.size()) {
2,316,681✔
429
                result.push(
6,315✔
430
                    EditableLeafNode::create(current_node)
6,315✔
431
                        .expect("Must succeed because list is not empty"),
6,315✔
432
                );
433
                current_node = BTreeMap::new();
4,210✔
434
                current_node_size_tracker = SizeTracker::new();
2,105✔
435
            }
436
        }
437
        if !current_node.is_empty() {
9,463✔
438
            result.push(
11,244✔
439
                EditableLeafNode::create(current_node)
11,244✔
440
                    .expect("Must succeed because list is not empty"),
3,748✔
441
            );
442
        }
443
        *self = result.remove(0);
17,145✔
444
        result
5,715✔
445
    }
446

447
    /// Returns the greatest key in this leaf, or `None` if the leaf is empty.
    pub fn top_key(&self) -> Option<&Key> {
        self.entries.iter().next_back().map(|(key, _)| key)
    }
450

451
    /// Returns a clone of the value stored under `key`, or `None` if the key is absent.
    pub fn find(&mut self, key: &Key) -> Option<Value> {
        let found = self.entries.get(key);
        found.cloned()
    }
454

455
    pub fn verify_integrity(&mut self) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
4,790✔
456
        let mut size_tracker = SizeTracker::new();
9,580✔
457
        for (index, (key, value)) in self.entries.iter().enumerate() {
1,648,281✔
458
            size_tracker.add_entry(key, &value.to_content());
2,178,548✔
459
            let is_split = is_split_after_key(key, size_tracker.size());
2,723,185✔
460
            if (index < self.entries.len() - 1) && is_split {
1,629,123✔
461
                return Ok(IntegrityCheckResult::Corrupted(format!(
×
462
                    "Leaf node integrity check failed: Key at index {} indicates split but node is not final (number of keys: {})",
×
463
                    index, self.entries.len()
×
464
                )));
465
            }
466
        }
467
        Ok(IntegrityCheckResult::Valid { depth: 0 })
4,790✔
468
    }
469

470
    pub fn is_naturally_split(&self) -> bool {
21,961✔
471
        let mut size_tracker = SizeTracker::new();
43,922✔
472
        for entry in self.entries.iter() {
5,302,464✔
473
            size_tracker.add_entry(entry.0, &entry.1.to_content());
10,517,084✔
474
        }
475
        is_split_after_key(
476
            self.entries.keys().last().expect("leaf node is not empty"),
87,844✔
477
            size_tracker.size(),
43,922✔
478
        )
479
    }
480

481
    /// Read-only access to the entries of this leaf.
    pub fn entries(&self) -> &BTreeMap<Key, Value> {
        &self.entries
    }
484
}
485

486
#[derive(Debug, Clone)]
487
pub struct EditableInternalNode<Key: std::cmp::Ord + Clone, Value: Clone> {
488
    entries: BTreeMap<Key, EditableNode<Key, Value>>,
489
}
490

491
impl<
492
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
493
        Value: NodeValue + Clone,
494
    > EditableInternalNode<Key, Value>
495
{
496
    pub fn create(entries: BTreeMap<Key, EditableNode<Key, Value>>) -> Option<Self> {
3,838✔
497
        if entries.is_empty() {
7,676✔
498
            None
×
499
        } else {
500
            Some(EditableInternalNode { entries })
3,838✔
501
        }
502
    }
503

504
    pub async fn insert(
3,838✔
505
        &mut self,
506
        key: Key,
507
        value: Value,
508
        load_tree: &(dyn LoadTree + Send + Sync),
509
    ) -> Result<Vec<EditableInternalNode<Key, Value>>, Box<dyn std::error::Error>> {
510
        let last_index = self.entries.len() - 1;
7,676✔
511
        // TODO: optimize search
512
        for (index, (entry_key, entry_value)) in self.entries.iter_mut().enumerate() {
49,974✔
513
            if (index == last_index) || (key <= *entry_key) {
23,796✔
514
                let (updated_key, split_nodes) =
7,676✔
515
                    entry_value.insert_impl(key, value, load_tree).await?;
19,190✔
516
                if updated_key != *entry_key {
3,838✔
517
                    let old_key = entry_key.clone();
3,411✔
518
                    let old_value = self.entries.remove(&old_key).expect("key must exist");
6,822✔
519
                    let previous_entry = self.entries.insert(updated_key, old_value);
5,685✔
520
                    assert!(previous_entry.is_none(), "Split node key collision");
521
                }
522
                for node in split_nodes {
3,911✔
523
                    let previous_entry = self.entries.insert(
219✔
524
                        node.top_key().expect("Node cannot be empty here").clone(),
292✔
525
                        EditableNode::Loaded(node),
73✔
526
                    );
527
                    assert!(previous_entry.is_none(), "Split node key collision");
528
                }
529
                self.update_chunk_boundaries(load_tree).await?;
11,514✔
530
                break;
3,838✔
531
            }
532
        }
533
        Ok(self.check_split())
3,838✔
534
    }
535

536
    pub async fn remove(
887✔
537
        &mut self,
538
        key: &Key,
539
        load_tree: &(dyn LoadTree + Send + Sync),
540
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
541
        // TODO: optimize search
542
        for (entry_key, entry_value) in self.entries.iter_mut() {
6,608✔
543
            if key <= entry_key {
2,417✔
544
                let (maybe_new_top_key, maybe_removed) =
1,774✔
545
                    Box::pin(entry_value.remove_impl(key, load_tree)).await?;
4,435✔
546
                match maybe_new_top_key {
887✔
547
                    Some(new_top_key) => {
887✔
548
                        if new_top_key != *entry_key {
894✔
549
                            let entry_key = entry_key.clone();
28✔
550
                            let entry_value_removed = self
21✔
551
                                .entries
14✔
552
                                .remove(&entry_key)
21✔
553
                                .expect("Must exist because we just iterated over the entries");
554
                            self.entries.insert(new_top_key, entry_value_removed);
21✔
555
                        }
556
                    }
557
                    None => {
×
558
                        let entry_key = entry_key.clone();
×
559
                        self.entries.remove(&entry_key);
×
560
                    }
561
                }
562
                let top_key = self.top_key().cloned();
3,548✔
563
                match maybe_removed {
887✔
564
                    Some(removed) => {
887✔
565
                        if !self.entries.is_empty() {
887✔
566
                            self.update_chunk_boundaries(load_tree).await?;
2,661✔
567
                        }
568
                        return Ok((top_key, Some(removed)));
887✔
569
                    }
570
                    None => {
×
571
                        return Ok((top_key, None));
×
572
                    }
573
                }
574
            }
575
        }
576
        Ok((self.top_key().cloned(), None))
×
577
    }
578

579
    async fn update_chunk_boundaries(
4,725✔
580
        &mut self,
581
        load_tree: &(dyn LoadTree + Send + Sync),
582
    ) -> Result<(), Box<dyn std::error::Error>> {
583
        loop {
×
584
            let merge_candidates = self.find_merge_candidates(load_tree).await?;
19,336✔
585
            match merge_candidates {
4,834✔
586
                Some((low_key, high_key)) => {
218✔
587
                    let mut low = self.entries.remove(&low_key).expect("key must exist");
654✔
588
                    let high = self.entries.remove(&high_key).expect("key must exist");
654✔
589
                    let (low_top_key, split_nodes) = low.merge(high, load_tree).await?;
654✔
590
                    assert!(split_nodes.is_empty() || low.is_naturally_split(load_tree).await?);
591
                    assert_ne!(low_key, low_top_key, "Merge did not change low key");
592
                    let previous_entry = self.entries.insert(low_top_key, low);
545✔
593
                    assert!(previous_entry.is_none(), "Merge node key collision");
594
                    let split_nodes_len = split_nodes.len();
327✔
595
                    for (index, node) in split_nodes.into_iter().enumerate() {
441✔
596
                        assert!((index == split_nodes_len - 1) || node.is_naturally_split());
597
                        let previous_entry = self.entries.insert(
171✔
598
                            node.top_key().expect("Node cannot be empty here").clone(),
228✔
599
                            EditableNode::Loaded(node),
57✔
600
                        );
601
                        assert!(previous_entry.is_none(), "Merge node key collision");
602
                    }
603
                }
604
                None => break,
4,725✔
605
            }
606
        }
607
        Ok(())
4,725✔
608
    }
609

610
    async fn find_merge_candidates(
4,834✔
611
        &mut self,
612
        load_tree: &(dyn LoadTree + Send + Sync),
613
    ) -> Result<Option<(Key, Key)>, Box<dyn std::error::Error>> {
614
        let last_index = self.entries.len() - 1;
9,668✔
615
        let mut needs_merge: Option<&Key> = None;
14,502✔
616
        // TODO: optimize search
617
        for (index, (entry_key, entry_value)) in self.entries.iter_mut().enumerate() {
80,541✔
618
            if let Some(merge_value) = needs_merge.take() {
22,122✔
619
                return Ok(Some((merge_value.clone(), entry_key.clone())));
327✔
620
            }
621
            let is_split = entry_value.is_naturally_split(load_tree).await?;
87,616✔
622
            if (index != last_index) && !is_split {
39,192✔
623
                needs_merge = Some(entry_key);
109✔
624
            }
625
        }
626
        Ok(None)
4,725✔
627
    }
628

629
    fn check_split(&mut self) -> Vec<EditableInternalNode<Key, Value>> {
3,838✔
630
        let mut result = Vec::new();
7,676✔
631
        let mut current_node = BTreeMap::new();
7,676✔
632
        let mut current_node_size_tracker = SizeTracker::new();
7,676✔
633
        for entry in self.entries.iter() {
24,955✔
634
            current_node_size_tracker.add_entry(
34,558✔
635
                entry.0,
17,279✔
636
                &TreeReference::new(StrongReference::from_weak(BlobDigest::new(&[0; 64])))
51,837✔
637
                    .to_content(),
17,279✔
638
            );
639
            current_node.insert(entry.0.clone(), entry.1.clone());
103,674✔
640
            if is_split_after_key(entry.0, current_node_size_tracker.size()) {
69,116✔
641
                result.push(
×
642
                    EditableInternalNode::create(current_node)
×
643
                        .expect("Must succeed because list is not empty"),
×
644
                );
645
                current_node = BTreeMap::new();
×
646
                current_node_size_tracker = SizeTracker::new();
×
647
            }
648
        }
649
        if !current_node.is_empty() {
7,676✔
650
            result.push(
11,514✔
651
                EditableInternalNode::create(current_node)
11,514✔
652
                    .expect("Must succeed because list is not empty"),
3,838✔
653
            );
654
        }
655
        *self = result.remove(0);
11,514✔
656
        result
3,838✔
657
    }
658

659
    /// Returns the greatest child key of this node, or `None` if it has no children.
    pub fn top_key(&self) -> Option<&Key> {
        self.entries.iter().next_back().map(|(key, _)| key)
    }
662

663
    pub async fn find(
1,507,314✔
664
        &mut self,
665
        key: &Key,
666
        load_tree: &(dyn LoadTree + Send + Sync),
667
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
668
        // TODO: optimize search
669
        for (entry_key, entry_value) in self.entries.iter_mut() {
11,578,646✔
670
            if key <= entry_key {
4,282,009✔
671
                return Box::pin(entry_value.find(key, load_tree)).await;
7,536,495✔
672
            }
673
        }
674
        Ok(None)
15✔
675
    }
676

677
    pub async fn verify_integrity(
894✔
678
        &mut self,
679
        load_tree: &(dyn LoadTree + Send + Sync),
680
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
681
        if self.entries.is_empty() {
1,788✔
682
            return Ok(IntegrityCheckResult::Corrupted(
×
683
                "Internal node integrity check failed: Node has no entries".to_string(),
×
684
            ));
685
        }
686
        let mut child_depth = None;
1,788✔
687
        for (index, (key, value)) in self.entries.iter_mut().enumerate() {
15,429✔
688
            match value.verify_integrity(Some(key), load_tree).await? {
16,996✔
689
                IntegrityCheckResult::Valid { depth } => {
4,249✔
690
                    if let Some(existing_depth) = child_depth {
7,604✔
691
                        if existing_depth != depth {
3,355✔
692
                            return Ok(IntegrityCheckResult::Corrupted(format!(
×
693
                                "Internal node integrity check failed at index {}: Child node depth mismatch (expected {}, found {})",
×
694
                                index, existing_depth, depth
×
695
                            )));
696
                        }
697
                    } else {
698
                        child_depth = Some(depth);
894✔
699
                    }
700
                }
701
                IntegrityCheckResult::Corrupted(reason) => {
×
702
                    return Ok(IntegrityCheckResult::Corrupted(format!(
×
703
                        "Internal node integrity check failed at index {}: {}",
×
704
                        index, reason
×
705
                    )));
706
                }
707
            }
708
        }
709
        Ok(IntegrityCheckResult::Valid {
894✔
710
            depth: child_depth.expect("Internal node has to have at least one child") + 1,
1,788✔
711
        })
712
    }
713

714
    pub fn is_naturally_split(&self) -> bool {
×
715
        let last_key = self
×
716
            .entries
×
717
            .keys()
718
            .last()
719
            .expect("internal node is not empty");
720
        let mut size_tracker = SizeTracker::new();
×
721
        for entry in self.entries.iter() {
×
722
            size_tracker.add_entry(
×
723
                entry.0,
×
NEW
724
                &TreeReference::new(StrongReference::from_weak(BlobDigest::new(&[0; 64])))
×
NEW
725
                    .to_content(),
×
726
            );
727
        }
728
        is_split_after_key(last_key, size_tracker.size())
×
729
    }
730
}
731

732
#[derive(Debug, Clone)]
733
pub enum EditableLoadedNode<Key: std::cmp::Ord + Clone, Value: Clone> {
734
    Leaf(EditableLeafNode<Key, Value>),
735
    Internal(EditableInternalNode<Key, Value>),
736
}
737

738
impl<Key: Serialize + DeserializeOwned + Ord + Clone + Debug, Value: NodeValue + Clone>
739
    EditableLoadedNode<Key, Value>
740
{
741
    pub fn new(loaded: EitherNodeType<Key, Value>) -> Self {
186✔
742
        match loaded {
186✔
743
            EitherNodeType::Leaf(leaf_node) => {
185✔
744
                let mut entries = BTreeMap::new();
370✔
745
                for (key, value) in leaf_node.entries {
5,135✔
746
                    entries.insert(key, value);
4,950✔
747
                }
748
                EditableLoadedNode::Leaf(EditableLeafNode { entries })
185✔
749
            }
750
            EitherNodeType::Internal(internal_node) => {
1✔
751
                let mut entries = BTreeMap::new();
2✔
752
                for (key, child_node) in internal_node.entries {
25✔
753
                    entries.insert(key, EditableNode::Reference(child_node.reference().clone()));
32✔
754
                }
755
                EditableLoadedNode::Internal(EditableInternalNode { entries })
1✔
756
            }
757
        }
758
    }
759

760
    pub async fn insert(
9,444✔
761
        &mut self,
762
        key: Key,
763
        value: Value,
764
        load_tree: &(dyn LoadTree + Send + Sync),
765
    ) -> Result<Vec<EditableLoadedNode<Key, Value>>, Box<dyn std::error::Error>> {
766
        match self {
9,444✔
767
            EditableLoadedNode::Leaf(leaf_node) => {
5,606✔
768
                let split_nodes = leaf_node.insert(key, value).await;
28,030✔
769
                Ok(split_nodes
5,606✔
770
                    .into_iter()
5,606✔
771
                    .map(|node| EditableLoadedNode::Leaf(node))
5,687✔
772
                    .collect())
5,606✔
773
            }
774
            EditableLoadedNode::Internal(internal_node) => {
3,838✔
775
                let split_nodes = internal_node.insert(key, value, load_tree).await?;
23,028✔
776
                Ok(split_nodes
3,838✔
777
                    .into_iter()
3,838✔
778
                    .map(|node| EditableLoadedNode::Internal(node))
3,838✔
779
                    .collect())
3,838✔
780
            }
781
        }
782
    }
783

784
    pub async fn remove(
1,901✔
785
        &mut self,
786
        key: &Key,
787
        load_tree: &(dyn LoadTree + Send + Sync),
788
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
789
        match self {
1,901✔
790
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.remove(key).await,
4,056✔
791
            EditableLoadedNode::Internal(internal_node) => {
887✔
792
                internal_node.remove(key, load_tree).await
3,548✔
793
            }
794
        }
795
    }
796

797
    pub fn simplify(&mut self) -> Option<EditableNode<Key, Value>> {
1,013✔
798
        match self {
1,013✔
799
            EditableLoadedNode::Internal(internal_node) => {
887✔
800
                if internal_node.entries.len() == 1 {
887✔
801
                    let (_, only_child) = internal_node
3✔
802
                        .entries
2✔
803
                        .iter_mut()
804
                        .next()
805
                        .expect("internal node has one entry");
806
                    Some(only_child.clone())
1✔
807
                } else {
808
                    None
886✔
809
                }
810
            }
811
            EditableLoadedNode::Leaf(_) => None,
126✔
812
        }
813
    }
814

815
    pub async fn find(
3,057,003✔
816
        &mut self,
817
        key: &Key,
818
        load_tree: &(dyn LoadTree + Send + Sync),
819
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
820
        match self {
3,057,003✔
821
            EditableLoadedNode::Leaf(leaf_node) => Ok(leaf_node.find(key)),
4,649,067✔
822
            EditableLoadedNode::Internal(internal_node) => internal_node.find(key, load_tree).await,
7,536,570✔
823
        }
824
    }
825

826
    pub fn top_key(&self) -> Option<&Key> {
15,266✔
827
        match self {
15,266✔
828
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.top_key(),
31,602✔
829
            EditableLoadedNode::Internal(internal_node) => internal_node.top_key(),
14,196✔
830
        }
831
    }
832

833
    pub async fn count(
21,928✔
834
        &mut self,
835
        load_tree: &(dyn LoadTree + Send + Sync),
836
    ) -> Result<u64, Box<dyn std::error::Error>> {
837
        match self {
21,928✔
838
            EditableLoadedNode::Leaf(leaf_node) => Ok(leaf_node.entries.len() as u64),
36,528✔
839
            EditableLoadedNode::Internal(internal_node) => {
3,664✔
840
                let mut total_count = 0;
7,328✔
841
                for child_node in internal_node.entries.values_mut() {
24,356✔
842
                    total_count += child_node.count(load_tree).await?;
51,084✔
843
                }
844
                Ok(total_count)
3,664✔
845
            }
846
        }
847
    }
848

849
    pub async fn save(
415✔
850
        &mut self,
851
        store_tree: &(dyn StoreTree + Send + Sync),
852
    ) -> Result<StrongReference, Box<dyn std::error::Error>> {
853
        match self {
415✔
854
            EditableLoadedNode::Leaf(leaf_node) => {
397✔
855
                let mut new_node = crate::sorted_tree::Node {
856
                    entries: Vec::new(),
397✔
857
                };
858
                for (key, value) in &leaf_node.entries {
25,843✔
859
                    new_node.entries.push((key.clone(), value.clone()));
42,410✔
860
                }
861
                let digest = store_node(store_tree, &new_node, &Metadata { is_leaf: true }).await?;
1,985✔
862
                Ok(digest)
397✔
863
            }
864
            EditableLoadedNode::Internal(internal_node) => {
18✔
865
                let mut new_node = crate::sorted_tree::Node {
866
                    entries: Vec::new(),
18✔
867
                };
868
                for (key, child_node) in &mut internal_node.entries {
170✔
869
                    let child_reference = Box::pin(child_node.save(store_tree)).await?;
380✔
870
                    new_node
76✔
871
                        .entries
76✔
872
                        .push((key.clone(), TreeReference::new(child_reference.clone())));
380✔
873
                }
874
                let digest =
18✔
875
                    store_node(store_tree, &new_node, &Metadata { is_leaf: false }).await?;
72✔
876
                Ok(digest)
18✔
877
            }
878
        }
879
    }
880

881
    pub async fn verify_integrity(
5,684✔
882
        &mut self,
883
        load_tree: &(dyn LoadTree + Send + Sync),
884
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
885
        match self {
5,684✔
886
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.verify_integrity(),
14,370✔
887
            EditableLoadedNode::Internal(internal_node) => {
894✔
888
                internal_node.verify_integrity(load_tree).await
2,682✔
889
            }
890
        }
891
    }
892

893
    pub fn is_naturally_split(&self) -> bool {
21,961✔
894
        match self {
21,961✔
895
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.is_naturally_split(),
65,883✔
896
            EditableLoadedNode::Internal(internal_node) => internal_node.is_naturally_split(),
×
897
        }
898
    }
899
}
900

901
pub struct Iterator<
902
    't,
903
    Key: Serialize + DeserializeOwned + Ord + Clone + Debug,
904
    Value: NodeValue + Clone,
905
> {
906
    next: Vec<&'t mut EditableNode<Key, Value>>,
907
    leaf_iterator: Option<std::collections::btree_map::Iter<'t, Key, Value>>,
908
    load_tree: &'t (dyn LoadTree + Send + Sync),
909
}
910

911
impl<'t, Key, Value> Iterator<'t, Key, Value>
912
where
913
    Key: Serialize + DeserializeOwned + Ord + Clone + Debug,
914
    Value: NodeValue + Clone,
915
{
916
    pub fn new(
112✔
917
        node: &'t mut EditableNode<Key, Value>,
918
        load_tree: &'t (dyn LoadTree + Send + Sync),
919
    ) -> Self {
920
        Iterator {
921
            next: vec![node],
336✔
922
            leaf_iterator: None,
923
            load_tree,
924
        }
925
    }
926

927
    pub async fn next(&mut self) -> Result<Option<(Key, Value)>, Box<dyn std::error::Error>> {
3,402✔
928
        loop {
×
929
            if let Some(current_node) = self.leaf_iterator.as_mut() {
3,532✔
930
                match current_node.next() {
1,709✔
931
                    Some((key, value)) => return Ok(Some((key.clone(), value.clone()))),
7,945✔
932
                    None => {
120✔
933
                        self.leaf_iterator = None;
120✔
934
                    }
935
                }
936
            }
937
            match self.next.pop() {
234✔
938
                Some(next_node) => {
122✔
939
                    let loaded = next_node.require_loaded(self.load_tree).await?;
488✔
940
                    match loaded {
122✔
941
                        EditableLoadedNode::Leaf(leaf_node) => {
120✔
942
                            self.leaf_iterator = Some(leaf_node.entries().iter());
120✔
943
                            continue;
120✔
944
                        }
945
                        EditableLoadedNode::Internal(internal_node) => {
2✔
946
                            internal_node
2✔
947
                                .entries
2✔
948
                                .values_mut()
949
                                .rev()
950
                                .for_each(|child_node| {
12✔
951
                                    self.next.push(child_node);
30✔
952
                                });
953
                        }
954
                    };
955
                }
956
                None => {
×
957
                    return Ok(None);
112✔
958
                }
959
            }
960
        }
961
    }
962
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc