• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TyRoXx / NonlocalityOS / 20206358814

14 Dec 2025 10:07AM UTC coverage: 80.201% (-0.6%) from 80.811%
20206358814

Pull #371

github

web-flow
Merge 7dc9724d3 into 9ba501c70
Pull Request #371: Prolly tree builds much larger chunks, respecting the max tree blob size

42 of 58 new or added lines in 2 files covered. (72.41%)

37 existing lines in 3 files now uncovered.

5440 of 6783 relevant lines covered (80.2%)

34095.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.15
/sorted_tree/src/prolly_tree_editable_node.rs
1
use crate::sorted_tree::{self, NodeValue, TreeReference};
2
use astraea::{
3
    storage::{LoadTree, StoreError, StoreTree},
4
    tree::BlobDigest,
5
};
6
use serde::{de::DeserializeOwned, Deserialize, Serialize};
7
use sha3::{Digest, Sha3_512};
8
use std::collections::BTreeMap;
9
use std::fmt::Debug;
10

11
#[derive(Debug, PartialEq)]
12
pub enum IntegrityCheckResult {
13
    Valid { depth: usize },
14
    Corrupted(String),
15
}
16

17
pub fn hash_key<Key: Serialize>(key: &Key) -> u8 {
156,688✔
18
    // TODO: use a better hash function (https://docs.dolthub.com/architecture/storage-engine/prolly-tree#controlling-chunk-size)
19
    let key_serialized = postcard::to_stdvec(key).expect("serializing key should succeed");
783,440✔
20
    let mut hasher = Sha3_512::new();
313,376✔
21
    hasher.update(&key_serialized);
470,064✔
22
    let result: [u8; 64] = hasher.finalize().into();
783,440✔
23
    result[0]
156,688✔
24
}
25

26
pub fn is_split_after_key<Key: Serialize>(key: &Key, chunk_size_in_bytes: usize) -> bool {
1,068,573✔
27
    if chunk_size_in_bytes < 1000 {
1,068,573✔
28
        // No point in splitting small chunks.
29
        // TODO: use Tree efficiently
30
        return false;
911,885✔
31
    }
32
    let hash = hash_key(key);
470,064✔
33
    let chunk_boundary_threshold = 10;
313,376✔
34
    if hash < chunk_boundary_threshold {
156,688✔
35
        // written with an if expression so that we can see whether the tests cover both branches
36
        true
20,805✔
37
    } else {
38
        false
135,883✔
39
    }
40
}
41

42
pub struct SizeTracker {
43
    element_count: usize,
44
    total_element_size: usize,
45
}
46

47
impl Default for SizeTracker {
NEW
48
    fn default() -> Self {
×
NEW
49
        Self::new()
×
50
    }
51
}
52

53
impl SizeTracker {
54
    pub fn new() -> Self {
33,803✔
55
        SizeTracker {
56
            element_count: 0,
57
            total_element_size: 0,
58
        }
59
    }
60

61
    pub fn add_entry<Key: Serialize, Value: Serialize>(&mut self, key: &Key, value: &Value) {
3,183,650✔
62
        let entry_serialized: Vec<u8> =
6,367,300✔
63
            postcard::to_stdvec(&(key, value)).expect("serializing entry should succeed");
12,734,600✔
64
        self.element_count += 1;
3,183,650✔
65
        self.total_element_size += entry_serialized.len();
3,183,650✔
66
    }
67

68
    pub fn size(&self) -> usize {
1,068,574✔
69
        // TODO: optimize size calculation
70
        let metadata_serialized: Vec<u8> =
2,137,148✔
71
            postcard::to_stdvec(&Metadata { is_leaf: true }).unwrap();
3,205,722✔
72
        let element_count_serialized: Vec<u8> = postcard::to_stdvec(&self.element_count).unwrap();
5,342,870✔
73
        metadata_serialized.len() + element_count_serialized.len() + self.total_element_size
3,205,722✔
74
    }
75
}
76

77
#[derive(Serialize, Deserialize, Clone, Hash)]
78
pub struct Metadata {
79
    pub is_leaf: bool,
80
}
81

82
pub async fn store_node<Key: Serialize + Ord, Value: NodeValue>(
121✔
83
    store_tree: &dyn StoreTree,
84
    node: &sorted_tree::Node<Key, Value>,
85
    metadata: &Metadata,
86
) -> Result<BlobDigest, StoreError> {
87
    let metadata_serialized =
121✔
88
        postcard::to_stdvec(metadata).expect("serializing metadata should always succeed");
484✔
89
    crate::sorted_tree::store_node(store_tree, node, &bytes::Bytes::from(metadata_serialized)).await
484✔
90
}
91

92
#[derive(Debug, PartialEq)]
93
pub enum EitherNodeType<Key: Serialize + Ord, Value: NodeValue> {
94
    Leaf(sorted_tree::Node<Key, Value>),
95
    Internal(sorted_tree::Node<Key, TreeReference>),
96
}
97

98
#[derive(Debug, PartialEq, Eq, Clone)]
99
pub enum DeserializationError {
100
    MissingTree(BlobDigest),
101
    TreeHashMismatch(BlobDigest),
102
}
103

104
impl std::fmt::Display for DeserializationError {
105
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
×
106
        write!(f, "{self:?}")
×
107
    }
108
}
109

110
impl std::error::Error for DeserializationError {}
111

112
pub async fn load_node<
69✔
113
    Key: Serialize + DeserializeOwned + PartialEq + Ord,
114
    Value: NodeValue + Clone,
115
>(
116
    load_tree: &dyn LoadTree,
117
    root: &BlobDigest,
118
) -> Result<EitherNodeType<Key, Value>, Box<dyn std::error::Error>> {
119
    let loaded = match load_tree.load_tree(root).await {
345✔
120
        Some(loaded) => loaded,
138✔
121
        None => return Err(DeserializationError::MissingTree(*root).into()),
×
122
    };
123
    let hashed = match loaded.hash() {
138✔
124
        Some(hashed) => hashed,
138✔
125
        None => return Err(DeserializationError::TreeHashMismatch(*root).into()),
×
126
    };
127
    let tree = hashed.tree();
207✔
128
    let (metadata, sorted_tree_data) =
138✔
129
        postcard::take_from_bytes::<Metadata>(tree.blob().as_slice())?;
138✔
130
    if metadata.is_leaf {
69✔
131
        let node = sorted_tree::node_from_tree::<Key, Value>(
132
            tree,
69✔
133
            tree.blob().as_slice().len() - sorted_tree_data.len(),
207✔
134
        );
135
        Ok(EitherNodeType::Leaf(node))
69✔
136
    } else {
137
        let node = sorted_tree::node_from_tree::<Key, TreeReference>(
UNCOV
138
            tree,
×
UNCOV
139
            tree.blob().as_slice().len() - sorted_tree_data.len(),
×
140
        );
UNCOV
141
        Ok(EitherNodeType::Internal(node))
×
142
    }
143
}
144

145
#[derive(Debug, Clone)]
146
pub enum EditableNode<Key: std::cmp::Ord + Clone, Value: Clone> {
147
    Reference(TreeReference),
148
    Loaded(EditableLoadedNode<Key, Value>),
149
}
150

151
impl<
152
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
153
        Value: NodeValue + Clone,
154
    > Default for EditableNode<Key, Value>
155
{
156
    fn default() -> Self {
1✔
157
        Self::new()
1✔
158
    }
159
}
160

161
impl<
162
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
163
        Value: NodeValue + Clone,
164
    > EditableNode<Key, Value>
165
{
166
    pub fn new() -> Self {
33✔
167
        EditableNode::Loaded(EditableLoadedNode::Leaf(EditableLeafNode {
33✔
168
            entries: BTreeMap::new(),
33✔
169
        }))
170
    }
171

172
    pub async fn require_loaded(
3,120,739✔
173
        &mut self,
174
        load_tree: &dyn LoadTree,
175
    ) -> Result<&mut EditableLoadedNode<Key, Value>, Box<dyn std::error::Error>> {
176
        match self {
3,120,739✔
177
            EditableNode::Reference(tree_ref) => {
9✔
178
                let loaded: EitherNodeType<Key, Value> =
18✔
179
                    load_node(load_tree, tree_ref.reference()).await?;
27✔
180
                *self = EditableNode::Loaded(EditableLoadedNode::new(loaded));
18✔
181
            }
182
            EditableNode::Loaded(_loaded_node) => {}
6,241,460✔
183
        };
184
        let loaded = match self {
6,241,478✔
185
            EditableNode::Loaded(loaded_node) => loaded_node,
6,241,478✔
186
            _ => unreachable!(),
187
        };
188
        Ok(loaded)
3,120,739✔
189
    }
190

191
    pub async fn insert(
4,379✔
192
        &mut self,
193
        key: Key,
194
        value: Value,
195
        load_tree: &dyn LoadTree,
196
    ) -> Result<(), Box<dyn std::error::Error>> {
197
        let (self_top_key, nodes_split) = self.insert_impl(key, value, load_tree).await?;
30,653✔
198
        if nodes_split.is_empty() {
8,758✔
199
            return Ok(());
4,373✔
200
        }
201
        let mut entries = BTreeMap::new();
12✔
202
        entries.insert(self_top_key, self.clone());
30✔
203
        for node in nodes_split {
18✔
204
            entries.insert(
18✔
205
                node.top_key().expect("Node cannot be empty here").clone(),
24✔
206
                EditableNode::Loaded(node),
6✔
207
            );
208
        }
209
        *self = EditableNode::Loaded(EditableLoadedNode::Internal(EditableInternalNode {
12✔
210
            entries,
6✔
211
        }));
212
        Ok(())
6✔
213
    }
214

215
    pub async fn insert_impl(
7,370✔
216
        &mut self,
217
        key: Key,
218
        value: Value,
219
        load_tree: &dyn LoadTree,
220
    ) -> Result<(Key, Vec<EditableLoadedNode<Key, Value>>), Box<dyn std::error::Error>> {
221
        let loaded = self.require_loaded(load_tree).await?;
29,480✔
222
        let nodes_split = Box::pin(loaded.insert(key, value, load_tree)).await?;
51,590✔
223
        Ok((
7,370✔
224
            loaded.top_key().expect("Node cannot be empty here").clone(),
29,480✔
225
            nodes_split,
7,370✔
226
        ))
227
    }
228

229
    pub async fn remove(
1,014✔
230
        &mut self,
231
        key: &Key,
232
        load_tree: &dyn LoadTree,
233
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
234
        let (maybe_top_key, maybe_removed) = self.remove_impl(key, load_tree).await?;
6,084✔
235
        if maybe_top_key.is_none() {
2,029✔
236
            *self = EditableNode::Loaded(EditableLoadedNode::Leaf(EditableLeafNode {
1✔
237
                entries: BTreeMap::new(),
1✔
238
            }));
239
        } else {
240
            let loaded = self.require_loaded(load_tree).await?;
4,052✔
241
            if let Some(simplified) = loaded.simplify() {
1,015✔
242
                *self = simplified;
1✔
243
            }
244
        }
245
        Ok(maybe_removed)
1,014✔
246
    }
247

248
    pub async fn remove_impl(
1,859✔
249
        &mut self,
250
        key: &Key,
251
        load_tree: &dyn LoadTree,
252
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
253
        let loaded = self.require_loaded(load_tree).await?;
7,436✔
254
        let result = loaded.remove(key, load_tree).await?;
9,295✔
255
        Ok(result)
1,859✔
256
    }
257

258
    pub async fn find(
3,062,579✔
259
        &mut self,
260
        key: &Key,
261
        load_tree: &dyn LoadTree,
262
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
263
        let loaded = self.require_loaded(load_tree).await?;
12,250,316✔
264
        loaded.find(key, load_tree).await
12,250,316✔
265
    }
266

267
    pub async fn count(
22,892✔
268
        &mut self,
269
        load_tree: &dyn LoadTree,
270
    ) -> Result<u64, Box<dyn std::error::Error>> {
271
        let loaded = self.require_loaded(load_tree).await?;
91,568✔
272
        Box::pin(loaded.count(load_tree)).await
91,568✔
273
    }
274

275
    pub async fn save(
134✔
276
        &mut self,
277
        store_tree: &dyn StoreTree,
278
    ) -> Result<BlobDigest, Box<dyn std::error::Error>> {
279
        match self {
134✔
280
            EditableNode::Reference(tree_ref) => Ok(*tree_ref.reference()),
28✔
281
            EditableNode::Loaded(loaded_node) => loaded_node.save(store_tree).await,
480✔
282
        }
283
    }
284

285
    pub async fn load(
60✔
286
        digest: &BlobDigest,
287
        load_tree: &dyn LoadTree,
288
    ) -> Result<Self, Box<dyn std::error::Error>> {
289
        let loaded: EitherNodeType<Key, Value> = load_node(load_tree, digest).await?;
300✔
290
        Ok(EditableNode::Loaded(EditableLoadedNode::new(loaded)))
60✔
291
    }
292

293
    pub async fn verify_integrity(
5,829✔
294
        &mut self,
295
        expected_top_key: Option<&Key>,
296
        load_tree: &dyn LoadTree,
297
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
298
        let loaded = self.require_loaded(load_tree).await?;
23,316✔
299
        if loaded.top_key() != expected_top_key {
5,829✔
300
            return Ok(IntegrityCheckResult::Corrupted(
×
301
                "Top key mismatch".to_string(),
×
302
            ));
303
        }
304
        Box::pin(loaded.verify_integrity(load_tree)).await
23,316✔
305
    }
306

307
    pub async fn merge(
178✔
308
        &mut self,
309
        other: Self,
310
        load_tree: &dyn LoadTree,
311
    ) -> Result<(Key, Vec<EditableLoadedNode<Key, Value>>), Box<dyn std::error::Error>> {
312
        let loaded = self.require_loaded(load_tree).await?;
712✔
313
        let other_loaded = match other {
356✔
314
            EditableNode::Reference(tree_ref) => {
×
315
                let loaded: EitherNodeType<Key, Value> =
×
316
                    load_node(load_tree, tree_ref.reference()).await?;
×
317
                EditableLoadedNode::new(loaded)
×
318
            }
319
            EditableNode::Loaded(loaded_node) => loaded_node,
356✔
320
        };
321
        match (loaded, other_loaded) {
356✔
322
            (EditableLoadedNode::Leaf(self_leaf), EditableLoadedNode::Leaf(other_leaf)) => {
356✔
323
                for (key, value) in other_leaf.entries {
50,476✔
324
                    self_leaf.entries.insert(key, value);
50,298✔
325
                }
326
                let split_nodes = self_leaf.check_split();
534✔
327
                Ok((
×
328
                    self_leaf
356✔
329
                        .top_key()
356✔
330
                        .expect("Leaf cannot be empty here")
178✔
331
                        .clone(),
178✔
332
                    split_nodes
178✔
333
                        .into_iter()
178✔
334
                        .map(|n| EditableLoadedNode::Leaf(n))
263✔
335
                        .collect(),
178✔
336
                ))
337
            }
338
            (
UNCOV
339
                EditableLoadedNode::Internal(self_internal),
×
UNCOV
340
                EditableLoadedNode::Internal(other_internal),
×
341
            ) => {
×
UNCOV
342
                for (key, child_node) in other_internal.entries {
×
UNCOV
343
                    let previous_entry = self_internal.entries.insert(key, child_node);
×
UNCOV
344
                    if let Some(_existing_child) = previous_entry {
×
345
                        return Err(Box::new(std::io::Error::other("Merge node key collision")));
×
346
                    }
347
                }
UNCOV
348
                let split_nodes = self_internal.check_split();
×
349
                Ok((
×
UNCOV
350
                    self_internal
×
UNCOV
351
                        .top_key()
×
UNCOV
352
                        .expect("Internal node cannot be empty here")
×
UNCOV
353
                        .clone(),
×
UNCOV
354
                    split_nodes
×
UNCOV
355
                        .into_iter()
×
UNCOV
356
                        .map(|n| EditableLoadedNode::Internal(n))
×
UNCOV
357
                        .collect(),
×
358
                ))
359
            }
360
            _ => unreachable!(),
361
        }
362
    }
363

364
    pub async fn is_naturally_split(
19,014✔
365
        &mut self,
366
        load_tree: &dyn LoadTree,
367
    ) -> Result<bool, Box<dyn std::error::Error>> {
368
        let loaded = self.require_loaded(load_tree).await?;
76,056✔
369
        Ok(loaded.is_naturally_split())
19,014✔
370
    }
371
}
372

373
#[derive(Debug, Clone)]
374
pub struct EditableLeafNode<Key, Value> {
375
    entries: BTreeMap<Key, Value>,
376
}
377

378
impl<Key: std::cmp::Ord + Clone + Serialize, Value: Clone + NodeValue>
379
    EditableLeafNode<Key, Value>
380
{
381
    pub fn create(entries: BTreeMap<Key, Value>) -> Option<Self> {
4,755✔
382
        if entries.is_empty() {
9,510✔
383
            None
2✔
384
        } else {
385
            Some(EditableLeafNode { entries })
4,753✔
386
        }
387
    }
388

389
    pub async fn insert(&mut self, key: Key, value: Value) -> Vec<EditableLeafNode<Key, Value>> {
8,758✔
390
        self.entries.insert(key, value);
17,516✔
391
        self.check_split()
8,758✔
392
    }
393

394
    pub async fn remove(
1,014✔
395
        &mut self,
396
        key: &Key,
397
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
398
        let removed = self.entries.remove(key);
4,056✔
399
        let top_key = self.top_key().cloned();
4,056✔
400
        Ok((top_key, removed))
1,014✔
401
    }
402

403
    fn check_split(&mut self) -> Vec<EditableLeafNode<Key, Value>> {
4,557✔
404
        let mut result = Vec::new();
9,114✔
405
        let mut current_node = BTreeMap::new();
9,114✔
406
        let mut current_node_size_tracker = SizeTracker::new();
9,114✔
407
        for entry in self.entries.iter() {
500,056✔
408
            current_node_size_tracker.add_entry(entry.0, &entry.1.to_content());
1,963,768✔
409
            current_node.insert(entry.0.clone(), entry.1.clone());
2,945,652✔
410
            if is_split_after_key(entry.0, current_node_size_tracker.size()) {
1,966,031✔
411
                result.push(
6,789✔
412
                    EditableLeafNode::create(current_node)
6,789✔
413
                        .expect("Must succeed because list is not empty"),
6,789✔
414
                );
415
                current_node = BTreeMap::new();
4,526✔
416
                current_node_size_tracker = SizeTracker::new();
2,263✔
417
            }
418
        }
419
        if !current_node.is_empty() {
7,046✔
420
            result.push(
7,467✔
421
                EditableLeafNode::create(current_node)
7,467✔
422
                    .expect("Must succeed because list is not empty"),
2,489✔
423
            );
424
        }
425
        *self = result.remove(0);
13,671✔
426
        result
4,557✔
427
    }
428

429
    pub fn top_key(&self) -> Option<&Key> {
10,743✔
430
        self.entries.keys().next_back()
21,486✔
431
    }
432

433
    pub fn find(&mut self, key: &Key) -> Option<Value> {
1,549,689✔
434
        self.entries.get(key).cloned()
6,198,756✔
435
    }
436

437
    pub fn verify_integrity(&mut self) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
4,977✔
438
        let mut size_tracker = SizeTracker::new();
9,954✔
439
        for (index, (key, value)) in self.entries.iter().enumerate() {
1,648,842✔
440
            size_tracker.add_entry(key, &value.to_content());
2,178,548✔
441
            let is_split = is_split_after_key(key, size_tracker.size());
2,723,185✔
442
            if (index < self.entries.len() - 1) && is_split {
1,628,936✔
443
                return Ok(IntegrityCheckResult::Corrupted(format!(
×
444
                    "Leaf node integrity check failed: Key at index {} indicates split but node is not final (number of keys: {})",
×
445
                    index, self.entries.len()
×
446
                )));
447
            }
448
        }
449
        Ok(IntegrityCheckResult::Valid { depth: 0 })
4,977✔
450
    }
451

452
    pub fn is_naturally_split(&self) -> bool {
19,014✔
453
        let mut size_tracker = SizeTracker::new();
38,028✔
454
        for entry in self.entries.iter() {
4,306,208✔
455
            size_tracker.add_entry(entry.0, &entry.1.to_content());
8,536,360✔
456
        }
457
        is_split_after_key(
458
            self.entries.keys().last().expect("leaf node is not empty"),
76,056✔
459
            size_tracker.size(),
38,028✔
460
        )
461
    }
462

463
    pub fn entries(&self) -> &BTreeMap<Key, Value> {
4✔
464
        &self.entries
4✔
465
    }
466
}
467

468
#[derive(Debug, Clone)]
469
pub struct EditableInternalNode<Key: std::cmp::Ord + Clone, Value: Clone> {
470
    entries: BTreeMap<Key, EditableNode<Key, Value>>,
471
}
472

473
impl<
474
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
475
        Value: NodeValue + Clone,
476
    > EditableInternalNode<Key, Value>
477
{
478
    pub fn create(entries: BTreeMap<Key, EditableNode<Key, Value>>) -> Option<Self> {
2,991✔
479
        if entries.is_empty() {
5,982✔
480
            None
×
481
        } else {
482
            Some(EditableInternalNode { entries })
2,991✔
483
        }
484
    }
485

486
    pub async fn insert(
2,991✔
487
        &mut self,
488
        key: Key,
489
        value: Value,
490
        load_tree: &dyn LoadTree,
491
    ) -> Result<Vec<EditableInternalNode<Key, Value>>, Box<dyn std::error::Error>> {
492
        let last_index = self.entries.len() - 1;
5,982✔
493
        // TODO: optimize search
494
        for (index, (entry_key, entry_value)) in self.entries.iter_mut().enumerate() {
37,257✔
495
            if (index == last_index) || (key <= *entry_key) {
17,963✔
496
                let (updated_key, split_nodes) =
5,982✔
497
                    entry_value.insert_impl(key, value, load_tree).await?;
14,955✔
498
                if updated_key != *entry_key {
2,991✔
499
                    let old_key = entry_key.clone();
885✔
500
                    let old_value = self.entries.remove(&old_key).expect("key must exist");
1,770✔
501
                    let previous_entry = self.entries.insert(updated_key, old_value);
1,475✔
502
                    assert!(previous_entry.is_none(), "Split node key collision");
503
                }
504
                for node in split_nodes {
3,095✔
505
                    let previous_entry = self.entries.insert(
312✔
506
                        node.top_key().expect("Node cannot be empty here").clone(),
416✔
507
                        EditableNode::Loaded(node),
104✔
508
                    );
509
                    assert!(previous_entry.is_none(), "Split node key collision");
510
                }
511
                self.update_chunk_boundaries(load_tree).await?;
8,973✔
512
                break;
2,991✔
513
            }
514
        }
515
        Ok(self.check_split())
2,991✔
516
    }
517

518
    pub async fn remove(
845✔
519
        &mut self,
520
        key: &Key,
521
        load_tree: &dyn LoadTree,
522
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
523
        // TODO: optimize search
524
        for (entry_key, entry_value) in self.entries.iter_mut() {
6,680✔
525
            if key <= entry_key {
2,495✔
526
                let (maybe_new_top_key, maybe_removed) =
1,690✔
527
                    Box::pin(entry_value.remove_impl(key, load_tree)).await?;
4,225✔
528
                match maybe_new_top_key {
845✔
529
                    Some(new_top_key) => {
845✔
530
                        if new_top_key != *entry_key {
853✔
531
                            let entry_key = entry_key.clone();
32✔
532
                            let entry_value_removed = self
24✔
533
                                .entries
16✔
534
                                .remove(&entry_key)
24✔
535
                                .expect("Must exist because we just iterated over the entries");
536
                            self.entries.insert(new_top_key, entry_value_removed);
24✔
537
                        }
538
                    }
539
                    None => {
×
540
                        let entry_key = entry_key.clone();
×
541
                        self.entries.remove(&entry_key);
×
542
                    }
543
                }
544
                let top_key = self.top_key().cloned();
3,380✔
545
                match maybe_removed {
845✔
546
                    Some(removed) => {
845✔
547
                        if !self.entries.is_empty() {
845✔
548
                            self.update_chunk_boundaries(load_tree).await?;
2,535✔
549
                        }
550
                        return Ok((top_key, Some(removed)));
845✔
551
                    }
552
                    None => {
×
553
                        return Ok((top_key, None));
×
554
                    }
555
                }
556
            }
557
        }
558
        Ok((self.top_key().cloned(), None))
×
559
    }
560

561
    async fn update_chunk_boundaries(
3,836✔
562
        &mut self,
563
        load_tree: &dyn LoadTree,
564
    ) -> Result<(), Box<dyn std::error::Error>> {
565
        loop {
×
566
            let merge_candidates = self.find_merge_candidates(load_tree).await?;
16,056✔
567
            match merge_candidates {
4,014✔
568
                Some((low_key, high_key)) => {
356✔
569
                    let mut low = self.entries.remove(&low_key).expect("key must exist");
1,068✔
570
                    let high = self.entries.remove(&high_key).expect("key must exist");
1,068✔
571
                    let (low_top_key, split_nodes) = low.merge(high, load_tree).await?;
1,068✔
572
                    assert!(split_nodes.is_empty() || low.is_naturally_split(load_tree).await?);
573
                    assert_ne!(low_key, low_top_key, "Merge did not change low key");
574
                    let previous_entry = self.entries.insert(low_top_key, low);
890✔
575
                    assert!(previous_entry.is_none(), "Merge node key collision");
576
                    let split_nodes_len = split_nodes.len();
534✔
577
                    for (index, node) in split_nodes.into_iter().enumerate() {
704✔
578
                        assert!((index == split_nodes_len - 1) || node.is_naturally_split());
579
                        let previous_entry = self.entries.insert(
255✔
580
                            node.top_key().expect("Node cannot be empty here").clone(),
340✔
581
                            EditableNode::Loaded(node),
85✔
582
                        );
583
                        assert!(previous_entry.is_none(), "Merge node key collision");
584
                    }
585
                }
586
                None => break,
3,836✔
587
            }
588
        }
589
        Ok(())
3,836✔
590
    }
591

592
    async fn find_merge_candidates(
4,014✔
593
        &mut self,
594
        load_tree: &dyn LoadTree,
595
    ) -> Result<Option<(Key, Key)>, Box<dyn std::error::Error>> {
596
        let last_index = self.entries.len() - 1;
8,028✔
597
        let mut needs_merge: Option<&Key> = None;
12,042✔
598
        // TODO: optimize search
599
        for (index, (entry_key, entry_value)) in self.entries.iter_mut().enumerate() {
69,363✔
600
            if let Some(merge_value) = needs_merge.take() {
19,285✔
601
                return Ok(Some((merge_value.clone(), entry_key.clone())));
534✔
602
            }
603
            let is_split = entry_value.is_naturally_split(load_tree).await?;
75,716✔
604
            if (index != last_index) && !is_split {
34,200✔
605
                needs_merge = Some(entry_key);
178✔
606
            }
607
        }
608
        Ok(None)
3,836✔
609
    }
610

611
    fn check_split(&mut self) -> Vec<EditableInternalNode<Key, Value>> {
2,991✔
612
        let mut result = Vec::new();
5,982✔
613
        let mut current_node = BTreeMap::new();
5,982✔
614
        let mut current_node_size_tracker = SizeTracker::new();
5,982✔
615
        for entry in self.entries.iter() {
19,962✔
616
            current_node_size_tracker.add_entry(
27,960✔
617
                entry.0,
13,980✔
618
                &TreeReference::new(BlobDigest::new(&[0; 64])).to_content(),
27,960✔
619
            );
620
            current_node.insert(entry.0.clone(), entry.1.clone());
83,880✔
621
            if is_split_after_key(entry.0, current_node_size_tracker.size()) {
55,920✔
UNCOV
622
                result.push(
×
UNCOV
623
                    EditableInternalNode::create(current_node)
×
UNCOV
624
                        .expect("Must succeed because list is not empty"),
×
625
                );
UNCOV
626
                current_node = BTreeMap::new();
×
NEW
627
                current_node_size_tracker = SizeTracker::new();
×
628
            }
629
        }
630
        if !current_node.is_empty() {
5,982✔
631
            result.push(
8,973✔
632
                EditableInternalNode::create(current_node)
8,973✔
633
                    .expect("Must succeed because list is not empty"),
2,991✔
634
            );
635
        }
636
        *self = result.remove(0);
8,973✔
637
        result
2,991✔
638
    }
639

640
    pub fn top_key(&self) -> Option<&Key> {
4,688✔
641
        self.entries.keys().next_back()
9,376✔
642
    }
643

644
    pub async fn find(
1,512,890✔
645
        &mut self,
646
        key: &Key,
647
        _load_tree: &dyn LoadTree,
648
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
649
        // TODO: optimize search
650
        for (entry_key, entry_value) in self.entries.iter_mut() {
12,730,094✔
651
            if key <= entry_key {
4,852,157✔
652
                return Box::pin(entry_value.find(key, _load_tree)).await;
7,564,375✔
653
            }
654
        }
655
        Ok(None)
15✔
656
    }
657

658
    pub async fn verify_integrity(
852✔
659
        &mut self,
660
        load_tree: &dyn LoadTree,
661
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
662
        if self.entries.is_empty() {
1,704✔
663
            return Ok(IntegrityCheckResult::Corrupted(
×
664
                "Internal node integrity check failed: Node has no entries".to_string(),
×
665
            ));
666
        }
667
        let mut child_depth = None;
1,704✔
668
        for (index, (key, value)) in self.entries.iter_mut().enumerate() {
15,738✔
669
            match value.verify_integrity(Some(key), load_tree).await? {
17,576✔
670
                IntegrityCheckResult::Valid { depth } => {
4,394✔
671
                    if let Some(existing_depth) = child_depth {
7,936✔
672
                        if existing_depth != depth {
3,542✔
673
                            return Ok(IntegrityCheckResult::Corrupted(format!(
×
674
                                "Internal node integrity check failed at index {}: Child node depth mismatch (expected {}, found {})",
×
675
                                index, existing_depth, depth
×
676
                            )));
677
                        }
678
                    } else {
679
                        child_depth = Some(depth);
852✔
680
                    }
681
                }
682
                IntegrityCheckResult::Corrupted(reason) => {
×
683
                    return Ok(IntegrityCheckResult::Corrupted(format!(
×
684
                        "Internal node integrity check failed at index {}: {}",
×
685
                        index, reason
×
686
                    )));
687
                }
688
            }
689
        }
690
        Ok(IntegrityCheckResult::Valid {
852✔
691
            depth: child_depth.expect("Internal node has to have at least one child") + 1,
1,704✔
692
        })
693
    }
694

UNCOV
695
    pub fn is_naturally_split(&self) -> bool {
×
UNCOV
696
        let last_key = self
×
UNCOV
697
            .entries
×
698
            .keys()
699
            .last()
700
            .expect("internal node is not empty");
NEW
701
        let mut size_tracker = SizeTracker::new();
×
NEW
702
        for entry in self.entries.iter() {
×
NEW
703
            size_tracker.add_entry(
×
NEW
704
                entry.0,
×
NEW
705
                &TreeReference::new(BlobDigest::new(&[0; 64])).to_content(),
×
706
            );
707
        }
NEW
708
        is_split_after_key(last_key, size_tracker.size())
×
709
    }
710
}
711

712
#[derive(Debug, Clone)]
713
pub enum EditableLoadedNode<Key: std::cmp::Ord + Clone, Value: Clone> {
714
    Leaf(EditableLeafNode<Key, Value>),
715
    Internal(EditableInternalNode<Key, Value>),
716
}
717

718
impl<Key: Serialize + DeserializeOwned + Ord + Clone + Debug, Value: NodeValue + Clone>
719
    EditableLoadedNode<Key, Value>
720
{
721
    pub fn new(loaded: EitherNodeType<Key, Value>) -> Self {
69✔
722
        match loaded {
69✔
723
            EitherNodeType::Leaf(leaf_node) => {
69✔
724
                let mut entries = BTreeMap::new();
138✔
725
                for (key, value) in leaf_node.entries {
1,761✔
726
                    entries.insert(key, value);
1,692✔
727
                }
728
                EditableLoadedNode::Leaf(EditableLeafNode { entries })
69✔
729
            }
UNCOV
730
            EitherNodeType::Internal(internal_node) => {
×
UNCOV
731
                let mut entries = BTreeMap::new();
×
UNCOV
732
                for (key, child_node) in internal_node.entries {
×
UNCOV
733
                    entries.insert(key, EditableNode::Reference(child_node));
×
734
                }
UNCOV
735
                EditableLoadedNode::Internal(EditableInternalNode { entries })
×
736
            }
737
        }
738
    }
739

740
    pub async fn insert(
7,370✔
741
        &mut self,
742
        key: Key,
743
        value: Value,
744
        load_tree: &dyn LoadTree,
745
    ) -> Result<Vec<EditableLoadedNode<Key, Value>>, Box<dyn std::error::Error>> {
746
        match self {
7,370✔
747
            EditableLoadedNode::Leaf(leaf_node) => {
4,379✔
748
                let split_nodes = leaf_node.insert(key, value).await;
21,895✔
749
                Ok(split_nodes
4,379✔
750
                    .into_iter()
4,379✔
751
                    .map(|node| EditableLoadedNode::Leaf(node))
4,489✔
752
                    .collect())
4,379✔
753
            }
754
            EditableLoadedNode::Internal(internal_node) => {
2,991✔
755
                let split_nodes = internal_node.insert(key, value, load_tree).await?;
17,946✔
756
                Ok(split_nodes
2,991✔
757
                    .into_iter()
2,991✔
758
                    .map(|node| EditableLoadedNode::Internal(node))
2,991✔
759
                    .collect())
2,991✔
760
            }
761
        }
762
    }
763

764
    pub async fn remove(
1,859✔
765
        &mut self,
766
        key: &Key,
767
        load_tree: &dyn LoadTree,
768
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
769
        match self {
1,859✔
770
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.remove(key).await,
4,056✔
771
            EditableLoadedNode::Internal(internal_node) => {
845✔
772
                internal_node.remove(key, load_tree).await
3,380✔
773
            }
774
        }
775
    }
776

777
    pub fn simplify(&mut self) -> Option<EditableNode<Key, Value>> {
1,013✔
778
        match self {
1,013✔
779
            EditableLoadedNode::Internal(internal_node) => {
845✔
780
                if internal_node.entries.len() == 1 {
845✔
781
                    let (_, only_child) = internal_node
3✔
782
                        .entries
2✔
783
                        .iter_mut()
784
                        .next()
785
                        .expect("internal node has one entry");
786
                    Some(only_child.clone())
1✔
787
                } else {
788
                    None
844✔
789
                }
790
            }
791
            EditableLoadedNode::Leaf(_) => None,
168✔
792
        }
793
    }
794

795
    pub async fn find(
3,062,579✔
796
        &mut self,
797
        key: &Key,
798
        load_tree: &dyn LoadTree,
799
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
800
        match self {
3,062,579✔
801
            EditableLoadedNode::Leaf(leaf_node) => Ok(leaf_node.find(key)),
4,649,067✔
802
            EditableLoadedNode::Internal(internal_node) => internal_node.find(key, load_tree).await,
7,564,450✔
803
        }
804
    }
805

806
    pub fn top_key(&self) -> Option<&Key> {
13,394✔
807
        match self {
13,394✔
808
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.top_key(),
28,653✔
809
            EditableLoadedNode::Internal(internal_node) => internal_node.top_key(),
11,529✔
810
        }
811
    }
812

813
    pub async fn count(
22,892✔
814
        &mut self,
815
        load_tree: &dyn LoadTree,
816
    ) -> Result<u64, Box<dyn std::error::Error>> {
817
        match self {
22,892✔
818
            EditableLoadedNode::Leaf(leaf_node) => Ok(leaf_node.entries.len() as u64),
38,456✔
819
            EditableLoadedNode::Internal(internal_node) => {
3,664✔
820
                let mut total_count = 0;
7,328✔
821
                for child_node in internal_node.entries.values_mut() {
25,320✔
822
                    total_count += child_node.count(load_tree).await?;
53,976✔
823
                }
824
                Ok(total_count)
3,664✔
825
            }
826
        }
827
    }
828

829
    pub async fn save(
121✔
830
        &mut self,
831
        store_tree: &dyn StoreTree,
832
    ) -> Result<BlobDigest, Box<dyn std::error::Error>> {
833
        match self {
121✔
834
            EditableLoadedNode::Leaf(leaf_node) => {
110✔
835
                let mut new_node = crate::sorted_tree::Node {
836
                    entries: Vec::new(),
110✔
837
                };
838
                for (key, value) in &leaf_node.entries {
21,812✔
839
                    new_node.entries.push((key.clone(), value.clone()));
36,170✔
840
                }
841
                let digest = store_node(store_tree, &new_node, &Metadata { is_leaf: true }).await?;
550✔
842
                Ok(digest)
110✔
843
            }
844
            EditableLoadedNode::Internal(internal_node) => {
11✔
845
                let mut new_node = crate::sorted_tree::Node {
846
                    entries: Vec::new(),
11✔
847
                };
848
                for (key, child_node) in &mut internal_node.entries {
103✔
849
                    let child_digest = Box::pin(child_node.save(store_tree)).await?;
230✔
850
                    new_node
46✔
851
                        .entries
46✔
852
                        .push((key.clone(), TreeReference::new(child_digest)));
184✔
853
                }
854
                let digest =
11✔
855
                    store_node(store_tree, &new_node, &Metadata { is_leaf: false }).await?;
44✔
856
                Ok(digest)
11✔
857
            }
858
        }
859
    }
860

861
    pub async fn verify_integrity(
5,829✔
862
        &mut self,
863
        load_tree: &dyn LoadTree,
864
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
865
        match self {
5,829✔
866
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.verify_integrity(),
14,931✔
867
            EditableLoadedNode::Internal(internal_node) => {
852✔
868
                internal_node.verify_integrity(load_tree).await
2,556✔
869
            }
870
        }
871
    }
872

873
    pub fn is_naturally_split(&self) -> bool {
19,014✔
874
        match self {
19,014✔
875
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.is_naturally_split(),
57,042✔
UNCOV
876
            EditableLoadedNode::Internal(internal_node) => internal_node.is_naturally_split(),
×
877
        }
878
    }
879
}
880

881
pub struct Iterator<
882
    't,
883
    Key: Serialize + DeserializeOwned + Ord + Clone + Debug,
884
    Value: NodeValue + Clone,
885
> {
886
    next: Vec<&'t mut EditableNode<Key, Value>>,
887
    leaf_iterator: Option<std::collections::btree_map::Iter<'t, Key, Value>>,
888
    load_tree: &'t dyn LoadTree,
889
}
890

891
impl<'t, Key, Value> Iterator<'t, Key, Value>
892
where
893
    Key: Serialize + DeserializeOwned + Ord + Clone + Debug,
894
    Value: NodeValue + Clone,
895
{
896
    pub fn new(node: &'t mut EditableNode<Key, Value>, load_tree: &'t dyn LoadTree) -> Self {
3✔
897
        Iterator {
898
            next: vec![node],
9✔
899
            leaf_iterator: None,
900
            load_tree,
901
        }
902
    }
903

904
    pub async fn next(&mut self) -> Result<Option<(Key, Value)>, Box<dyn std::error::Error>> {
1,012✔
905
        loop {
×
906
            if let Some(current_node) = self.leaf_iterator.as_mut() {
1,018✔
907
                match current_node.next() {
507✔
908
                    Some((key, value)) => return Ok(Some((key.clone(), value.clone()))),
2,515✔
909
                    None => {
4✔
910
                        self.leaf_iterator = None;
4✔
911
                    }
912
                }
913
            }
914
            match self.next.pop() {
8✔
915
                Some(next_node) => {
5✔
916
                    let loaded = next_node.require_loaded(self.load_tree).await?;
20✔
917
                    match loaded {
5✔
918
                        EditableLoadedNode::Leaf(leaf_node) => {
4✔
919
                            self.leaf_iterator = Some(leaf_node.entries().iter());
4✔
920
                            continue;
4✔
921
                        }
922
                        EditableLoadedNode::Internal(internal_node) => {
1✔
923
                            internal_node
1✔
924
                                .entries
1✔
925
                                .values_mut()
926
                                .rev()
927
                                .for_each(|child_node| {
3✔
928
                                    self.next.push(child_node);
6✔
929
                                });
930
                        }
931
                    };
932
                }
933
                None => {
×
934
                    return Ok(None);
3✔
935
                }
936
            }
937
        }
938
    }
939
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc