• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TyRoXx / NonlocalityOS / 20206122348

14 Dec 2025 09:44AM UTC coverage: 80.316% (-0.5%) from 80.811%
20206122348

Pull #371

github

web-flow
Merge 69beb1cf5 into 9ba501c70
Pull Request #371: Prolly tree builds much larger chunks, respecting the max tree blob size

30 of 37 new or added lines in 1 file covered. (81.08%)

32 existing lines in 2 files now uncovered.

5431 of 6762 relevant lines covered (80.32%)

34201.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.52
/sorted_tree/src/prolly_tree_editable_node.rs
1
use crate::sorted_tree::{self, NodeValue, TreeReference};
2
use astraea::{
3
    storage::{LoadTree, StoreError, StoreTree},
4
    tree::BlobDigest,
5
};
6
use serde::{de::DeserializeOwned, Deserialize, Serialize};
7
use sha3::{Digest, Sha3_512};
8
use std::collections::BTreeMap;
9
use std::fmt::Debug;
10

11
#[derive(Debug, PartialEq)]
12
pub enum IntegrityCheckResult {
13
    Valid { depth: usize },
14
    Corrupted(String),
15
}
16

17
pub fn hash_key<Key: Serialize>(key: &Key) -> u8 {
156,688✔
18
    // TODO: use a better hash function (https://docs.dolthub.com/architecture/storage-engine/prolly-tree#controlling-chunk-size)
19
    let key_serialized = postcard::to_stdvec(key).expect("serializing key should succeed");
783,440✔
20
    let mut hasher = Sha3_512::new();
313,376✔
21
    hasher.update(&key_serialized);
470,064✔
22
    let result: [u8; 64] = hasher.finalize().into();
783,440✔
23
    result[0]
156,688✔
24
}
25

26
pub fn is_split_after_key<Key: Serialize>(key: &Key, chunk_size_in_bytes: usize) -> bool {
1,068,573✔
27
    if chunk_size_in_bytes < 1000 {
1,068,573✔
28
        // No point in splitting small chunks.
29
        // TODO: use Tree efficiently
30
        return false;
911,885✔
31
    }
32
    let hash = hash_key(key);
470,064✔
33
    let chunk_boundary_threshold = 10;
313,376✔
34
    if hash < chunk_boundary_threshold {
156,688✔
35
        // written with an if expression so that we can see whether the tests cover both branches
36
        true
20,805✔
37
    } else {
38
        false
135,883✔
39
    }
40
}
41

42
struct SizeTracker {
43
    element_count: usize,
44
    total_element_size: usize,
45
}
46

47
impl SizeTracker {
48
    fn new() -> Self {
33,802✔
49
        SizeTracker {
50
            element_count: 0,
51
            total_element_size: 0,
52
        }
53
    }
54

55
    pub fn add_entry<Key: Serialize, Value: Serialize>(&mut self, key: &Key, value: &Value) {
3,183,649✔
56
        let entry_serialized: Vec<u8> =
6,367,298✔
57
            postcard::to_stdvec(&(key, value)).expect("serializing entry should succeed");
12,734,596✔
58
        self.element_count += 1;
3,183,649✔
59
        self.total_element_size += entry_serialized.len();
3,183,649✔
60
    }
61

62
    pub fn size(&self) -> usize {
1,068,573✔
63
        // TODO: optimize size calculation
64
        let metadata_serialized: Vec<u8> =
2,137,146✔
65
            postcard::to_stdvec(&Metadata { is_leaf: true }).unwrap();
3,205,719✔
66
        let element_count_serialized: Vec<u8> = postcard::to_stdvec(&self.element_count).unwrap();
5,342,865✔
67
        metadata_serialized.len() + element_count_serialized.len() + self.total_element_size
3,205,719✔
68
    }
69
}
70

71
#[derive(Serialize, Deserialize, Clone, Hash)]
72
pub struct Metadata {
73
    pub is_leaf: bool,
74
}
75

76
pub async fn store_node<Key: Serialize + Ord, Value: NodeValue>(
120✔
77
    store_tree: &dyn StoreTree,
78
    node: &sorted_tree::Node<Key, Value>,
79
    metadata: &Metadata,
80
) -> Result<BlobDigest, StoreError> {
81
    let metadata_serialized =
120✔
82
        postcard::to_stdvec(metadata).expect("serializing metadata should always succeed");
480✔
83
    crate::sorted_tree::store_node(store_tree, node, &bytes::Bytes::from(metadata_serialized)).await
480✔
84
}
85

86
#[derive(Debug, PartialEq)]
87
pub enum EitherNodeType<Key: Serialize + Ord, Value: NodeValue> {
88
    Leaf(sorted_tree::Node<Key, Value>),
89
    Internal(sorted_tree::Node<Key, TreeReference>),
90
}
91

92
#[derive(Debug, PartialEq, Eq, Clone)]
93
pub enum DeserializationError {
94
    MissingTree(BlobDigest),
95
    TreeHashMismatch(BlobDigest),
96
}
97

98
impl std::fmt::Display for DeserializationError {
99
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
×
100
        write!(f, "{self:?}")
×
101
    }
102
}
103

104
impl std::error::Error for DeserializationError {}
105

106
pub async fn load_node<
69✔
107
    Key: Serialize + DeserializeOwned + PartialEq + Ord,
108
    Value: NodeValue + Clone,
109
>(
110
    load_tree: &dyn LoadTree,
111
    root: &BlobDigest,
112
) -> Result<EitherNodeType<Key, Value>, Box<dyn std::error::Error>> {
113
    let loaded = match load_tree.load_tree(root).await {
345✔
114
        Some(loaded) => loaded,
138✔
115
        None => return Err(DeserializationError::MissingTree(*root).into()),
×
116
    };
117
    let hashed = match loaded.hash() {
138✔
118
        Some(hashed) => hashed,
138✔
119
        None => return Err(DeserializationError::TreeHashMismatch(*root).into()),
×
120
    };
121
    let tree = hashed.tree();
207✔
122
    let (metadata, sorted_tree_data) =
138✔
123
        postcard::take_from_bytes::<Metadata>(tree.blob().as_slice())?;
138✔
124
    if metadata.is_leaf {
69✔
125
        let node = sorted_tree::node_from_tree::<Key, Value>(
126
            tree,
69✔
127
            tree.blob().as_slice().len() - sorted_tree_data.len(),
207✔
128
        );
129
        Ok(EitherNodeType::Leaf(node))
69✔
130
    } else {
131
        let node = sorted_tree::node_from_tree::<Key, TreeReference>(
UNCOV
132
            tree,
×
UNCOV
133
            tree.blob().as_slice().len() - sorted_tree_data.len(),
×
134
        );
UNCOV
135
        Ok(EitherNodeType::Internal(node))
×
136
    }
137
}
138

139
#[derive(Debug, Clone)]
140
pub enum EditableNode<Key: std::cmp::Ord + Clone, Value: Clone> {
141
    Reference(TreeReference),
142
    Loaded(EditableLoadedNode<Key, Value>),
143
}
144

145
impl<
146
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
147
        Value: NodeValue + Clone,
148
    > Default for EditableNode<Key, Value>
149
{
150
    fn default() -> Self {
1✔
151
        Self::new()
1✔
152
    }
153
}
154

155
impl<
156
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
157
        Value: NodeValue + Clone,
158
    > EditableNode<Key, Value>
159
{
160
    pub fn new() -> Self {
33✔
161
        EditableNode::Loaded(EditableLoadedNode::Leaf(EditableLeafNode {
33✔
162
            entries: BTreeMap::new(),
33✔
163
        }))
164
    }
165

166
    pub async fn require_loaded(
3,120,739✔
167
        &mut self,
168
        load_tree: &dyn LoadTree,
169
    ) -> Result<&mut EditableLoadedNode<Key, Value>, Box<dyn std::error::Error>> {
170
        match self {
3,120,739✔
171
            EditableNode::Reference(tree_ref) => {
9✔
172
                let loaded: EitherNodeType<Key, Value> =
18✔
173
                    load_node(load_tree, tree_ref.reference()).await?;
27✔
174
                *self = EditableNode::Loaded(EditableLoadedNode::new(loaded));
18✔
175
            }
176
            EditableNode::Loaded(_loaded_node) => {}
6,241,460✔
177
        };
178
        let loaded = match self {
6,241,478✔
179
            EditableNode::Loaded(loaded_node) => loaded_node,
6,241,478✔
180
            _ => unreachable!(),
181
        };
182
        Ok(loaded)
3,120,739✔
183
    }
184

185
    pub async fn insert(
4,379✔
186
        &mut self,
187
        key: Key,
188
        value: Value,
189
        load_tree: &dyn LoadTree,
190
    ) -> Result<(), Box<dyn std::error::Error>> {
191
        let (self_top_key, nodes_split) = self.insert_impl(key, value, load_tree).await?;
30,653✔
192
        if nodes_split.is_empty() {
8,758✔
193
            return Ok(());
4,373✔
194
        }
195
        let mut entries = BTreeMap::new();
12✔
196
        entries.insert(self_top_key, self.clone());
30✔
197
        for node in nodes_split {
18✔
198
            entries.insert(
18✔
199
                node.top_key().expect("Node cannot be empty here").clone(),
24✔
200
                EditableNode::Loaded(node),
6✔
201
            );
202
        }
203
        *self = EditableNode::Loaded(EditableLoadedNode::Internal(EditableInternalNode {
12✔
204
            entries,
6✔
205
        }));
206
        Ok(())
6✔
207
    }
208

209
    pub async fn insert_impl(
7,370✔
210
        &mut self,
211
        key: Key,
212
        value: Value,
213
        load_tree: &dyn LoadTree,
214
    ) -> Result<(Key, Vec<EditableLoadedNode<Key, Value>>), Box<dyn std::error::Error>> {
215
        let loaded = self.require_loaded(load_tree).await?;
29,480✔
216
        let nodes_split = Box::pin(loaded.insert(key, value, load_tree)).await?;
51,590✔
217
        Ok((
7,370✔
218
            loaded.top_key().expect("Node cannot be empty here").clone(),
29,480✔
219
            nodes_split,
7,370✔
220
        ))
221
    }
222

223
    pub async fn remove(
1,014✔
224
        &mut self,
225
        key: &Key,
226
        load_tree: &dyn LoadTree,
227
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
228
        let (maybe_top_key, maybe_removed) = self.remove_impl(key, load_tree).await?;
6,084✔
229
        if maybe_top_key.is_none() {
2,029✔
230
            *self = EditableNode::Loaded(EditableLoadedNode::Leaf(EditableLeafNode {
1✔
231
                entries: BTreeMap::new(),
1✔
232
            }));
233
        } else {
234
            let loaded = self.require_loaded(load_tree).await?;
4,052✔
235
            if let Some(simplified) = loaded.simplify() {
1,015✔
236
                *self = simplified;
1✔
237
            }
238
        }
239
        Ok(maybe_removed)
1,014✔
240
    }
241

242
    pub async fn remove_impl(
1,859✔
243
        &mut self,
244
        key: &Key,
245
        load_tree: &dyn LoadTree,
246
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
247
        let loaded = self.require_loaded(load_tree).await?;
7,436✔
248
        let result = loaded.remove(key, load_tree).await?;
9,295✔
249
        Ok(result)
1,859✔
250
    }
251

252
    pub async fn find(
3,062,579✔
253
        &mut self,
254
        key: &Key,
255
        load_tree: &dyn LoadTree,
256
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
257
        let loaded = self.require_loaded(load_tree).await?;
12,250,316✔
258
        loaded.find(key, load_tree).await
12,250,316✔
259
    }
260

261
    pub async fn count(
22,892✔
262
        &mut self,
263
        load_tree: &dyn LoadTree,
264
    ) -> Result<u64, Box<dyn std::error::Error>> {
265
        let loaded = self.require_loaded(load_tree).await?;
91,568✔
266
        Box::pin(loaded.count(load_tree)).await
91,568✔
267
    }
268

269
    pub async fn save(
134✔
270
        &mut self,
271
        store_tree: &dyn StoreTree,
272
    ) -> Result<BlobDigest, Box<dyn std::error::Error>> {
273
        match self {
134✔
274
            EditableNode::Reference(tree_ref) => Ok(*tree_ref.reference()),
28✔
275
            EditableNode::Loaded(loaded_node) => loaded_node.save(store_tree).await,
480✔
276
        }
277
    }
278

279
    pub async fn load(
60✔
280
        digest: &BlobDigest,
281
        load_tree: &dyn LoadTree,
282
    ) -> Result<Self, Box<dyn std::error::Error>> {
283
        let loaded: EitherNodeType<Key, Value> = load_node(load_tree, digest).await?;
300✔
284
        Ok(EditableNode::Loaded(EditableLoadedNode::new(loaded)))
60✔
285
    }
286

287
    pub async fn verify_integrity(
5,829✔
288
        &mut self,
289
        expected_top_key: Option<&Key>,
290
        load_tree: &dyn LoadTree,
291
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
292
        let loaded = self.require_loaded(load_tree).await?;
23,316✔
293
        if loaded.top_key() != expected_top_key {
5,829✔
294
            return Ok(IntegrityCheckResult::Corrupted(
×
295
                "Top key mismatch".to_string(),
×
296
            ));
297
        }
298
        Box::pin(loaded.verify_integrity(load_tree)).await
23,316✔
299
    }
300

301
    pub async fn merge(
178✔
302
        &mut self,
303
        other: Self,
304
        load_tree: &dyn LoadTree,
305
    ) -> Result<(Key, Vec<EditableLoadedNode<Key, Value>>), Box<dyn std::error::Error>> {
306
        let loaded = self.require_loaded(load_tree).await?;
712✔
307
        let other_loaded = match other {
356✔
308
            EditableNode::Reference(tree_ref) => {
×
309
                let loaded: EitherNodeType<Key, Value> =
×
310
                    load_node(load_tree, tree_ref.reference()).await?;
×
311
                EditableLoadedNode::new(loaded)
×
312
            }
313
            EditableNode::Loaded(loaded_node) => loaded_node,
356✔
314
        };
315
        match (loaded, other_loaded) {
356✔
316
            (EditableLoadedNode::Leaf(self_leaf), EditableLoadedNode::Leaf(other_leaf)) => {
356✔
317
                for (key, value) in other_leaf.entries {
50,476✔
318
                    self_leaf.entries.insert(key, value);
50,298✔
319
                }
320
                let split_nodes = self_leaf.check_split();
534✔
321
                Ok((
×
322
                    self_leaf
356✔
323
                        .top_key()
356✔
324
                        .expect("Leaf cannot be empty here")
178✔
325
                        .clone(),
178✔
326
                    split_nodes
178✔
327
                        .into_iter()
178✔
328
                        .map(|n| EditableLoadedNode::Leaf(n))
263✔
329
                        .collect(),
178✔
330
                ))
331
            }
332
            (
UNCOV
333
                EditableLoadedNode::Internal(self_internal),
×
UNCOV
334
                EditableLoadedNode::Internal(other_internal),
×
335
            ) => {
×
UNCOV
336
                for (key, child_node) in other_internal.entries {
×
UNCOV
337
                    let previous_entry = self_internal.entries.insert(key, child_node);
×
UNCOV
338
                    if let Some(_existing_child) = previous_entry {
×
339
                        return Err(Box::new(std::io::Error::other("Merge node key collision")));
×
340
                    }
341
                }
UNCOV
342
                let split_nodes = self_internal.check_split();
×
343
                Ok((
×
UNCOV
344
                    self_internal
×
UNCOV
345
                        .top_key()
×
UNCOV
346
                        .expect("Internal node cannot be empty here")
×
UNCOV
347
                        .clone(),
×
UNCOV
348
                    split_nodes
×
UNCOV
349
                        .into_iter()
×
UNCOV
350
                        .map(|n| EditableLoadedNode::Internal(n))
×
UNCOV
351
                        .collect(),
×
352
                ))
353
            }
354
            _ => unreachable!(),
355
        }
356
    }
357

358
    pub async fn is_naturally_split(
19,014✔
359
        &mut self,
360
        load_tree: &dyn LoadTree,
361
    ) -> Result<bool, Box<dyn std::error::Error>> {
362
        let loaded = self.require_loaded(load_tree).await?;
76,056✔
363
        Ok(loaded.is_naturally_split())
19,014✔
364
    }
365
}
366

367
#[derive(Debug, Clone)]
368
pub struct EditableLeafNode<Key, Value> {
369
    entries: BTreeMap<Key, Value>,
370
}
371

372
impl<Key: std::cmp::Ord + Clone + Serialize, Value: Clone + NodeValue>
373
    EditableLeafNode<Key, Value>
374
{
375
    pub fn create(entries: BTreeMap<Key, Value>) -> Option<Self> {
4,753✔
376
        if entries.is_empty() {
9,506✔
377
            None
1✔
378
        } else {
379
            Some(EditableLeafNode { entries })
4,752✔
380
        }
381
    }
382

383
    pub async fn insert(&mut self, key: Key, value: Value) -> Vec<EditableLeafNode<Key, Value>> {
8,758✔
384
        self.entries.insert(key, value);
17,516✔
385
        self.check_split()
8,758✔
386
    }
387

388
    pub async fn remove(
1,014✔
389
        &mut self,
390
        key: &Key,
391
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
392
        let removed = self.entries.remove(key);
4,056✔
393
        let top_key = self.top_key().cloned();
4,056✔
394
        Ok((top_key, removed))
1,014✔
395
    }
396

397
    fn check_split(&mut self) -> Vec<EditableLeafNode<Key, Value>> {
4,557✔
398
        let mut result = Vec::new();
9,114✔
399
        let mut current_node = BTreeMap::new();
9,114✔
400
        let mut current_node_size_tracker = SizeTracker::new();
9,114✔
401
        for entry in self.entries.iter() {
500,056✔
402
            current_node_size_tracker.add_entry(entry.0, &entry.1.to_content());
1,963,768✔
403
            current_node.insert(entry.0.clone(), entry.1.clone());
2,945,652✔
404
            if is_split_after_key(entry.0, current_node_size_tracker.size()) {
1,966,031✔
405
                result.push(
6,789✔
406
                    EditableLeafNode::create(current_node)
6,789✔
407
                        .expect("Must succeed because list is not empty"),
6,789✔
408
                );
409
                current_node = BTreeMap::new();
4,526✔
410
                current_node_size_tracker = SizeTracker::new();
2,263✔
411
            }
412
        }
413
        if !current_node.is_empty() {
7,046✔
414
            result.push(
7,467✔
415
                EditableLeafNode::create(current_node)
7,467✔
416
                    .expect("Must succeed because list is not empty"),
2,489✔
417
            );
418
        }
419
        *self = result.remove(0);
13,671✔
420
        result
4,557✔
421
    }
422

423
    pub fn top_key(&self) -> Option<&Key> {
10,743✔
424
        self.entries.keys().next_back()
21,486✔
425
    }
426

427
    pub fn find(&mut self, key: &Key) -> Option<Value> {
1,549,689✔
428
        self.entries.get(key).cloned()
6,198,756✔
429
    }
430

431
    pub fn verify_integrity(&mut self) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
4,977✔
432
        let mut size_tracker = SizeTracker::new();
9,954✔
433
        for (index, (key, value)) in self.entries.iter().enumerate() {
1,648,842✔
434
            size_tracker.add_entry(key, &value.to_content());
2,178,548✔
435
            let is_split = is_split_after_key(key, size_tracker.size());
2,723,185✔
436
            if (index < self.entries.len() - 1) && is_split {
1,628,936✔
437
                return Ok(IntegrityCheckResult::Corrupted(format!(
×
438
                    "Leaf node integrity check failed: Key at index {} indicates split but node is not final (number of keys: {})",
×
439
                    index, self.entries.len()
×
440
                )));
441
            }
442
        }
443
        Ok(IntegrityCheckResult::Valid { depth: 0 })
4,977✔
444
    }
445

446
    pub fn is_naturally_split(&self) -> bool {
19,014✔
447
        let mut size_tracker = SizeTracker::new();
38,028✔
448
        for entry in self.entries.iter() {
4,306,208✔
449
            size_tracker.add_entry(entry.0, &entry.1.to_content());
8,536,360✔
450
        }
451
        is_split_after_key(
452
            self.entries.keys().last().expect("leaf node is not empty"),
76,056✔
453
            size_tracker.size(),
38,028✔
454
        )
455
    }
456

457
    pub fn entries(&self) -> &BTreeMap<Key, Value> {
4✔
458
        &self.entries
4✔
459
    }
460
}
461

462
#[derive(Debug, Clone)]
463
pub struct EditableInternalNode<Key: std::cmp::Ord + Clone, Value: Clone> {
464
    entries: BTreeMap<Key, EditableNode<Key, Value>>,
465
}
466

467
impl<
468
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
469
        Value: NodeValue + Clone,
470
    > EditableInternalNode<Key, Value>
471
{
472
    pub fn create(entries: BTreeMap<Key, EditableNode<Key, Value>>) -> Option<Self> {
2,991✔
473
        if entries.is_empty() {
5,982✔
474
            None
×
475
        } else {
476
            Some(EditableInternalNode { entries })
2,991✔
477
        }
478
    }
479

480
    pub async fn insert(
2,991✔
481
        &mut self,
482
        key: Key,
483
        value: Value,
484
        load_tree: &dyn LoadTree,
485
    ) -> Result<Vec<EditableInternalNode<Key, Value>>, Box<dyn std::error::Error>> {
486
        let last_index = self.entries.len() - 1;
5,982✔
487
        // TODO: optimize search
488
        for (index, (entry_key, entry_value)) in self.entries.iter_mut().enumerate() {
37,257✔
489
            if (index == last_index) || (key <= *entry_key) {
17,963✔
490
                let (updated_key, split_nodes) =
5,982✔
491
                    entry_value.insert_impl(key, value, load_tree).await?;
14,955✔
492
                if updated_key != *entry_key {
2,991✔
493
                    let old_key = entry_key.clone();
885✔
494
                    let old_value = self.entries.remove(&old_key).expect("key must exist");
1,770✔
495
                    let previous_entry = self.entries.insert(updated_key, old_value);
1,475✔
496
                    assert!(previous_entry.is_none(), "Split node key collision");
497
                }
498
                for node in split_nodes {
3,095✔
499
                    let previous_entry = self.entries.insert(
312✔
500
                        node.top_key().expect("Node cannot be empty here").clone(),
416✔
501
                        EditableNode::Loaded(node),
104✔
502
                    );
503
                    assert!(previous_entry.is_none(), "Split node key collision");
504
                }
505
                self.update_chunk_boundaries(load_tree).await?;
8,973✔
506
                break;
2,991✔
507
            }
508
        }
509
        Ok(self.check_split())
2,991✔
510
    }
511

512
    pub async fn remove(
845✔
513
        &mut self,
514
        key: &Key,
515
        load_tree: &dyn LoadTree,
516
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
517
        // TODO: optimize search
518
        for (entry_key, entry_value) in self.entries.iter_mut() {
6,680✔
519
            if key <= entry_key {
2,495✔
520
                let (maybe_new_top_key, maybe_removed) =
1,690✔
521
                    Box::pin(entry_value.remove_impl(key, load_tree)).await?;
4,225✔
522
                match maybe_new_top_key {
845✔
523
                    Some(new_top_key) => {
845✔
524
                        if new_top_key != *entry_key {
853✔
525
                            let entry_key = entry_key.clone();
32✔
526
                            let entry_value_removed = self
24✔
527
                                .entries
16✔
528
                                .remove(&entry_key)
24✔
529
                                .expect("Must exist because we just iterated over the entries");
530
                            self.entries.insert(new_top_key, entry_value_removed);
24✔
531
                        }
532
                    }
533
                    None => {
×
534
                        let entry_key = entry_key.clone();
×
535
                        self.entries.remove(&entry_key);
×
536
                    }
537
                }
538
                let top_key = self.top_key().cloned();
3,380✔
539
                match maybe_removed {
845✔
540
                    Some(removed) => {
845✔
541
                        if !self.entries.is_empty() {
845✔
542
                            self.update_chunk_boundaries(load_tree).await?;
2,535✔
543
                        }
544
                        return Ok((top_key, Some(removed)));
845✔
545
                    }
546
                    None => {
×
547
                        return Ok((top_key, None));
×
548
                    }
549
                }
550
            }
551
        }
552
        Ok((self.top_key().cloned(), None))
×
553
    }
554

555
    async fn update_chunk_boundaries(
3,836✔
556
        &mut self,
557
        load_tree: &dyn LoadTree,
558
    ) -> Result<(), Box<dyn std::error::Error>> {
559
        loop {
×
560
            let merge_candidates = self.find_merge_candidates(load_tree).await?;
16,056✔
561
            match merge_candidates {
4,014✔
562
                Some((low_key, high_key)) => {
356✔
563
                    let mut low = self.entries.remove(&low_key).expect("key must exist");
1,068✔
564
                    let high = self.entries.remove(&high_key).expect("key must exist");
1,068✔
565
                    let (low_top_key, split_nodes) = low.merge(high, load_tree).await?;
1,068✔
566
                    assert!(split_nodes.is_empty() || low.is_naturally_split(load_tree).await?);
567
                    assert_ne!(low_key, low_top_key, "Merge did not change low key");
568
                    let previous_entry = self.entries.insert(low_top_key, low);
890✔
569
                    assert!(previous_entry.is_none(), "Merge node key collision");
570
                    let split_nodes_len = split_nodes.len();
534✔
571
                    for (index, node) in split_nodes.into_iter().enumerate() {
704✔
572
                        assert!((index == split_nodes_len - 1) || node.is_naturally_split());
573
                        let previous_entry = self.entries.insert(
255✔
574
                            node.top_key().expect("Node cannot be empty here").clone(),
340✔
575
                            EditableNode::Loaded(node),
85✔
576
                        );
577
                        assert!(previous_entry.is_none(), "Merge node key collision");
578
                    }
579
                }
580
                None => break,
3,836✔
581
            }
582
        }
583
        Ok(())
3,836✔
584
    }
585

586
    async fn find_merge_candidates(
4,014✔
587
        &mut self,
588
        load_tree: &dyn LoadTree,
589
    ) -> Result<Option<(Key, Key)>, Box<dyn std::error::Error>> {
590
        let last_index = self.entries.len() - 1;
8,028✔
591
        let mut needs_merge: Option<&Key> = None;
12,042✔
592
        // TODO: optimize search
593
        for (index, (entry_key, entry_value)) in self.entries.iter_mut().enumerate() {
69,363✔
594
            if let Some(merge_value) = needs_merge.take() {
19,285✔
595
                return Ok(Some((merge_value.clone(), entry_key.clone())));
534✔
596
            }
597
            let is_split = entry_value.is_naturally_split(load_tree).await?;
75,716✔
598
            if (index != last_index) && !is_split {
34,200✔
599
                needs_merge = Some(entry_key);
178✔
600
            }
601
        }
602
        Ok(None)
3,836✔
603
    }
604

605
    fn check_split(&mut self) -> Vec<EditableInternalNode<Key, Value>> {
2,991✔
606
        let mut result = Vec::new();
5,982✔
607
        let mut current_node = BTreeMap::new();
5,982✔
608
        let mut current_node_size_tracker = SizeTracker::new();
5,982✔
609
        for entry in self.entries.iter() {
19,962✔
610
            current_node_size_tracker.add_entry(
27,960✔
611
                entry.0,
13,980✔
612
                &TreeReference::new(BlobDigest::new(&[0; 64])).to_content(),
27,960✔
613
            );
614
            current_node.insert(entry.0.clone(), entry.1.clone());
83,880✔
615
            if is_split_after_key(entry.0, current_node_size_tracker.size()) {
55,920✔
UNCOV
616
                result.push(
×
UNCOV
617
                    EditableInternalNode::create(current_node)
×
UNCOV
618
                        .expect("Must succeed because list is not empty"),
×
619
                );
UNCOV
620
                current_node = BTreeMap::new();
×
NEW
621
                current_node_size_tracker = SizeTracker::new();
×
622
            }
623
        }
624
        if !current_node.is_empty() {
5,982✔
625
            result.push(
8,973✔
626
                EditableInternalNode::create(current_node)
8,973✔
627
                    .expect("Must succeed because list is not empty"),
2,991✔
628
            );
629
        }
630
        *self = result.remove(0);
8,973✔
631
        result
2,991✔
632
    }
633

634
    pub fn top_key(&self) -> Option<&Key> {
4,688✔
635
        self.entries.keys().next_back()
9,376✔
636
    }
637

638
    pub async fn find(
1,512,890✔
639
        &mut self,
640
        key: &Key,
641
        _load_tree: &dyn LoadTree,
642
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
643
        // TODO: optimize search
644
        for (entry_key, entry_value) in self.entries.iter_mut() {
12,730,094✔
645
            if key <= entry_key {
4,852,157✔
646
                return Box::pin(entry_value.find(key, _load_tree)).await;
7,564,375✔
647
            }
648
        }
649
        Ok(None)
15✔
650
    }
651

652
    pub async fn verify_integrity(
852✔
653
        &mut self,
654
        load_tree: &dyn LoadTree,
655
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
656
        if self.entries.is_empty() {
1,704✔
657
            return Ok(IntegrityCheckResult::Corrupted(
×
658
                "Internal node integrity check failed: Node has no entries".to_string(),
×
659
            ));
660
        }
661
        let mut child_depth = None;
1,704✔
662
        for (index, (key, value)) in self.entries.iter_mut().enumerate() {
15,738✔
663
            match value.verify_integrity(Some(key), load_tree).await? {
17,576✔
664
                IntegrityCheckResult::Valid { depth } => {
4,394✔
665
                    if let Some(existing_depth) = child_depth {
7,936✔
666
                        if existing_depth != depth {
3,542✔
667
                            return Ok(IntegrityCheckResult::Corrupted(format!(
×
668
                                "Internal node integrity check failed at index {}: Child node depth mismatch (expected {}, found {})",
×
669
                                index, existing_depth, depth
×
670
                            )));
671
                        }
672
                    } else {
673
                        child_depth = Some(depth);
852✔
674
                    }
675
                }
676
                IntegrityCheckResult::Corrupted(reason) => {
×
677
                    return Ok(IntegrityCheckResult::Corrupted(format!(
×
678
                        "Internal node integrity check failed at index {}: {}",
×
679
                        index, reason
×
680
                    )));
681
                }
682
            }
683
        }
684
        Ok(IntegrityCheckResult::Valid {
852✔
685
            depth: child_depth.expect("Internal node has to have at least one child") + 1,
1,704✔
686
        })
687
    }
688

UNCOV
689
    pub fn is_naturally_split(&self) -> bool {
×
UNCOV
690
        let last_key = self
×
UNCOV
691
            .entries
×
692
            .keys()
693
            .last()
694
            .expect("internal node is not empty");
NEW
695
        let mut size_tracker = SizeTracker::new();
×
NEW
696
        for entry in self.entries.iter() {
×
NEW
697
            size_tracker.add_entry(
×
NEW
698
                entry.0,
×
NEW
699
                &TreeReference::new(BlobDigest::new(&[0; 64])).to_content(),
×
700
            );
701
        }
NEW
702
        is_split_after_key(last_key, size_tracker.size())
×
703
    }
704
}
705

706
#[derive(Debug, Clone)]
707
pub enum EditableLoadedNode<Key: std::cmp::Ord + Clone, Value: Clone> {
708
    Leaf(EditableLeafNode<Key, Value>),
709
    Internal(EditableInternalNode<Key, Value>),
710
}
711

712
impl<Key: Serialize + DeserializeOwned + Ord + Clone + Debug, Value: NodeValue + Clone>
713
    EditableLoadedNode<Key, Value>
714
{
715
    pub fn new(loaded: EitherNodeType<Key, Value>) -> Self {
69✔
716
        match loaded {
69✔
717
            EitherNodeType::Leaf(leaf_node) => {
69✔
718
                let mut entries = BTreeMap::new();
138✔
719
                for (key, value) in leaf_node.entries {
1,761✔
720
                    entries.insert(key, value);
1,692✔
721
                }
722
                EditableLoadedNode::Leaf(EditableLeafNode { entries })
69✔
723
            }
UNCOV
724
            EitherNodeType::Internal(internal_node) => {
×
UNCOV
725
                let mut entries = BTreeMap::new();
×
UNCOV
726
                for (key, child_node) in internal_node.entries {
×
UNCOV
727
                    entries.insert(key, EditableNode::Reference(child_node));
×
728
                }
UNCOV
729
                EditableLoadedNode::Internal(EditableInternalNode { entries })
×
730
            }
731
        }
732
    }
733

734
    pub async fn insert(
7,370✔
735
        &mut self,
736
        key: Key,
737
        value: Value,
738
        load_tree: &dyn LoadTree,
739
    ) -> Result<Vec<EditableLoadedNode<Key, Value>>, Box<dyn std::error::Error>> {
740
        match self {
7,370✔
741
            EditableLoadedNode::Leaf(leaf_node) => {
4,379✔
742
                let split_nodes = leaf_node.insert(key, value).await;
21,895✔
743
                Ok(split_nodes
4,379✔
744
                    .into_iter()
4,379✔
745
                    .map(|node| EditableLoadedNode::Leaf(node))
4,489✔
746
                    .collect())
4,379✔
747
            }
748
            EditableLoadedNode::Internal(internal_node) => {
2,991✔
749
                let split_nodes = internal_node.insert(key, value, load_tree).await?;
17,946✔
750
                Ok(split_nodes
2,991✔
751
                    .into_iter()
2,991✔
752
                    .map(|node| EditableLoadedNode::Internal(node))
2,991✔
753
                    .collect())
2,991✔
754
            }
755
        }
756
    }
757

758
    pub async fn remove(
1,859✔
759
        &mut self,
760
        key: &Key,
761
        load_tree: &dyn LoadTree,
762
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
763
        match self {
1,859✔
764
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.remove(key).await,
4,056✔
765
            EditableLoadedNode::Internal(internal_node) => {
845✔
766
                internal_node.remove(key, load_tree).await
3,380✔
767
            }
768
        }
769
    }
770

771
    pub fn simplify(&mut self) -> Option<EditableNode<Key, Value>> {
1,013✔
772
        match self {
1,013✔
773
            EditableLoadedNode::Internal(internal_node) => {
845✔
774
                if internal_node.entries.len() == 1 {
845✔
775
                    let (_, only_child) = internal_node
3✔
776
                        .entries
2✔
777
                        .iter_mut()
778
                        .next()
779
                        .expect("internal node has one entry");
780
                    Some(only_child.clone())
1✔
781
                } else {
782
                    None
844✔
783
                }
784
            }
785
            EditableLoadedNode::Leaf(_) => None,
168✔
786
        }
787
    }
788

789
    pub async fn find(
3,062,579✔
790
        &mut self,
791
        key: &Key,
792
        load_tree: &dyn LoadTree,
793
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
794
        match self {
3,062,579✔
795
            EditableLoadedNode::Leaf(leaf_node) => Ok(leaf_node.find(key)),
4,649,067✔
796
            EditableLoadedNode::Internal(internal_node) => internal_node.find(key, load_tree).await,
7,564,450✔
797
        }
798
    }
799

800
    pub fn top_key(&self) -> Option<&Key> {
13,394✔
801
        match self {
13,394✔
802
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.top_key(),
28,653✔
803
            EditableLoadedNode::Internal(internal_node) => internal_node.top_key(),
11,529✔
804
        }
805
    }
806

807
    pub async fn count(
22,892✔
808
        &mut self,
809
        load_tree: &dyn LoadTree,
810
    ) -> Result<u64, Box<dyn std::error::Error>> {
811
        match self {
22,892✔
812
            EditableLoadedNode::Leaf(leaf_node) => Ok(leaf_node.entries.len() as u64),
38,456✔
813
            EditableLoadedNode::Internal(internal_node) => {
3,664✔
814
                let mut total_count = 0;
7,328✔
815
                for child_node in internal_node.entries.values_mut() {
25,320✔
816
                    total_count += child_node.count(load_tree).await?;
53,976✔
817
                }
818
                Ok(total_count)
3,664✔
819
            }
820
        }
821
    }
822

823
    pub async fn save(
120✔
824
        &mut self,
825
        store_tree: &dyn StoreTree,
826
    ) -> Result<BlobDigest, Box<dyn std::error::Error>> {
827
        match self {
120✔
828
            EditableLoadedNode::Leaf(leaf_node) => {
109✔
829
                let mut new_node = crate::sorted_tree::Node {
830
                    entries: Vec::new(),
109✔
831
                };
832
                for (key, value) in &leaf_node.entries {
21,808✔
833
                    new_node.entries.push((key.clone(), value.clone()));
36,165✔
834
                }
835
                let digest = store_node(store_tree, &new_node, &Metadata { is_leaf: true }).await?;
545✔
836
                Ok(digest)
109✔
837
            }
838
            EditableLoadedNode::Internal(internal_node) => {
11✔
839
                let mut new_node = crate::sorted_tree::Node {
840
                    entries: Vec::new(),
11✔
841
                };
842
                for (key, child_node) in &mut internal_node.entries {
103✔
843
                    let child_digest = Box::pin(child_node.save(store_tree)).await?;
230✔
844
                    new_node
46✔
845
                        .entries
46✔
846
                        .push((key.clone(), TreeReference::new(child_digest)));
184✔
847
                }
848
                let digest =
11✔
849
                    store_node(store_tree, &new_node, &Metadata { is_leaf: false }).await?;
44✔
850
                Ok(digest)
11✔
851
            }
852
        }
853
    }
854

855
    pub async fn verify_integrity(
5,829✔
856
        &mut self,
857
        load_tree: &dyn LoadTree,
858
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
859
        match self {
5,829✔
860
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.verify_integrity(),
14,931✔
861
            EditableLoadedNode::Internal(internal_node) => {
852✔
862
                internal_node.verify_integrity(load_tree).await
2,556✔
863
            }
864
        }
865
    }
866

867
    pub fn is_naturally_split(&self) -> bool {
19,014✔
868
        match self {
19,014✔
869
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.is_naturally_split(),
57,042✔
UNCOV
870
            EditableLoadedNode::Internal(internal_node) => internal_node.is_naturally_split(),
×
871
        }
872
    }
873
}
874

875
pub struct Iterator<
876
    't,
877
    Key: Serialize + DeserializeOwned + Ord + Clone + Debug,
878
    Value: NodeValue + Clone,
879
> {
880
    next: Vec<&'t mut EditableNode<Key, Value>>,
881
    leaf_iterator: Option<std::collections::btree_map::Iter<'t, Key, Value>>,
882
    load_tree: &'t dyn LoadTree,
883
}
884

885
impl<'t, Key, Value> Iterator<'t, Key, Value>
886
where
887
    Key: Serialize + DeserializeOwned + Ord + Clone + Debug,
888
    Value: NodeValue + Clone,
889
{
890
    pub fn new(node: &'t mut EditableNode<Key, Value>, load_tree: &'t dyn LoadTree) -> Self {
3✔
891
        Iterator {
892
            next: vec![node],
9✔
893
            leaf_iterator: None,
894
            load_tree,
895
        }
896
    }
897

898
    pub async fn next(&mut self) -> Result<Option<(Key, Value)>, Box<dyn std::error::Error>> {
1,012✔
899
        loop {
×
900
            if let Some(current_node) = self.leaf_iterator.as_mut() {
1,018✔
901
                match current_node.next() {
507✔
902
                    Some((key, value)) => return Ok(Some((key.clone(), value.clone()))),
2,515✔
903
                    None => {
4✔
904
                        self.leaf_iterator = None;
4✔
905
                    }
906
                }
907
            }
908
            match self.next.pop() {
8✔
909
                Some(next_node) => {
5✔
910
                    let loaded = next_node.require_loaded(self.load_tree).await?;
20✔
911
                    match loaded {
5✔
912
                        EditableLoadedNode::Leaf(leaf_node) => {
4✔
913
                            self.leaf_iterator = Some(leaf_node.entries().iter());
4✔
914
                            continue;
4✔
915
                        }
916
                        EditableLoadedNode::Internal(internal_node) => {
1✔
917
                            internal_node
1✔
918
                                .entries
1✔
919
                                .values_mut()
920
                                .rev()
921
                                .for_each(|child_node| {
3✔
922
                                    self.next.push(child_node);
6✔
923
                                });
924
                        }
925
                    };
926
                }
927
                None => {
×
928
                    return Ok(None);
3✔
929
                }
930
            }
931
        }
932
    }
933
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc