• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TyRoXx / NonlocalityOS / 20206575089

14 Dec 2025 10:25AM UTC coverage: 80.251% (-0.6%) from 80.811%
20206575089

Pull #371

github

web-flow
Merge 4d0562e17 into 9ba501c70
Pull Request #371: Prolly tree builds much larger chunks, respecting the max tree blob size

44 of 60 new or added lines in 2 files covered. (73.33%)

32 existing lines in 2 files now uncovered.

5445 of 6785 relevant lines covered (80.25%)

34109.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.22
/sorted_tree/src/prolly_tree_editable_node.rs
1
use crate::sorted_tree::{self, NodeValue, TreeReference};
2
use astraea::{
3
    storage::{LoadTree, StoreError, StoreTree},
4
    tree::{BlobDigest, TREE_BLOB_MAX_LENGTH},
5
};
6
use serde::{de::DeserializeOwned, Deserialize, Serialize};
7
use sha3::{Digest, Sha3_512};
8
use std::collections::BTreeMap;
9
use std::fmt::Debug;
10

11
/// Outcome of a structural integrity check on a node and everything below it.
#[derive(Debug, PartialEq)]
pub enum IntegrityCheckResult {
    /// The checked node is well-formed. `depth` is 0 for a leaf node and one more than
    /// the (uniform) depth of the children for an internal node.
    Valid { depth: usize },
    /// The check failed; the payload is a human-readable description of the problem.
    Corrupted(String),
}
16

17
pub fn hash_key<Key: Serialize>(key: &Key) -> u8 {
156,700✔
18
    // TODO: use a better hash function (https://docs.dolthub.com/architecture/storage-engine/prolly-tree#controlling-chunk-size)
19
    let key_serialized = postcard::to_stdvec(key).expect("serializing key should succeed");
783,500✔
20
    let mut hasher = Sha3_512::new();
313,400✔
21
    hasher.update(&key_serialized);
470,100✔
22
    let result: [u8; 64] = hasher.finalize().into();
783,500✔
23
    result[0]
156,700✔
24
}
25

26
pub fn is_split_after_key<Key: Serialize>(key: &Key, chunk_size_in_bytes: usize) -> bool {
1,068,613✔
27
    if chunk_size_in_bytes < 1000 {
1,068,613✔
28
        // No point in splitting small chunks.
29
        // TODO: use Tree efficiently
30
        return false;
911,897✔
31
    }
32
    if chunk_size_in_bytes >= TREE_BLOB_MAX_LENGTH / 2 {
156,716✔
33
        // TODO: try to pack more elements in a chunk before splitting
34
        return true;
16✔
35
    }
36
    let hash = hash_key(key);
470,100✔
37
    let chunk_boundary_threshold = 10;
313,400✔
38
    if hash < chunk_boundary_threshold {
156,700✔
39
        // written with an if expression so that we can see whether the tests cover both branches
40
        true
20,805✔
41
    } else {
42
        false
135,895✔
43
    }
44
}
45

46
/// Running estimate of the serialized size of a node that is being assembled.
///
/// The default value is an empty tracker (no entries, zero bytes), identical to what
/// `SizeTracker::new` produces.
#[derive(Debug, Default)]
pub struct SizeTracker {
    /// Number of entries recorded so far.
    element_count: usize,
    /// Sum of the postcard-serialized lengths of all recorded `(key, value)` entries.
    total_element_size: usize,
}
56

57
impl SizeTracker {
58
    pub fn new() -> Self {
33,832✔
59
        SizeTracker {
60
            element_count: 0,
61
            total_element_size: 0,
62
        }
63
    }
64

65
    pub fn add_entry<Key: Serialize, Value: Serialize>(&mut self, key: &Key, value: &Value) {
3,183,700✔
66
        let entry_serialized: Vec<u8> =
6,367,400✔
67
            postcard::to_stdvec(&(key, value)).expect("serializing entry should succeed");
12,734,800✔
68
        self.element_count += 1;
3,183,700✔
69
        self.total_element_size += entry_serialized.len();
3,183,700✔
70
    }
71

72
    pub fn size(&self) -> usize {
1,068,614✔
73
        // TODO: optimize size calculation
74
        let metadata_serialized: Vec<u8> =
2,137,228✔
75
            postcard::to_stdvec(&Metadata { is_leaf: true }).unwrap();
3,205,842✔
76
        let element_count_serialized: Vec<u8> = postcard::to_stdvec(&self.element_count).unwrap();
5,343,070✔
77
        metadata_serialized.len() + element_count_serialized.len() + self.total_element_size
3,205,842✔
78
    }
79
}
80

81
#[derive(Serialize, Deserialize, Clone, Hash)]
82
pub struct Metadata {
83
    pub is_leaf: bool,
84
}
85

86
pub async fn store_node<Key: Serialize + Ord, Value: NodeValue>(
142✔
87
    store_tree: &dyn StoreTree,
88
    node: &sorted_tree::Node<Key, Value>,
89
    metadata: &Metadata,
90
) -> Result<BlobDigest, StoreError> {
91
    let metadata_serialized =
142✔
92
        postcard::to_stdvec(metadata).expect("serializing metadata should always succeed");
568✔
93
    crate::sorted_tree::store_node(store_tree, node, &bytes::Bytes::from(metadata_serialized)).await
568✔
94
}
95

96
#[derive(Debug, PartialEq)]
97
pub enum EitherNodeType<Key: Serialize + Ord, Value: NodeValue> {
98
    Leaf(sorted_tree::Node<Key, Value>),
99
    Internal(sorted_tree::Node<Key, TreeReference>),
100
}
101

102
#[derive(Debug, PartialEq, Eq, Clone)]
103
pub enum DeserializationError {
104
    MissingTree(BlobDigest),
105
    TreeHashMismatch(BlobDigest),
106
}
107

108
impl std::fmt::Display for DeserializationError {
109
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
×
110
        write!(f, "{self:?}")
×
111
    }
112
}
113

114
impl std::error::Error for DeserializationError {}
115

116
pub async fn load_node<
69✔
117
    Key: Serialize + DeserializeOwned + PartialEq + Ord,
118
    Value: NodeValue + Clone,
119
>(
120
    load_tree: &dyn LoadTree,
121
    root: &BlobDigest,
122
) -> Result<EitherNodeType<Key, Value>, Box<dyn std::error::Error>> {
123
    let loaded = match load_tree.load_tree(root).await {
345✔
124
        Some(loaded) => loaded,
138✔
125
        None => return Err(DeserializationError::MissingTree(*root).into()),
×
126
    };
127
    let hashed = match loaded.hash() {
138✔
128
        Some(hashed) => hashed,
138✔
129
        None => return Err(DeserializationError::TreeHashMismatch(*root).into()),
×
130
    };
131
    let tree = hashed.tree();
207✔
132
    let (metadata, sorted_tree_data) =
138✔
133
        postcard::take_from_bytes::<Metadata>(tree.blob().as_slice())?;
138✔
134
    if metadata.is_leaf {
69✔
135
        let node = sorted_tree::node_from_tree::<Key, Value>(
136
            tree,
69✔
137
            tree.blob().as_slice().len() - sorted_tree_data.len(),
207✔
138
        );
139
        Ok(EitherNodeType::Leaf(node))
69✔
140
    } else {
141
        let node = sorted_tree::node_from_tree::<Key, TreeReference>(
UNCOV
142
            tree,
×
UNCOV
143
            tree.blob().as_slice().len() - sorted_tree_data.len(),
×
144
        );
UNCOV
145
        Ok(EitherNodeType::Internal(node))
×
146
    }
147
}
148

149
#[derive(Debug, Clone)]
150
pub enum EditableNode<Key: std::cmp::Ord + Clone, Value: Clone> {
151
    Reference(TreeReference),
152
    Loaded(EditableLoadedNode<Key, Value>),
153
}
154

155
impl<
156
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
157
        Value: NodeValue + Clone,
158
    > Default for EditableNode<Key, Value>
159
{
160
    fn default() -> Self {
1✔
161
        Self::new()
1✔
162
    }
163
}
164

165
impl<
166
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
167
        Value: NodeValue + Clone,
168
    > EditableNode<Key, Value>
169
{
170
    pub fn new() -> Self {
34✔
171
        EditableNode::Loaded(EditableLoadedNode::Leaf(EditableLeafNode {
34✔
172
            entries: BTreeMap::new(),
34✔
173
        }))
174
    }
175

176
    pub async fn require_loaded(
3,120,762✔
177
        &mut self,
178
        load_tree: &dyn LoadTree,
179
    ) -> Result<&mut EditableLoadedNode<Key, Value>, Box<dyn std::error::Error>> {
180
        match self {
3,120,762✔
181
            EditableNode::Reference(tree_ref) => {
9✔
182
                let loaded: EitherNodeType<Key, Value> =
18✔
183
                    load_node(load_tree, tree_ref.reference()).await?;
27✔
184
                *self = EditableNode::Loaded(EditableLoadedNode::new(loaded));
18✔
185
            }
186
            EditableNode::Loaded(_loaded_node) => {}
6,241,506✔
187
        };
188
        let loaded = match self {
6,241,524✔
189
            EditableNode::Loaded(loaded_node) => loaded_node,
6,241,524✔
190
            _ => unreachable!(),
191
        };
192
        Ok(loaded)
3,120,762✔
193
    }
194

195
    pub async fn insert(
4,386✔
196
        &mut self,
197
        key: Key,
198
        value: Value,
199
        load_tree: &dyn LoadTree,
200
    ) -> Result<(), Box<dyn std::error::Error>> {
201
        let (self_top_key, nodes_split) = self.insert_impl(key, value, load_tree).await?;
30,702✔
202
        if nodes_split.is_empty() {
8,772✔
203
            return Ok(());
4,379✔
204
        }
205
        let mut entries = BTreeMap::new();
14✔
206
        entries.insert(self_top_key, self.clone());
35✔
207
        for node in nodes_split {
21✔
208
            entries.insert(
21✔
209
                node.top_key().expect("Node cannot be empty here").clone(),
28✔
210
                EditableNode::Loaded(node),
7✔
211
            );
212
        }
213
        *self = EditableNode::Loaded(EditableLoadedNode::Internal(EditableInternalNode {
14✔
214
            entries,
7✔
215
        }));
216
        Ok(())
7✔
217
    }
218

219
    pub async fn insert_impl(
7,381✔
220
        &mut self,
221
        key: Key,
222
        value: Value,
223
        load_tree: &dyn LoadTree,
224
    ) -> Result<(Key, Vec<EditableLoadedNode<Key, Value>>), Box<dyn std::error::Error>> {
225
        let loaded = self.require_loaded(load_tree).await?;
29,524✔
226
        let nodes_split = Box::pin(loaded.insert(key, value, load_tree)).await?;
51,667✔
227
        Ok((
7,381✔
228
            loaded.top_key().expect("Node cannot be empty here").clone(),
29,524✔
229
            nodes_split,
7,381✔
230
        ))
231
    }
232

233
    pub async fn remove(
1,014✔
234
        &mut self,
235
        key: &Key,
236
        load_tree: &dyn LoadTree,
237
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
238
        let (maybe_top_key, maybe_removed) = self.remove_impl(key, load_tree).await?;
6,084✔
239
        if maybe_top_key.is_none() {
2,029✔
240
            *self = EditableNode::Loaded(EditableLoadedNode::Leaf(EditableLeafNode {
1✔
241
                entries: BTreeMap::new(),
1✔
242
            }));
243
        } else {
244
            let loaded = self.require_loaded(load_tree).await?;
4,052✔
245
            if let Some(simplified) = loaded.simplify() {
1,015✔
246
                *self = simplified;
1✔
247
            }
248
        }
249
        Ok(maybe_removed)
1,014✔
250
    }
251

252
    pub async fn remove_impl(
1,859✔
253
        &mut self,
254
        key: &Key,
255
        load_tree: &dyn LoadTree,
256
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
257
        let loaded = self.require_loaded(load_tree).await?;
7,436✔
258
        let result = loaded.remove(key, load_tree).await?;
9,295✔
259
        Ok(result)
1,859✔
260
    }
261

262
    pub async fn find(
3,062,579✔
263
        &mut self,
264
        key: &Key,
265
        load_tree: &dyn LoadTree,
266
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
267
        let loaded = self.require_loaded(load_tree).await?;
12,250,316✔
268
        loaded.find(key, load_tree).await
12,250,316✔
269
    }
270

271
    pub async fn count(
22,892✔
272
        &mut self,
273
        load_tree: &dyn LoadTree,
274
    ) -> Result<u64, Box<dyn std::error::Error>> {
275
        let loaded = self.require_loaded(load_tree).await?;
91,568✔
276
        Box::pin(loaded.count(load_tree)).await
91,568✔
277
    }
278

279
    pub async fn save(
155✔
280
        &mut self,
281
        store_tree: &dyn StoreTree,
282
    ) -> Result<BlobDigest, Box<dyn std::error::Error>> {
283
        match self {
155✔
284
            EditableNode::Reference(tree_ref) => Ok(*tree_ref.reference()),
28✔
285
            EditableNode::Loaded(loaded_node) => loaded_node.save(store_tree).await,
564✔
286
        }
287
    }
288

289
    pub async fn load(
60✔
290
        digest: &BlobDigest,
291
        load_tree: &dyn LoadTree,
292
    ) -> Result<Self, Box<dyn std::error::Error>> {
293
        let loaded: EitherNodeType<Key, Value> = load_node(load_tree, digest).await?;
300✔
294
        Ok(EditableNode::Loaded(EditableLoadedNode::new(loaded)))
60✔
295
    }
296

297
    pub async fn verify_integrity(
5,829✔
298
        &mut self,
299
        expected_top_key: Option<&Key>,
300
        load_tree: &dyn LoadTree,
301
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
302
        let loaded = self.require_loaded(load_tree).await?;
23,316✔
303
        if loaded.top_key() != expected_top_key {
5,829✔
304
            return Ok(IntegrityCheckResult::Corrupted(
×
305
                "Top key mismatch".to_string(),
×
306
            ));
307
        }
308
        Box::pin(loaded.verify_integrity(load_tree)).await
23,316✔
309
    }
310

311
    pub async fn merge(
178✔
312
        &mut self,
313
        other: Self,
314
        load_tree: &dyn LoadTree,
315
    ) -> Result<(Key, Vec<EditableLoadedNode<Key, Value>>), Box<dyn std::error::Error>> {
316
        let loaded = self.require_loaded(load_tree).await?;
712✔
317
        let other_loaded = match other {
356✔
318
            EditableNode::Reference(tree_ref) => {
×
319
                let loaded: EitherNodeType<Key, Value> =
×
320
                    load_node(load_tree, tree_ref.reference()).await?;
×
321
                EditableLoadedNode::new(loaded)
×
322
            }
323
            EditableNode::Loaded(loaded_node) => loaded_node,
356✔
324
        };
325
        match (loaded, other_loaded) {
356✔
326
            (EditableLoadedNode::Leaf(self_leaf), EditableLoadedNode::Leaf(other_leaf)) => {
356✔
327
                for (key, value) in other_leaf.entries {
50,476✔
328
                    self_leaf.entries.insert(key, value);
50,298✔
329
                }
330
                let split_nodes = self_leaf.check_split();
534✔
331
                Ok((
×
332
                    self_leaf
356✔
333
                        .top_key()
356✔
334
                        .expect("Leaf cannot be empty here")
178✔
335
                        .clone(),
178✔
336
                    split_nodes
178✔
337
                        .into_iter()
178✔
338
                        .map(|n| EditableLoadedNode::Leaf(n))
263✔
339
                        .collect(),
178✔
340
                ))
341
            }
342
            (
UNCOV
343
                EditableLoadedNode::Internal(self_internal),
×
UNCOV
344
                EditableLoadedNode::Internal(other_internal),
×
345
            ) => {
×
UNCOV
346
                for (key, child_node) in other_internal.entries {
×
UNCOV
347
                    let previous_entry = self_internal.entries.insert(key, child_node);
×
UNCOV
348
                    if let Some(_existing_child) = previous_entry {
×
349
                        return Err(Box::new(std::io::Error::other("Merge node key collision")));
×
350
                    }
351
                }
UNCOV
352
                let split_nodes = self_internal.check_split();
×
353
                Ok((
×
UNCOV
354
                    self_internal
×
UNCOV
355
                        .top_key()
×
UNCOV
356
                        .expect("Internal node cannot be empty here")
×
UNCOV
357
                        .clone(),
×
UNCOV
358
                    split_nodes
×
UNCOV
359
                        .into_iter()
×
UNCOV
360
                        .map(|n| EditableLoadedNode::Internal(n))
×
UNCOV
361
                        .collect(),
×
362
                ))
363
            }
364
            _ => unreachable!(),
365
        }
366
    }
367

368
    pub async fn is_naturally_split(
19,026✔
369
        &mut self,
370
        load_tree: &dyn LoadTree,
371
    ) -> Result<bool, Box<dyn std::error::Error>> {
372
        let loaded = self.require_loaded(load_tree).await?;
76,104✔
373
        Ok(loaded.is_naturally_split())
19,026✔
374
    }
375
}
376

377
/// In-memory leaf node: an ordered map from keys to values.
#[derive(Debug, Clone)]
pub struct EditableLeafNode<Key, Value> {
    /// Entries of this leaf, ordered by key.
    entries: BTreeMap<Key, Value>,
}
381

382
impl<Key: std::cmp::Ord + Clone + Serialize, Value: Clone + NodeValue>
383
    EditableLeafNode<Key, Value>
384
{
385
    pub fn create(entries: BTreeMap<Key, Value>) -> Option<Self> {
4,765✔
386
        if entries.is_empty() {
9,530✔
387
            None
2✔
388
        } else {
389
            Some(EditableLeafNode { entries })
4,763✔
390
        }
391
    }
392

393
    /// Inserts `key`/`value` (replacing any previous value for `key`) and re-splits the leaf.
    ///
    /// Returns the leaves that were split off; `self` keeps the first chunk.
    pub async fn insert(&mut self, key: Key, value: Value) -> Vec<EditableLeafNode<Key, Value>> {
        let _previous_value = self.entries.insert(key, value);
        self.check_split()
    }
397

398
    pub async fn remove(
1,014✔
399
        &mut self,
400
        key: &Key,
401
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
402
        let removed = self.entries.remove(key);
4,056✔
403
        let top_key = self.top_key().cloned();
4,056✔
404
        Ok((top_key, removed))
1,014✔
405
    }
406

407
    fn check_split(&mut self) -> Vec<EditableLeafNode<Key, Value>> {
4,564✔
408
        let mut result = Vec::new();
9,128✔
409
        let mut current_node = BTreeMap::new();
9,128✔
410
        let mut current_node_size_tracker = SizeTracker::new();
9,128✔
411
        for entry in self.entries.iter() {
500,086✔
412
            current_node_size_tracker.add_entry(entry.0, &entry.1.to_content());
1,963,832✔
413
            current_node.insert(entry.0.clone(), entry.1.clone());
2,945,748✔
414
            if is_split_after_key(entry.0, current_node_size_tracker.size()) {
1,966,101✔
415
                result.push(
6,807✔
416
                    EditableLeafNode::create(current_node)
6,807✔
417
                        .expect("Must succeed because list is not empty"),
6,807✔
418
                );
419
                current_node = BTreeMap::new();
4,538✔
420
                current_node_size_tracker = SizeTracker::new();
2,269✔
421
            }
422
        }
423
        if !current_node.is_empty() {
7,057✔
424
            result.push(
7,479✔
425
                EditableLeafNode::create(current_node)
7,479✔
426
                    .expect("Must succeed because list is not empty"),
2,493✔
427
            );
428
        }
429
        *self = result.remove(0);
13,692✔
430
        result
4,564✔
431
    }
432

433
    /// Returns the greatest key stored in this leaf, or `None` when it is empty.
    pub fn top_key(&self) -> Option<&Key> {
        self.entries.last_key_value().map(|(key, _value)| key)
    }
436

437
    /// Returns a clone of the value stored under `key`, if there is one.
    pub fn find(&mut self, key: &Key) -> Option<Value> {
        match self.entries.get(key) {
            Some(value) => Some(value.clone()),
            None => None,
        }
    }
440

441
    pub fn verify_integrity(&mut self) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
4,977✔
442
        let mut size_tracker = SizeTracker::new();
9,954✔
443
        for (index, (key, value)) in self.entries.iter().enumerate() {
1,648,842✔
444
            size_tracker.add_entry(key, &value.to_content());
2,178,548✔
445
            let is_split = is_split_after_key(key, size_tracker.size());
2,723,185✔
446
            if (index < self.entries.len() - 1) && is_split {
1,628,936✔
447
                return Ok(IntegrityCheckResult::Corrupted(format!(
×
448
                    "Leaf node integrity check failed: Key at index {} indicates split but node is not final (number of keys: {})",
×
449
                    index, self.entries.len()
×
450
                )));
451
            }
452
        }
453
        Ok(IntegrityCheckResult::Valid { depth: 0 })
4,977✔
454
    }
455

456
    pub fn is_naturally_split(&self) -> bool {
19,026✔
457
        let mut size_tracker = SizeTracker::new();
38,052✔
458
        for entry in self.entries.iter() {
4,306,276✔
459
            size_tracker.add_entry(entry.0, &entry.1.to_content());
8,536,448✔
460
        }
461
        is_split_after_key(
462
            self.entries.keys().last().expect("leaf node is not empty"),
76,104✔
463
            size_tracker.size(),
38,052✔
464
        )
465
    }
466

467
    pub fn entries(&self) -> &BTreeMap<Key, Value> {
4✔
468
        &self.entries
4✔
469
    }
470
}
471

472
#[derive(Debug, Clone)]
473
pub struct EditableInternalNode<Key: std::cmp::Ord + Clone, Value: Clone> {
474
    entries: BTreeMap<Key, EditableNode<Key, Value>>,
475
}
476

477
impl<
478
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
479
        Value: NodeValue + Clone,
480
    > EditableInternalNode<Key, Value>
481
{
482
    pub fn create(entries: BTreeMap<Key, EditableNode<Key, Value>>) -> Option<Self> {
2,995✔
483
        if entries.is_empty() {
5,990✔
484
            None
×
485
        } else {
486
            Some(EditableInternalNode { entries })
2,995✔
487
        }
488
    }
489

490
    pub async fn insert(
2,995✔
491
        &mut self,
492
        key: Key,
493
        value: Value,
494
        load_tree: &dyn LoadTree,
495
    ) -> Result<Vec<EditableInternalNode<Key, Value>>, Box<dyn std::error::Error>> {
496
        let last_index = self.entries.len() - 1;
5,990✔
497
        // TODO: optimize search
498
        for (index, (entry_key, entry_value)) in self.entries.iter_mut().enumerate() {
37,299✔
499
            if (index == last_index) || (key <= *entry_key) {
17,979✔
500
                let (updated_key, split_nodes) =
5,990✔
501
                    entry_value.insert_impl(key, value, load_tree).await?;
14,975✔
502
                if updated_key != *entry_key {
2,995✔
503
                    let old_key = entry_key.clone();
891✔
504
                    let old_value = self.entries.remove(&old_key).expect("key must exist");
1,782✔
505
                    let previous_entry = self.entries.insert(updated_key, old_value);
1,485✔
506
                    assert!(previous_entry.is_none(), "Split node key collision");
507
                }
508
                for node in split_nodes {
3,101✔
509
                    let previous_entry = self.entries.insert(
318✔
510
                        node.top_key().expect("Node cannot be empty here").clone(),
424✔
511
                        EditableNode::Loaded(node),
106✔
512
                    );
513
                    assert!(previous_entry.is_none(), "Split node key collision");
514
                }
515
                self.update_chunk_boundaries(load_tree).await?;
8,985✔
516
                break;
2,995✔
517
            }
518
        }
519
        Ok(self.check_split())
2,995✔
520
    }
521

522
    pub async fn remove(
845✔
523
        &mut self,
524
        key: &Key,
525
        load_tree: &dyn LoadTree,
526
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
527
        // TODO: optimize search
528
        for (entry_key, entry_value) in self.entries.iter_mut() {
6,680✔
529
            if key <= entry_key {
2,495✔
530
                let (maybe_new_top_key, maybe_removed) =
1,690✔
531
                    Box::pin(entry_value.remove_impl(key, load_tree)).await?;
4,225✔
532
                match maybe_new_top_key {
845✔
533
                    Some(new_top_key) => {
845✔
534
                        if new_top_key != *entry_key {
853✔
535
                            let entry_key = entry_key.clone();
32✔
536
                            let entry_value_removed = self
24✔
537
                                .entries
16✔
538
                                .remove(&entry_key)
24✔
539
                                .expect("Must exist because we just iterated over the entries");
540
                            self.entries.insert(new_top_key, entry_value_removed);
24✔
541
                        }
542
                    }
543
                    None => {
×
544
                        let entry_key = entry_key.clone();
×
545
                        self.entries.remove(&entry_key);
×
546
                    }
547
                }
548
                let top_key = self.top_key().cloned();
3,380✔
549
                match maybe_removed {
845✔
550
                    Some(removed) => {
845✔
551
                        if !self.entries.is_empty() {
845✔
552
                            self.update_chunk_boundaries(load_tree).await?;
2,535✔
553
                        }
554
                        return Ok((top_key, Some(removed)));
845✔
555
                    }
556
                    None => {
×
557
                        return Ok((top_key, None));
×
558
                    }
559
                }
560
            }
561
        }
562
        Ok((self.top_key().cloned(), None))
×
563
    }
564

565
    async fn update_chunk_boundaries(
3,840✔
566
        &mut self,
567
        load_tree: &dyn LoadTree,
568
    ) -> Result<(), Box<dyn std::error::Error>> {
569
        loop {
×
570
            let merge_candidates = self.find_merge_candidates(load_tree).await?;
16,072✔
571
            match merge_candidates {
4,018✔
572
                Some((low_key, high_key)) => {
356✔
573
                    let mut low = self.entries.remove(&low_key).expect("key must exist");
1,068✔
574
                    let high = self.entries.remove(&high_key).expect("key must exist");
1,068✔
575
                    let (low_top_key, split_nodes) = low.merge(high, load_tree).await?;
1,068✔
576
                    assert!(split_nodes.is_empty() || low.is_naturally_split(load_tree).await?);
577
                    assert_ne!(low_key, low_top_key, "Merge did not change low key");
578
                    let previous_entry = self.entries.insert(low_top_key, low);
890✔
579
                    assert!(previous_entry.is_none(), "Merge node key collision");
580
                    let split_nodes_len = split_nodes.len();
534✔
581
                    for (index, node) in split_nodes.into_iter().enumerate() {
704✔
582
                        assert!((index == split_nodes_len - 1) || node.is_naturally_split());
583
                        let previous_entry = self.entries.insert(
255✔
584
                            node.top_key().expect("Node cannot be empty here").clone(),
340✔
585
                            EditableNode::Loaded(node),
85✔
586
                        );
587
                        assert!(previous_entry.is_none(), "Merge node key collision");
588
                    }
589
                }
590
                None => break,
3,840✔
591
            }
592
        }
593
        Ok(())
3,840✔
594
    }
595

596
    async fn find_merge_candidates(
4,018✔
597
        &mut self,
598
        load_tree: &dyn LoadTree,
599
    ) -> Result<Option<(Key, Key)>, Box<dyn std::error::Error>> {
600
        let last_index = self.entries.len() - 1;
8,036✔
601
        let mut needs_merge: Option<&Key> = None;
12,054✔
602
        // TODO: optimize search
603
        for (index, (entry_key, entry_value)) in self.entries.iter_mut().enumerate() {
69,411✔
604
            if let Some(merge_value) = needs_merge.take() {
19,297✔
605
                return Ok(Some((merge_value.clone(), entry_key.clone())));
534✔
606
            }
607
            let is_split = entry_value.is_naturally_split(load_tree).await?;
75,764✔
608
            if (index != last_index) && !is_split {
34,220✔
609
                needs_merge = Some(entry_key);
178✔
610
            }
611
        }
612
        Ok(None)
3,840✔
613
    }
614

615
    fn check_split(&mut self) -> Vec<EditableInternalNode<Key, Value>> {
2,995✔
616
        let mut result = Vec::new();
5,990✔
617
        let mut current_node = BTreeMap::new();
5,990✔
618
        let mut current_node_size_tracker = SizeTracker::new();
5,990✔
619
        for entry in self.entries.iter() {
19,982✔
620
            current_node_size_tracker.add_entry(
27,984✔
621
                entry.0,
13,992✔
622
                &TreeReference::new(BlobDigest::new(&[0; 64])).to_content(),
27,984✔
623
            );
624
            current_node.insert(entry.0.clone(), entry.1.clone());
83,952✔
625
            if is_split_after_key(entry.0, current_node_size_tracker.size()) {
55,968✔
UNCOV
626
                result.push(
×
UNCOV
627
                    EditableInternalNode::create(current_node)
×
UNCOV
628
                        .expect("Must succeed because list is not empty"),
×
629
                );
UNCOV
630
                current_node = BTreeMap::new();
×
NEW
631
                current_node_size_tracker = SizeTracker::new();
×
632
            }
633
        }
634
        if !current_node.is_empty() {
5,990✔
635
            result.push(
8,985✔
636
                EditableInternalNode::create(current_node)
8,985✔
637
                    .expect("Must succeed because list is not empty"),
2,995✔
638
            );
639
        }
640
        *self = result.remove(0);
8,985✔
641
        result
2,995✔
642
    }
643

644
    /// Returns the greatest child key of this node, or `None` when it has no children.
    pub fn top_key(&self) -> Option<&Key> {
        self.entries.last_key_value().map(|(key, _child)| key)
    }
647

648
    pub async fn find(
1,512,890✔
649
        &mut self,
650
        key: &Key,
651
        _load_tree: &dyn LoadTree,
652
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
653
        // TODO: optimize search
654
        for (entry_key, entry_value) in self.entries.iter_mut() {
12,730,094✔
655
            if key <= entry_key {
4,852,157✔
656
                return Box::pin(entry_value.find(key, _load_tree)).await;
7,564,375✔
657
            }
658
        }
659
        Ok(None)
15✔
660
    }
661

662
    pub async fn verify_integrity(
852✔
663
        &mut self,
664
        load_tree: &dyn LoadTree,
665
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
666
        if self.entries.is_empty() {
1,704✔
667
            return Ok(IntegrityCheckResult::Corrupted(
×
668
                "Internal node integrity check failed: Node has no entries".to_string(),
×
669
            ));
670
        }
671
        let mut child_depth = None;
1,704✔
672
        for (index, (key, value)) in self.entries.iter_mut().enumerate() {
15,738✔
673
            match value.verify_integrity(Some(key), load_tree).await? {
17,576✔
674
                IntegrityCheckResult::Valid { depth } => {
4,394✔
675
                    if let Some(existing_depth) = child_depth {
7,936✔
676
                        if existing_depth != depth {
3,542✔
677
                            return Ok(IntegrityCheckResult::Corrupted(format!(
×
678
                                "Internal node integrity check failed at index {}: Child node depth mismatch (expected {}, found {})",
×
679
                                index, existing_depth, depth
×
680
                            )));
681
                        }
682
                    } else {
683
                        child_depth = Some(depth);
852✔
684
                    }
685
                }
686
                IntegrityCheckResult::Corrupted(reason) => {
×
687
                    return Ok(IntegrityCheckResult::Corrupted(format!(
×
688
                        "Internal node integrity check failed at index {}: {}",
×
689
                        index, reason
×
690
                    )));
691
                }
692
            }
693
        }
694
        Ok(IntegrityCheckResult::Valid {
852✔
695
            depth: child_depth.expect("Internal node has to have at least one child") + 1,
1,704✔
696
        })
697
    }
698

UNCOV
699
    pub fn is_naturally_split(&self) -> bool {
×
UNCOV
700
        let last_key = self
×
UNCOV
701
            .entries
×
702
            .keys()
703
            .last()
704
            .expect("internal node is not empty");
NEW
705
        let mut size_tracker = SizeTracker::new();
×
NEW
706
        for entry in self.entries.iter() {
×
NEW
707
            size_tracker.add_entry(
×
NEW
708
                entry.0,
×
NEW
709
                &TreeReference::new(BlobDigest::new(&[0; 64])).to_content(),
×
710
            );
711
        }
NEW
712
        is_split_after_key(last_key, size_tracker.size())
×
713
    }
714
}
715

716
#[derive(Debug, Clone)]
717
pub enum EditableLoadedNode<Key: std::cmp::Ord + Clone, Value: Clone> {
718
    Leaf(EditableLeafNode<Key, Value>),
719
    Internal(EditableInternalNode<Key, Value>),
720
}
721

722
impl<Key: Serialize + DeserializeOwned + Ord + Clone + Debug, Value: NodeValue + Clone>
723
    EditableLoadedNode<Key, Value>
724
{
725
    pub fn new(loaded: EitherNodeType<Key, Value>) -> Self {
69✔
726
        match loaded {
69✔
727
            EitherNodeType::Leaf(leaf_node) => {
69✔
728
                let mut entries = BTreeMap::new();
138✔
729
                for (key, value) in leaf_node.entries {
1,761✔
730
                    entries.insert(key, value);
1,692✔
731
                }
732
                EditableLoadedNode::Leaf(EditableLeafNode { entries })
69✔
733
            }
UNCOV
734
            EitherNodeType::Internal(internal_node) => {
×
UNCOV
735
                let mut entries = BTreeMap::new();
×
UNCOV
736
                for (key, child_node) in internal_node.entries {
×
UNCOV
737
                    entries.insert(key, EditableNode::Reference(child_node));
×
738
                }
UNCOV
739
                EditableLoadedNode::Internal(EditableInternalNode { entries })
×
740
            }
741
        }
742
    }
743

744
    pub async fn insert(
7,381✔
745
        &mut self,
746
        key: Key,
747
        value: Value,
748
        load_tree: &dyn LoadTree,
749
    ) -> Result<Vec<EditableLoadedNode<Key, Value>>, Box<dyn std::error::Error>> {
750
        match self {
7,381✔
751
            EditableLoadedNode::Leaf(leaf_node) => {
4,386✔
752
                let split_nodes = leaf_node.insert(key, value).await;
21,930✔
753
                Ok(split_nodes
4,386✔
754
                    .into_iter()
4,386✔
755
                    .map(|node| EditableLoadedNode::Leaf(node))
4,499✔
756
                    .collect())
4,386✔
757
            }
758
            EditableLoadedNode::Internal(internal_node) => {
2,995✔
759
                let split_nodes = internal_node.insert(key, value, load_tree).await?;
17,970✔
760
                Ok(split_nodes
2,995✔
761
                    .into_iter()
2,995✔
762
                    .map(|node| EditableLoadedNode::Internal(node))
2,995✔
763
                    .collect())
2,995✔
764
            }
765
        }
766
    }
767

768
    pub async fn remove(
1,859✔
769
        &mut self,
770
        key: &Key,
771
        load_tree: &dyn LoadTree,
772
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
773
        match self {
1,859✔
774
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.remove(key).await,
4,056✔
775
            EditableLoadedNode::Internal(internal_node) => {
845✔
776
                internal_node.remove(key, load_tree).await
3,380✔
777
            }
778
        }
779
    }
780

781
    pub fn simplify(&mut self) -> Option<EditableNode<Key, Value>> {
1,013✔
782
        match self {
1,013✔
783
            EditableLoadedNode::Internal(internal_node) => {
845✔
784
                if internal_node.entries.len() == 1 {
845✔
785
                    let (_, only_child) = internal_node
3✔
786
                        .entries
2✔
787
                        .iter_mut()
788
                        .next()
789
                        .expect("internal node has one entry");
790
                    Some(only_child.clone())
1✔
791
                } else {
792
                    None
844✔
793
                }
794
            }
795
            EditableLoadedNode::Leaf(_) => None,
168✔
796
        }
797
    }
798

799
    pub async fn find(
3,062,579✔
800
        &mut self,
801
        key: &Key,
802
        load_tree: &dyn LoadTree,
803
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
804
        match self {
3,062,579✔
805
            EditableLoadedNode::Leaf(leaf_node) => Ok(leaf_node.find(key)),
4,649,067✔
806
            EditableLoadedNode::Internal(internal_node) => internal_node.find(key, load_tree).await,
7,564,450✔
807
        }
808
    }
809

810
    pub fn top_key(&self) -> Option<&Key> {
13,408✔
811
        match self {
13,408✔
812
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.top_key(),
28,683✔
813
            EditableLoadedNode::Internal(internal_node) => internal_node.top_key(),
11,541✔
814
        }
815
    }
816

817
    pub async fn count(
22,892✔
818
        &mut self,
819
        load_tree: &dyn LoadTree,
820
    ) -> Result<u64, Box<dyn std::error::Error>> {
821
        match self {
22,892✔
822
            EditableLoadedNode::Leaf(leaf_node) => Ok(leaf_node.entries.len() as u64),
38,456✔
823
            EditableLoadedNode::Internal(internal_node) => {
3,664✔
824
                let mut total_count = 0;
7,328✔
825
                for child_node in internal_node.entries.values_mut() {
25,320✔
826
                    total_count += child_node.count(load_tree).await?;
53,976✔
827
                }
828
                Ok(total_count)
3,664✔
829
            }
830
        }
831
    }
832

833
    pub async fn save(
142✔
834
        &mut self,
835
        store_tree: &dyn StoreTree,
836
    ) -> Result<BlobDigest, Box<dyn std::error::Error>> {
837
        match self {
142✔
838
            EditableLoadedNode::Leaf(leaf_node) => {
126✔
839
                let mut new_node = crate::sorted_tree::Node {
840
                    entries: Vec::new(),
126✔
841
                };
842
                for (key, value) in &leaf_node.entries {
21,912✔
843
                    new_node.entries.push((key.clone(), value.clone()));
36,310✔
844
                }
845
                let digest = store_node(store_tree, &new_node, &Metadata { is_leaf: true }).await?;
630✔
846
                Ok(digest)
126✔
847
            }
848
            EditableLoadedNode::Internal(internal_node) => {
16✔
849
                let mut new_node = crate::sorted_tree::Node {
850
                    entries: Vec::new(),
16✔
851
                };
852
                for (key, child_node) in &mut internal_node.entries {
136✔
853
                    let child_digest = Box::pin(child_node.save(store_tree)).await?;
300✔
854
                    new_node
60✔
855
                        .entries
60✔
856
                        .push((key.clone(), TreeReference::new(child_digest)));
240✔
857
                }
858
                let digest =
16✔
859
                    store_node(store_tree, &new_node, &Metadata { is_leaf: false }).await?;
64✔
860
                Ok(digest)
16✔
861
            }
862
        }
863
    }
864

865
    pub async fn verify_integrity(
5,829✔
866
        &mut self,
867
        load_tree: &dyn LoadTree,
868
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
869
        match self {
5,829✔
870
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.verify_integrity(),
14,931✔
871
            EditableLoadedNode::Internal(internal_node) => {
852✔
872
                internal_node.verify_integrity(load_tree).await
2,556✔
873
            }
874
        }
875
    }
876

877
    pub fn is_naturally_split(&self) -> bool {
19,026✔
878
        match self {
19,026✔
879
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.is_naturally_split(),
57,078✔
UNCOV
880
            EditableLoadedNode::Internal(internal_node) => internal_node.is_naturally_split(),
×
881
        }
882
    }
883
}
884

885
pub struct Iterator<
886
    't,
887
    Key: Serialize + DeserializeOwned + Ord + Clone + Debug,
888
    Value: NodeValue + Clone,
889
> {
890
    next: Vec<&'t mut EditableNode<Key, Value>>,
891
    leaf_iterator: Option<std::collections::btree_map::Iter<'t, Key, Value>>,
892
    load_tree: &'t dyn LoadTree,
893
}
894

895
impl<'t, Key, Value> Iterator<'t, Key, Value>
896
where
897
    Key: Serialize + DeserializeOwned + Ord + Clone + Debug,
898
    Value: NodeValue + Clone,
899
{
900
    pub fn new(node: &'t mut EditableNode<Key, Value>, load_tree: &'t dyn LoadTree) -> Self {
3✔
901
        Iterator {
902
            next: vec![node],
9✔
903
            leaf_iterator: None,
904
            load_tree,
905
        }
906
    }
907

908
    pub async fn next(&mut self) -> Result<Option<(Key, Value)>, Box<dyn std::error::Error>> {
1,012✔
909
        loop {
×
910
            if let Some(current_node) = self.leaf_iterator.as_mut() {
1,018✔
911
                match current_node.next() {
507✔
912
                    Some((key, value)) => return Ok(Some((key.clone(), value.clone()))),
2,515✔
913
                    None => {
4✔
914
                        self.leaf_iterator = None;
4✔
915
                    }
916
                }
917
            }
918
            match self.next.pop() {
8✔
919
                Some(next_node) => {
5✔
920
                    let loaded = next_node.require_loaded(self.load_tree).await?;
20✔
921
                    match loaded {
5✔
922
                        EditableLoadedNode::Leaf(leaf_node) => {
4✔
923
                            self.leaf_iterator = Some(leaf_node.entries().iter());
4✔
924
                            continue;
4✔
925
                        }
926
                        EditableLoadedNode::Internal(internal_node) => {
1✔
927
                            internal_node
1✔
928
                                .entries
1✔
929
                                .values_mut()
930
                                .rev()
931
                                .for_each(|child_node| {
3✔
932
                                    self.next.push(child_node);
6✔
933
                                });
934
                        }
935
                    };
936
                }
937
                None => {
×
938
                    return Ok(None);
3✔
939
                }
940
            }
941
        }
942
    }
943
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc