• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TyRoXx / NonlocalityOS / 20595626332

30 Dec 2025 11:35AM UTC coverage: 76.953% (+0.009%) from 76.944%
20595626332

push

github

TyRoXx
Test large directory de/serialization

7 of 7 new or added lines in 1 file covered. (100.0%)

181 existing lines in 5 files now uncovered.

5803 of 7541 relevant lines covered (76.95%)

31717.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.98
/sorted_tree/src/prolly_tree_editable_node.rs
1
use crate::sorted_tree::{self, NodeValue, TreeReference};
2
use astraea::{
3
    storage::{LoadTree, StoreError, StoreTree},
4
    tree::{BlobDigest, TREE_BLOB_MAX_LENGTH},
5
};
6
use serde::{de::DeserializeOwned, Deserialize, Serialize};
7
use std::collections::BTreeMap;
8
use std::fmt::Debug;
9
use std::hash::{BuildHasher, Hash};
10

11
/// Outcome of a structural integrity check on a tree node.
#[derive(Debug, PartialEq)]
pub enum IntegrityCheckResult {
    /// The checked subtree is consistent. Leaves report a `depth` of 0 and
    /// internal nodes report the depth of their children plus one.
    Valid { depth: usize },
    /// The check failed; the string is a human-readable reason.
    Corrupted(String),
}
16

17
pub fn hash_key<Key: Serialize>(key: &Key) -> u8 {
206,091✔
18
    // TODO: use a better hash function (https://docs.dolthub.com/architecture/storage-engine/prolly-tree#controlling-chunk-size)
19
    let key_serialized = postcard::to_stdvec(key).expect("serializing key should succeed");
1,030,455✔
20
    let hasher = rapidhash::quality::SeedableState::fixed();
412,182✔
21
    let result: [u8; 8] = hasher.hash_one(&key_serialized).to_le_bytes();
1,236,546✔
22
    result[0]
206,091✔
23
}
24

25
pub fn is_split_after_key<Key: Serialize>(key: &Key, chunk_size_in_bytes: usize) -> bool {
1,200,712✔
26
    if chunk_size_in_bytes < 1000 {
1,200,712✔
27
        // No point in splitting small chunks.
28
        // TODO: use Tree efficiently
29
        return false;
994,609✔
30
    }
31
    if chunk_size_in_bytes >= TREE_BLOB_MAX_LENGTH / 2 {
206,103✔
32
        // TODO: try to pack more elements in a chunk before splitting
33
        return true;
14✔
34
    }
35
    let hash = hash_key(key);
618,267✔
36
    let chunk_boundary_threshold = 10;
412,178✔
37
    if hash < chunk_boundary_threshold {
206,089✔
38
        // written with an if expression so that we can see whether the tests cover both branches
39
        true
21,204✔
40
    } else {
41
        false
184,885✔
42
    }
43
}
44

45
pub struct SizeTracker {
46
    element_count: usize,
47
    total_element_size: usize,
48
}
49

50
impl Default for SizeTracker {
51
    fn default() -> Self {
×
52
        Self::new()
×
53
    }
54
}
55

56
impl SizeTracker {
57
    pub fn new() -> Self {
36,775✔
58
        SizeTracker {
59
            element_count: 0,
60
            total_element_size: 0,
61
        }
62
    }
63

64
    pub fn add_entry<Key: Serialize, Value: Serialize>(&mut self, key: &Key, value: &Value) {
3,798,502✔
65
        let entry_serialized: Vec<u8> =
7,597,004✔
66
            postcard::to_stdvec(&(key, value)).expect("serializing entry should succeed");
15,194,008✔
67
        self.element_count += 1;
3,798,502✔
68
        self.total_element_size += entry_serialized.len();
3,798,502✔
69
    }
70

71
    pub fn size(&self) -> usize {
1,200,713✔
72
        // TODO: optimize size calculation
73
        let metadata_serialized: Vec<u8> =
2,401,426✔
74
            postcard::to_stdvec(&Metadata { is_leaf: true }).unwrap();
3,602,139✔
75
        let element_count_serialized: Vec<u8> = postcard::to_stdvec(&self.element_count).unwrap();
6,003,565✔
76
        metadata_serialized.len() + element_count_serialized.len() + self.total_element_size
3,602,139✔
77
    }
78
}
79

80
#[derive(Serialize, Deserialize, Clone, Hash)]
81
pub struct Metadata {
82
    pub is_leaf: bool,
83
}
84

85
pub async fn store_node<Key: Serialize + Ord, Value: NodeValue>(
256✔
86
    store_tree: &(dyn StoreTree + Send + Sync),
87
    node: &sorted_tree::Node<Key, Value>,
88
    metadata: &Metadata,
89
) -> Result<BlobDigest, StoreError> {
90
    let metadata_serialized =
256✔
91
        postcard::to_stdvec(metadata).expect("serializing metadata should always succeed");
1,024✔
92
    crate::sorted_tree::store_node(store_tree, node, &bytes::Bytes::from(metadata_serialized)).await
1,024✔
93
}
94

95
#[derive(Debug, PartialEq)]
96
pub enum EitherNodeType<Key: Serialize + Ord, Value: NodeValue> {
97
    Leaf(sorted_tree::Node<Key, Value>),
98
    Internal(sorted_tree::Node<Key, TreeReference>),
99
}
100

101
#[derive(Debug, PartialEq, Eq, Clone)]
102
pub enum DeserializationError {
103
    MissingTree(BlobDigest),
104
    TreeHashMismatch(BlobDigest),
105
}
106

107
impl std::fmt::Display for DeserializationError {
108
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
×
109
        write!(f, "{self:?}")
×
110
    }
111
}
112

113
impl std::error::Error for DeserializationError {}
114

115
pub async fn load_node<
128✔
116
    Key: Serialize + DeserializeOwned + PartialEq + Ord,
117
    Value: NodeValue + Clone,
118
>(
119
    load_tree: &(dyn LoadTree + Send + Sync),
120
    root: &BlobDigest,
121
) -> Result<EitherNodeType<Key, Value>, Box<dyn std::error::Error>> {
122
    let loaded = match load_tree.load_tree(root).await {
640✔
123
        Some(loaded) => loaded,
256✔
124
        None => return Err(DeserializationError::MissingTree(*root).into()),
×
125
    };
126
    let hashed = match loaded.hash() {
256✔
127
        Some(hashed) => hashed,
256✔
128
        None => return Err(DeserializationError::TreeHashMismatch(*root).into()),
×
129
    };
130
    let tree = hashed.tree();
384✔
131
    let (metadata, sorted_tree_data) =
256✔
132
        postcard::take_from_bytes::<Metadata>(tree.blob().as_slice())?;
256✔
133
    if metadata.is_leaf {
128✔
134
        let node = sorted_tree::node_from_tree::<Key, Value>(
135
            tree,
127✔
136
            tree.blob().as_slice().len() - sorted_tree_data.len(),
381✔
137
        );
138
        Ok(EitherNodeType::Leaf(node))
127✔
139
    } else {
140
        let node = sorted_tree::node_from_tree::<Key, TreeReference>(
141
            tree,
1✔
142
            tree.blob().as_slice().len() - sorted_tree_data.len(),
3✔
143
        );
144
        Ok(EitherNodeType::Internal(node))
1✔
145
    }
146
}
147

148
#[derive(Debug, Clone)]
149
pub enum EditableNode<Key: std::cmp::Ord + Clone, Value: Clone> {
150
    Reference(TreeReference),
151
    Loaded(EditableLoadedNode<Key, Value>),
152
}
153

154
impl<
155
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
156
        Value: NodeValue + Clone,
157
    > Default for EditableNode<Key, Value>
158
{
159
    fn default() -> Self {
1✔
160
        Self::new()
1✔
161
    }
162
}
163

164
impl<
165
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
166
        Value: NodeValue + Clone,
167
    > EditableNode<Key, Value>
168
{
169
    pub fn new() -> Self {
135✔
170
        EditableNode::Loaded(EditableLoadedNode::Leaf(EditableLeafNode {
135✔
171
            entries: BTreeMap::new(),
135✔
172
        }))
173
    }
174

175
    pub async fn require_loaded(
3,117,478✔
176
        &mut self,
177
        load_tree: &(dyn LoadTree + Send + Sync),
178
    ) -> Result<&mut EditableLoadedNode<Key, Value>, Box<dyn std::error::Error>> {
179
        match self {
3,117,478✔
180
            EditableNode::Reference(tree_ref) => {
14✔
181
                let loaded: EitherNodeType<Key, Value> =
28✔
182
                    load_node(load_tree, tree_ref.reference()).await?;
42✔
183
                *self = EditableNode::Loaded(EditableLoadedNode::new(loaded));
28✔
184
            }
185
            EditableNode::Loaded(_loaded_node) => {}
6,234,928✔
186
        };
187
        let loaded = match self {
6,234,956✔
188
            EditableNode::Loaded(loaded_node) => loaded_node,
6,234,956✔
189
            _ => unreachable!(),
190
        };
191
        Ok(loaded)
3,117,478✔
192
    }
193

194
    pub async fn insert(
5,466✔
195
        &mut self,
196
        key: Key,
197
        value: Value,
198
        load_tree: &(dyn LoadTree + Send + Sync),
199
    ) -> Result<(), Box<dyn std::error::Error>> {
200
        let (self_top_key, nodes_split) = self.insert_impl(key, value, load_tree).await?;
38,262✔
201
        if nodes_split.is_empty() {
10,932✔
202
            return Ok(());
5,458✔
203
        }
204
        let mut entries = BTreeMap::new();
16✔
205
        entries.insert(self_top_key, self.clone());
40✔
206
        for node in nodes_split {
24✔
207
            entries.insert(
24✔
208
                node.top_key().expect("Node cannot be empty here").clone(),
32✔
209
                EditableNode::Loaded(node),
8✔
210
            );
211
        }
212
        *self = EditableNode::Loaded(EditableLoadedNode::Internal(EditableInternalNode {
16✔
213
            entries,
8✔
214
        }));
215
        Ok(())
8✔
216
    }
217

218
    pub async fn insert_impl(
9,244✔
219
        &mut self,
220
        key: Key,
221
        value: Value,
222
        load_tree: &(dyn LoadTree + Send + Sync),
223
    ) -> Result<(Key, Vec<EditableLoadedNode<Key, Value>>), Box<dyn std::error::Error>> {
224
        let loaded = self.require_loaded(load_tree).await?;
36,976✔
225
        let nodes_split = Box::pin(loaded.insert(key, value, load_tree)).await?;
64,708✔
226
        Ok((
9,244✔
227
            loaded.top_key().expect("Node cannot be empty here").clone(),
36,976✔
228
            nodes_split,
9,244✔
229
        ))
230
    }
231

232
    pub async fn remove(
1,014✔
233
        &mut self,
234
        key: &Key,
235
        load_tree: &(dyn LoadTree + Send + Sync),
236
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
237
        let (maybe_top_key, maybe_removed) = self.remove_impl(key, load_tree).await?;
6,084✔
238
        if maybe_top_key.is_none() {
2,029✔
239
            *self = EditableNode::Loaded(EditableLoadedNode::Leaf(EditableLeafNode {
1✔
240
                entries: BTreeMap::new(),
1✔
241
            }));
242
        } else {
243
            let loaded = self.require_loaded(load_tree).await?;
4,052✔
244
            if let Some(simplified) = loaded.simplify() {
1,015✔
245
                *self = simplified;
1✔
246
            }
247
        }
248
        Ok(maybe_removed)
1,014✔
249
    }
250

251
    pub async fn remove_impl(
1,901✔
252
        &mut self,
253
        key: &Key,
254
        load_tree: &(dyn LoadTree + Send + Sync),
255
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
256
        let loaded = self.require_loaded(load_tree).await?;
7,604✔
257
        let result = loaded.remove(key, load_tree).await?;
9,505✔
258
        Ok(result)
1,901✔
259
    }
260

261
    pub async fn find(
3,057,003✔
262
        &mut self,
263
        key: &Key,
264
        load_tree: &(dyn LoadTree + Send + Sync),
265
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
266
        let loaded = self.require_loaded(load_tree).await?;
12,228,012✔
267
        loaded.find(key, load_tree).await
12,228,012✔
268
    }
269

270
    pub async fn count(
21,928✔
271
        &mut self,
272
        load_tree: &(dyn LoadTree + Send + Sync),
273
    ) -> Result<u64, Box<dyn std::error::Error>> {
274
        let loaded = self.require_loaded(load_tree).await?;
87,712✔
275
        Box::pin(loaded.count(load_tree)).await
87,712✔
276
    }
277

278
    pub async fn save(
269✔
279
        &mut self,
280
        store_tree: &(dyn StoreTree + Send + Sync),
281
    ) -> Result<BlobDigest, Box<dyn std::error::Error>> {
282
        match self {
269✔
283
            EditableNode::Reference(tree_ref) => Ok(*tree_ref.reference()),
28✔
284
            EditableNode::Loaded(loaded_node) => loaded_node.save(store_tree).await,
1,020✔
285
        }
286
    }
287

288
    pub async fn load(
114✔
289
        digest: &BlobDigest,
290
        load_tree: &(dyn LoadTree + Send + Sync),
291
    ) -> Result<Self, Box<dyn std::error::Error>> {
292
        let loaded: EitherNodeType<Key, Value> = load_node(load_tree, digest).await?;
570✔
293
        Ok(EditableNode::Loaded(EditableLoadedNode::new(loaded)))
114✔
294
    }
295

296
    pub async fn verify_integrity(
5,684✔
297
        &mut self,
298
        expected_top_key: Option<&Key>,
299
        load_tree: &(dyn LoadTree + Send + Sync),
300
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
301
        let loaded = self.require_loaded(load_tree).await?;
22,736✔
302
        if loaded.top_key() != expected_top_key {
5,684✔
303
            return Ok(IntegrityCheckResult::Corrupted(
×
304
                "Top key mismatch".to_string(),
×
305
            ));
306
        }
307
        Box::pin(loaded.verify_integrity(load_tree)).await
22,736✔
308
    }
309

310
    pub async fn merge(
109✔
311
        &mut self,
312
        other: Self,
313
        load_tree: &(dyn LoadTree + Send + Sync),
314
    ) -> Result<(Key, Vec<EditableLoadedNode<Key, Value>>), Box<dyn std::error::Error>> {
315
        let loaded = self.require_loaded(load_tree).await?;
436✔
316
        let other_loaded = match other {
218✔
317
            EditableNode::Reference(tree_ref) => {
×
318
                let loaded: EitherNodeType<Key, Value> =
×
319
                    load_node(load_tree, tree_ref.reference()).await?;
×
320
                EditableLoadedNode::new(loaded)
×
321
            }
322
            EditableNode::Loaded(loaded_node) => loaded_node,
218✔
323
        };
324
        match (loaded, other_loaded) {
218✔
325
            (EditableLoadedNode::Leaf(self_leaf), EditableLoadedNode::Leaf(other_leaf)) => {
218✔
326
                for (key, value) in other_leaf.entries {
27,172✔
327
                    self_leaf.entries.insert(key, value);
27,063✔
328
                }
329
                let split_nodes = self_leaf.check_split();
327✔
330
                Ok((
×
331
                    self_leaf
218✔
332
                        .top_key()
218✔
333
                        .expect("Leaf cannot be empty here")
109✔
334
                        .clone(),
109✔
335
                    split_nodes
109✔
336
                        .into_iter()
109✔
337
                        .map(|n| EditableLoadedNode::Leaf(n))
166✔
338
                        .collect(),
109✔
339
                ))
340
            }
341
            (
342
                EditableLoadedNode::Internal(self_internal),
×
343
                EditableLoadedNode::Internal(other_internal),
×
344
            ) => {
×
345
                for (key, child_node) in other_internal.entries {
×
346
                    let previous_entry = self_internal.entries.insert(key, child_node);
×
347
                    if let Some(_existing_child) = previous_entry {
×
348
                        return Err(Box::new(std::io::Error::other("Merge node key collision")));
×
349
                    }
350
                }
351
                let split_nodes = self_internal.check_split();
×
352
                Ok((
×
353
                    self_internal
×
354
                        .top_key()
×
355
                        .expect("Internal node cannot be empty here")
×
356
                        .clone(),
×
357
                    split_nodes
×
358
                        .into_iter()
×
359
                        .map(|n| EditableLoadedNode::Internal(n))
×
360
                        .collect(),
×
361
                ))
362
            }
363
            _ => unreachable!(),
364
        }
365
    }
366

367
    pub async fn is_naturally_split(
20,532✔
368
        &mut self,
369
        load_tree: &(dyn LoadTree + Send + Sync),
370
    ) -> Result<bool, Box<dyn std::error::Error>> {
371
        let loaded = self.require_loaded(load_tree).await?;
82,128✔
372
        Ok(loaded.is_naturally_split())
20,532✔
373
    }
374
}
375

376
/// An in-memory leaf node holding key/value entries in key order.
#[derive(Debug, Clone)]
pub struct EditableLeafNode<Key, Value> {
    /// The entries of this leaf, ordered by key.
    entries: BTreeMap<Key, Value>,
}
380

381
impl<Key: std::cmp::Ord + Clone + Serialize, Value: Clone + NodeValue>
382
    EditableLeafNode<Key, Value>
383
{
384
    pub fn create(entries: BTreeMap<Key, Value>) -> Option<Self> {
5,713✔
385
        if entries.is_empty() {
11,426✔
386
            None
2✔
387
        } else {
388
            Some(EditableLeafNode { entries })
5,711✔
389
        }
390
    }
391

392
    /// Stores `value` under `key`, replacing any previous value, and
    /// re-splits the leaf.
    ///
    /// Returns the leaves that were split off; `self` keeps the first chunk.
    pub async fn insert(&mut self, key: Key, value: Value) -> Vec<EditableLeafNode<Key, Value>> {
        self.entries.insert(key, value);
        let split_off = self.check_split();
        split_off
    }
396

397
    pub async fn remove(
1,014✔
398
        &mut self,
399
        key: &Key,
400
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
401
        let removed = self.entries.remove(key);
4,056✔
402
        let top_key = self.top_key().cloned();
4,056✔
403
        Ok((top_key, removed))
1,014✔
404
    }
405

406
    fn check_split(&mut self) -> Vec<EditableLeafNode<Key, Value>> {
5,575✔
407
        let mut result = Vec::new();
11,150✔
408
        let mut current_node = BTreeMap::new();
11,150✔
409
        let mut current_node_size_tracker = SizeTracker::new();
11,150✔
410
        for entry in self.entries.iter() {
630,843✔
411
            current_node_size_tracker.add_entry(entry.0, &entry.1.to_content());
2,478,772✔
412
            current_node.insert(entry.0.clone(), entry.1.clone());
3,718,158✔
413
            if is_split_after_key(entry.0, current_node_size_tracker.size()) {
2,480,871✔
414
                result.push(
6,297✔
415
                    EditableLeafNode::create(current_node)
6,297✔
416
                        .expect("Must succeed because list is not empty"),
6,297✔
417
                );
418
                current_node = BTreeMap::new();
4,198✔
419
                current_node_size_tracker = SizeTracker::new();
2,099✔
420
            }
421
        }
422
        if !current_node.is_empty() {
9,186✔
423
            result.push(
10,833✔
424
                EditableLeafNode::create(current_node)
10,833✔
425
                    .expect("Must succeed because list is not empty"),
3,611✔
426
            );
427
        }
428
        *self = result.remove(0);
16,725✔
429
        result
5,575✔
430
    }
431

432
    /// Returns the greatest key of this leaf, or `None` when it is empty.
    pub fn top_key(&self) -> Option<&Key> {
        self.entries.last_key_value().map(|(key, _value)| key)
    }
435

436
    /// Returns a clone of the value stored under `key`, if any.
    pub fn find(&mut self, key: &Key) -> Option<Value> {
        let stored = self.entries.get(key);
        stored.cloned()
    }
439

440
    pub fn verify_integrity(&mut self) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
4,790✔
441
        let mut size_tracker = SizeTracker::new();
9,580✔
442
        for (index, (key, value)) in self.entries.iter().enumerate() {
1,648,281✔
443
            size_tracker.add_entry(key, &value.to_content());
2,178,548✔
444
            let is_split = is_split_after_key(key, size_tracker.size());
2,723,185✔
445
            if (index < self.entries.len() - 1) && is_split {
1,629,123✔
446
                return Ok(IntegrityCheckResult::Corrupted(format!(
×
447
                    "Leaf node integrity check failed: Key at index {} indicates split but node is not final (number of keys: {})",
×
448
                    index, self.entries.len()
×
449
                )));
450
            }
451
        }
452
        Ok(IntegrityCheckResult::Valid { depth: 0 })
4,790✔
453
    }
454

455
    pub fn is_naturally_split(&self) -> bool {
20,532✔
456
        let mut size_tracker = SizeTracker::new();
41,064✔
457
        for entry in self.entries.iter() {
5,277,706✔
458
            size_tracker.add_entry(entry.0, &entry.1.to_content());
10,473,284✔
459
        }
460
        is_split_after_key(
461
            self.entries.keys().last().expect("leaf node is not empty"),
82,128✔
462
            size_tracker.size(),
41,064✔
463
        )
464
    }
465

466
    pub fn entries(&self) -> &BTreeMap<Key, Value> {
62✔
467
        &self.entries
62✔
468
    }
469
}
470

471
#[derive(Debug, Clone)]
472
pub struct EditableInternalNode<Key: std::cmp::Ord + Clone, Value: Clone> {
473
    entries: BTreeMap<Key, EditableNode<Key, Value>>,
474
}
475

476
impl<
477
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
478
        Value: NodeValue + Clone,
479
    > EditableInternalNode<Key, Value>
480
{
481
    pub fn create(entries: BTreeMap<Key, EditableNode<Key, Value>>) -> Option<Self> {
3,778✔
482
        if entries.is_empty() {
7,556✔
483
            None
×
484
        } else {
485
            Some(EditableInternalNode { entries })
3,778✔
486
        }
487
    }
488

489
    pub async fn insert(
3,778✔
490
        &mut self,
491
        key: Key,
492
        value: Value,
493
        load_tree: &(dyn LoadTree + Send + Sync),
494
    ) -> Result<Vec<EditableInternalNode<Key, Value>>, Box<dyn std::error::Error>> {
495
        let last_index = self.entries.len() - 1;
7,556✔
496
        // TODO: optimize search
497
        for (index, (entry_key, entry_value)) in self.entries.iter_mut().enumerate() {
45,516✔
498
            if (index == last_index) || (key <= *entry_key) {
21,004✔
499
                let (updated_key, split_nodes) =
7,556✔
500
                    entry_value.insert_impl(key, value, load_tree).await?;
18,890✔
501
                if updated_key != *entry_key {
3,778✔
502
                    let old_key = entry_key.clone();
3,240✔
503
                    let old_value = self.entries.remove(&old_key).expect("key must exist");
6,480✔
504
                    let previous_entry = self.entries.insert(updated_key, old_value);
5,400✔
505
                    assert!(previous_entry.is_none(), "Split node key collision");
506
                }
507
                for node in split_nodes {
3,848✔
508
                    let previous_entry = self.entries.insert(
210✔
509
                        node.top_key().expect("Node cannot be empty here").clone(),
280✔
510
                        EditableNode::Loaded(node),
70✔
511
                    );
512
                    assert!(previous_entry.is_none(), "Split node key collision");
513
                }
514
                self.update_chunk_boundaries(load_tree).await?;
11,334✔
515
                break;
3,778✔
516
            }
517
        }
518
        Ok(self.check_split())
3,778✔
519
    }
520

521
    pub async fn remove(
887✔
522
        &mut self,
523
        key: &Key,
524
        load_tree: &(dyn LoadTree + Send + Sync),
525
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
526
        // TODO: optimize search
527
        for (entry_key, entry_value) in self.entries.iter_mut() {
6,608✔
528
            if key <= entry_key {
2,417✔
529
                let (maybe_new_top_key, maybe_removed) =
1,774✔
530
                    Box::pin(entry_value.remove_impl(key, load_tree)).await?;
4,435✔
531
                match maybe_new_top_key {
887✔
532
                    Some(new_top_key) => {
887✔
533
                        if new_top_key != *entry_key {
894✔
534
                            let entry_key = entry_key.clone();
28✔
535
                            let entry_value_removed = self
21✔
536
                                .entries
14✔
537
                                .remove(&entry_key)
21✔
538
                                .expect("Must exist because we just iterated over the entries");
539
                            self.entries.insert(new_top_key, entry_value_removed);
21✔
540
                        }
541
                    }
542
                    None => {
×
543
                        let entry_key = entry_key.clone();
×
544
                        self.entries.remove(&entry_key);
×
545
                    }
546
                }
547
                let top_key = self.top_key().cloned();
3,548✔
548
                match maybe_removed {
887✔
549
                    Some(removed) => {
887✔
550
                        if !self.entries.is_empty() {
887✔
551
                            self.update_chunk_boundaries(load_tree).await?;
2,661✔
552
                        }
553
                        return Ok((top_key, Some(removed)));
887✔
554
                    }
555
                    None => {
×
556
                        return Ok((top_key, None));
×
557
                    }
558
                }
559
            }
560
        }
561
        Ok((self.top_key().cloned(), None))
×
562
    }
563

564
    async fn update_chunk_boundaries(
4,665✔
565
        &mut self,
566
        load_tree: &(dyn LoadTree + Send + Sync),
567
    ) -> Result<(), Box<dyn std::error::Error>> {
568
        loop {
×
569
            let merge_candidates = self.find_merge_candidates(load_tree).await?;
19,096✔
570
            match merge_candidates {
4,774✔
571
                Some((low_key, high_key)) => {
218✔
572
                    let mut low = self.entries.remove(&low_key).expect("key must exist");
654✔
573
                    let high = self.entries.remove(&high_key).expect("key must exist");
654✔
574
                    let (low_top_key, split_nodes) = low.merge(high, load_tree).await?;
654✔
575
                    assert!(split_nodes.is_empty() || low.is_naturally_split(load_tree).await?);
576
                    assert_ne!(low_key, low_top_key, "Merge did not change low key");
577
                    let previous_entry = self.entries.insert(low_top_key, low);
545✔
578
                    assert!(previous_entry.is_none(), "Merge node key collision");
579
                    let split_nodes_len = split_nodes.len();
327✔
580
                    for (index, node) in split_nodes.into_iter().enumerate() {
441✔
581
                        assert!((index == split_nodes_len - 1) || node.is_naturally_split());
582
                        let previous_entry = self.entries.insert(
171✔
583
                            node.top_key().expect("Node cannot be empty here").clone(),
228✔
584
                            EditableNode::Loaded(node),
57✔
585
                        );
586
                        assert!(previous_entry.is_none(), "Merge node key collision");
587
                    }
588
                }
589
                None => break,
4,665✔
590
            }
591
        }
592
        Ok(())
4,665✔
593
    }
594

595
    async fn find_merge_candidates(
4,774✔
596
        &mut self,
597
        load_tree: &(dyn LoadTree + Send + Sync),
598
    ) -> Result<Option<(Key, Key)>, Box<dyn std::error::Error>> {
599
        let last_index = self.entries.len() - 1;
9,548✔
600
        let mut needs_merge: Option<&Key> = None;
14,322✔
601
        // TODO: optimize search
602
        for (index, (entry_key, entry_value)) in self.entries.iter_mut().enumerate() {
76,074✔
603
            if let Some(merge_value) = needs_merge.take() {
20,693✔
604
                return Ok(Some((merge_value.clone(), entry_key.clone())));
327✔
605
            }
606
            let is_split = entry_value.is_naturally_split(load_tree).await?;
81,900✔
607
            if (index != last_index) && !is_split {
36,394✔
608
                needs_merge = Some(entry_key);
109✔
609
            }
610
        }
611
        Ok(None)
4,665✔
612
    }
613

614
    fn check_split(&mut self) -> Vec<EditableInternalNode<Key, Value>> {
3,778✔
615
        let mut result = Vec::new();
7,556✔
616
        let mut current_node = BTreeMap::new();
7,556✔
617
        let mut current_node_size_tracker = SizeTracker::new();
7,556✔
618
        for entry in self.entries.iter() {
23,406✔
619
            current_node_size_tracker.add_entry(
31,700✔
620
                entry.0,
15,850✔
621
                &TreeReference::new(BlobDigest::new(&[0; 64])).to_content(),
31,700✔
622
            );
623
            current_node.insert(entry.0.clone(), entry.1.clone());
95,100✔
624
            if is_split_after_key(entry.0, current_node_size_tracker.size()) {
63,400✔
625
                result.push(
×
626
                    EditableInternalNode::create(current_node)
×
627
                        .expect("Must succeed because list is not empty"),
×
628
                );
629
                current_node = BTreeMap::new();
×
630
                current_node_size_tracker = SizeTracker::new();
×
631
            }
632
        }
633
        if !current_node.is_empty() {
7,556✔
634
            result.push(
11,334✔
635
                EditableInternalNode::create(current_node)
11,334✔
636
                    .expect("Must succeed because list is not empty"),
3,778✔
637
            );
638
        }
639
        *self = result.remove(0);
11,334✔
640
        result
3,778✔
641
    }
642

643
    /// Returns the greatest child key of this node, or `None` when it has no
    /// entries.
    pub fn top_key(&self) -> Option<&Key> {
        self.entries.last_key_value().map(|(key, _child)| key)
    }
646

647
    pub async fn find(
1,507,314✔
648
        &mut self,
649
        key: &Key,
650
        load_tree: &(dyn LoadTree + Send + Sync),
651
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
652
        // TODO: optimize search
653
        for (entry_key, entry_value) in self.entries.iter_mut() {
11,578,646✔
654
            if key <= entry_key {
4,282,009✔
655
                return Box::pin(entry_value.find(key, load_tree)).await;
7,536,495✔
656
            }
657
        }
658
        Ok(None)
15✔
659
    }
660

661
    pub async fn verify_integrity(
894✔
662
        &mut self,
663
        load_tree: &(dyn LoadTree + Send + Sync),
664
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
665
        if self.entries.is_empty() {
1,788✔
666
            return Ok(IntegrityCheckResult::Corrupted(
×
667
                "Internal node integrity check failed: Node has no entries".to_string(),
×
668
            ));
669
        }
670
        let mut child_depth = None;
1,788✔
671
        for (index, (key, value)) in self.entries.iter_mut().enumerate() {
15,429✔
672
            match value.verify_integrity(Some(key), load_tree).await? {
16,996✔
673
                IntegrityCheckResult::Valid { depth } => {
4,249✔
674
                    if let Some(existing_depth) = child_depth {
7,604✔
675
                        if existing_depth != depth {
3,355✔
676
                            return Ok(IntegrityCheckResult::Corrupted(format!(
×
677
                                "Internal node integrity check failed at index {}: Child node depth mismatch (expected {}, found {})",
×
678
                                index, existing_depth, depth
×
679
                            )));
680
                        }
681
                    } else {
682
                        child_depth = Some(depth);
894✔
683
                    }
684
                }
685
                IntegrityCheckResult::Corrupted(reason) => {
×
686
                    return Ok(IntegrityCheckResult::Corrupted(format!(
×
687
                        "Internal node integrity check failed at index {}: {}",
×
688
                        index, reason
×
689
                    )));
690
                }
691
            }
692
        }
693
        Ok(IntegrityCheckResult::Valid {
894✔
694
            depth: child_depth.expect("Internal node has to have at least one child") + 1,
1,788✔
695
        })
696
    }
697

698
    pub fn is_naturally_split(&self) -> bool {
×
699
        let last_key = self
×
700
            .entries
×
701
            .keys()
702
            .last()
703
            .expect("internal node is not empty");
704
        let mut size_tracker = SizeTracker::new();
×
705
        for entry in self.entries.iter() {
×
706
            size_tracker.add_entry(
×
707
                entry.0,
×
708
                &TreeReference::new(BlobDigest::new(&[0; 64])).to_content(),
×
709
            );
710
        }
711
        is_split_after_key(last_key, size_tracker.size())
×
712
    }
713
}
714

715
#[derive(Debug, Clone)]
716
pub enum EditableLoadedNode<Key: std::cmp::Ord + Clone, Value: Clone> {
717
    Leaf(EditableLeafNode<Key, Value>),
718
    Internal(EditableInternalNode<Key, Value>),
719
}
720

721
impl<Key: Serialize + DeserializeOwned + Ord + Clone + Debug, Value: NodeValue + Clone>
722
    EditableLoadedNode<Key, Value>
723
{
724
    pub fn new(loaded: EitherNodeType<Key, Value>) -> Self {
128✔
725
        match loaded {
128✔
726
            EitherNodeType::Leaf(leaf_node) => {
127✔
727
                let mut entries = BTreeMap::new();
254✔
728
                for (key, value) in leaf_node.entries {
4,951✔
729
                    entries.insert(key, value);
4,824✔
730
                }
731
                EditableLoadedNode::Leaf(EditableLeafNode { entries })
127✔
732
            }
733
            EitherNodeType::Internal(internal_node) => {
1✔
734
                let mut entries = BTreeMap::new();
2✔
735
                for (key, child_node) in internal_node.entries {
16✔
736
                    entries.insert(key, EditableNode::Reference(child_node));
15✔
737
                }
738
                EditableLoadedNode::Internal(EditableInternalNode { entries })
1✔
739
            }
740
        }
741
    }
742

743
    pub async fn insert(
9,244✔
744
        &mut self,
745
        key: Key,
746
        value: Value,
747
        load_tree: &(dyn LoadTree + Send + Sync),
748
    ) -> Result<Vec<EditableLoadedNode<Key, Value>>, Box<dyn std::error::Error>> {
749
        match self {
9,244✔
750
            EditableLoadedNode::Leaf(leaf_node) => {
5,466✔
751
                let split_nodes = leaf_node.insert(key, value).await;
27,330✔
752
                Ok(split_nodes
5,466✔
753
                    .into_iter()
5,466✔
754
                    .map(|node| EditableLoadedNode::Leaf(node))
5,544✔
755
                    .collect())
5,466✔
756
            }
757
            EditableLoadedNode::Internal(internal_node) => {
3,778✔
758
                let split_nodes = internal_node.insert(key, value, load_tree).await?;
22,668✔
759
                Ok(split_nodes
3,778✔
760
                    .into_iter()
3,778✔
761
                    .map(|node| EditableLoadedNode::Internal(node))
3,778✔
762
                    .collect())
3,778✔
763
            }
764
        }
765
    }
766

767
    pub async fn remove(
1,901✔
768
        &mut self,
769
        key: &Key,
770
        load_tree: &(dyn LoadTree + Send + Sync),
771
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
772
        match self {
1,901✔
773
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.remove(key).await,
4,056✔
774
            EditableLoadedNode::Internal(internal_node) => {
887✔
775
                internal_node.remove(key, load_tree).await
3,548✔
776
            }
777
        }
778
    }
779

780
    pub fn simplify(&mut self) -> Option<EditableNode<Key, Value>> {
1,013✔
781
        match self {
1,013✔
782
            EditableLoadedNode::Internal(internal_node) => {
887✔
783
                if internal_node.entries.len() == 1 {
887✔
784
                    let (_, only_child) = internal_node
3✔
785
                        .entries
2✔
786
                        .iter_mut()
787
                        .next()
788
                        .expect("internal node has one entry");
789
                    Some(only_child.clone())
1✔
790
                } else {
791
                    None
886✔
792
                }
793
            }
794
            EditableLoadedNode::Leaf(_) => None,
126✔
795
        }
796
    }
797

798
    pub async fn find(
3,057,003✔
799
        &mut self,
800
        key: &Key,
801
        load_tree: &(dyn LoadTree + Send + Sync),
802
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
803
        match self {
3,057,003✔
804
            EditableLoadedNode::Leaf(leaf_node) => Ok(leaf_node.find(key)),
4,649,067✔
805
            EditableLoadedNode::Internal(internal_node) => internal_node.find(key, load_tree).await,
7,536,570✔
806
        }
807
    }
808

809
    pub fn top_key(&self) -> Option<&Key> {
15,063✔
810
        match self {
15,063✔
811
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.top_key(),
31,173✔
812
            EditableLoadedNode::Internal(internal_node) => internal_node.top_key(),
14,016✔
813
        }
814
    }
815

816
    pub async fn count(
21,928✔
817
        &mut self,
818
        load_tree: &(dyn LoadTree + Send + Sync),
819
    ) -> Result<u64, Box<dyn std::error::Error>> {
820
        match self {
21,928✔
821
            EditableLoadedNode::Leaf(leaf_node) => Ok(leaf_node.entries.len() as u64),
36,528✔
822
            EditableLoadedNode::Internal(internal_node) => {
3,664✔
823
                let mut total_count = 0;
7,328✔
824
                for child_node in internal_node.entries.values_mut() {
24,356✔
825
                    total_count += child_node.count(load_tree).await?;
51,084✔
826
                }
827
                Ok(total_count)
3,664✔
828
            }
829
        }
830
    }
831

832
    pub async fn save(
256✔
833
        &mut self,
834
        store_tree: &(dyn StoreTree + Send + Sync),
835
    ) -> Result<BlobDigest, Box<dyn std::error::Error>> {
836
        match self {
256✔
837
            EditableLoadedNode::Leaf(leaf_node) => {
238✔
838
                let mut new_node = crate::sorted_tree::Node {
839
                    entries: Vec::new(),
238✔
840
                };
841
                for (key, value) in &leaf_node.entries {
25,264✔
842
                    new_node.entries.push((key.clone(), value.clone()));
41,710✔
843
                }
844
                let digest = store_node(store_tree, &new_node, &Metadata { is_leaf: true }).await?;
1,190✔
845
                Ok(digest)
238✔
846
            }
847
            EditableLoadedNode::Internal(internal_node) => {
18✔
848
                let mut new_node = crate::sorted_tree::Node {
849
                    entries: Vec::new(),
18✔
850
                };
851
                for (key, child_node) in &mut internal_node.entries {
164✔
852
                    let child_digest = Box::pin(child_node.save(store_tree)).await?;
365✔
853
                    new_node
73✔
854
                        .entries
73✔
855
                        .push((key.clone(), TreeReference::new(child_digest)));
292✔
856
                }
857
                let digest =
18✔
858
                    store_node(store_tree, &new_node, &Metadata { is_leaf: false }).await?;
72✔
859
                Ok(digest)
18✔
860
            }
861
        }
862
    }
863

864
    pub async fn verify_integrity(
5,684✔
865
        &mut self,
866
        load_tree: &(dyn LoadTree + Send + Sync),
867
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
868
        match self {
5,684✔
869
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.verify_integrity(),
14,370✔
870
            EditableLoadedNode::Internal(internal_node) => {
894✔
871
                internal_node.verify_integrity(load_tree).await
2,682✔
872
            }
873
        }
874
    }
875

876
    pub fn is_naturally_split(&self) -> bool {
20,532✔
877
        match self {
20,532✔
878
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.is_naturally_split(),
61,596✔
879
            EditableLoadedNode::Internal(internal_node) => internal_node.is_naturally_split(),
×
880
        }
881
    }
882
}
883

884
pub struct Iterator<
885
    't,
886
    Key: Serialize + DeserializeOwned + Ord + Clone + Debug,
887
    Value: NodeValue + Clone,
888
> {
889
    next: Vec<&'t mut EditableNode<Key, Value>>,
890
    leaf_iterator: Option<std::collections::btree_map::Iter<'t, Key, Value>>,
891
    load_tree: &'t (dyn LoadTree + Send + Sync),
892
}
893

894
impl<'t, Key, Value> Iterator<'t, Key, Value>
895
where
896
    Key: Serialize + DeserializeOwned + Ord + Clone + Debug,
897
    Value: NodeValue + Clone,
898
{
899
    pub fn new(
57✔
900
        node: &'t mut EditableNode<Key, Value>,
901
        load_tree: &'t (dyn LoadTree + Send + Sync),
902
    ) -> Self {
903
        Iterator {
904
            next: vec![node],
171✔
905
            leaf_iterator: None,
906
            load_tree,
907
        }
908
    }
909

910
    pub async fn next(&mut self) -> Result<Option<(Key, Value)>, Box<dyn std::error::Error>> {
3,208✔
UNCOV
911
        loop {
×
912
            if let Some(current_node) = self.leaf_iterator.as_mut() {
3,277✔
913
                match current_node.next() {
1,609✔
914
                    Some((key, value)) => return Ok(Some((key.clone(), value.clone()))),
7,735✔
915
                    None => {
62✔
916
                        self.leaf_iterator = None;
62✔
917
                    }
918
                }
919
            }
920
            match self.next.pop() {
121✔
921
                Some(next_node) => {
64✔
922
                    let loaded = next_node.require_loaded(self.load_tree).await?;
256✔
923
                    match loaded {
64✔
924
                        EditableLoadedNode::Leaf(leaf_node) => {
62✔
925
                            self.leaf_iterator = Some(leaf_node.entries().iter());
62✔
926
                            continue;
62✔
927
                        }
928
                        EditableLoadedNode::Internal(internal_node) => {
2✔
929
                            internal_node
2✔
930
                                .entries
2✔
931
                                .values_mut()
932
                                .rev()
933
                                .for_each(|child_node| {
9✔
934
                                    self.next.push(child_node);
21✔
935
                                });
936
                        }
937
                    };
938
                }
UNCOV
939
                None => {
×
940
                    return Ok(None);
57✔
941
                }
942
            }
943
        }
944
    }
945
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc