• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TyRoXx / NonlocalityOS / 20571693521

29 Dec 2025 11:22AM UTC coverage: 76.855% (-0.008%) from 76.863%
20571693521

push

github

TyRoXx
Correct formatting

5801 of 7548 relevant lines covered (76.85%)

29858.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.18
/sorted_tree/src/prolly_tree_editable_node.rs
1
use crate::sorted_tree::{self, NodeValue, TreeReference};
2
use astraea::{
3
    storage::{LoadTree, StoreError, StoreTree},
4
    tree::{BlobDigest, TREE_BLOB_MAX_LENGTH},
5
};
6
use serde::{de::DeserializeOwned, Deserialize, Serialize};
7
use std::collections::BTreeMap;
8
use std::fmt::Debug;
9
use std::hash::{BuildHasher, Hash};
10

11
#[derive(Debug, PartialEq)]
12
pub enum IntegrityCheckResult {
13
    Valid { depth: usize },
14
    Corrupted(String),
15
}
16

17
pub fn hash_key<Key: Serialize>(key: &Key) -> u8 {
199,605✔
18
    // TODO: use a better hash function (https://docs.dolthub.com/architecture/storage-engine/prolly-tree#controlling-chunk-size)
19
    let key_serialized = postcard::to_stdvec(key).expect("serializing key should succeed");
998,025✔
20
    let hasher = rapidhash::quality::SeedableState::fixed();
399,210✔
21
    let result: [u8; 8] = hasher.hash_one(&key_serialized).to_le_bytes();
1,197,630✔
22
    result[0]
199,605✔
23
}
24

25
pub fn is_split_after_key<Key: Serialize>(key: &Key, chunk_size_in_bytes: usize) -> bool {
1,087,013✔
26
    if chunk_size_in_bytes < 1000 {
1,087,013✔
27
        // No point in splitting small chunks.
28
        // TODO: use Tree efficiently
29
        return false;
887,396✔
30
    }
31
    if chunk_size_in_bytes >= TREE_BLOB_MAX_LENGTH / 2 {
199,617✔
32
        // TODO: try to pack more elements in a chunk before splitting
33
        return true;
14✔
34
    }
35
    let hash = hash_key(key);
598,809✔
36
    let chunk_boundary_threshold = 10;
399,206✔
37
    if hash < chunk_boundary_threshold {
199,603✔
38
        // written with an if expression so that we can see whether the tests cover both branches
39
        true
19,374✔
40
    } else {
41
        false
180,229✔
42
    }
43
}
44

45
pub struct SizeTracker {
46
    element_count: usize,
47
    total_element_size: usize,
48
}
49

50
impl Default for SizeTracker {
51
    fn default() -> Self {
×
52
        Self::new()
×
53
    }
54
}
55

56
impl SizeTracker {
57
    pub fn new() -> Self {
32,272✔
58
        SizeTracker {
59
            element_count: 0,
60
            total_element_size: 0,
61
        }
62
    }
63

64
    pub fn add_entry<Key: Serialize, Value: Serialize>(&mut self, key: &Key, value: &Value) {
3,199,443✔
65
        let entry_serialized: Vec<u8> =
6,398,886✔
66
            postcard::to_stdvec(&(key, value)).expect("serializing entry should succeed");
12,797,772✔
67
        self.element_count += 1;
3,199,443✔
68
        self.total_element_size += entry_serialized.len();
3,199,443✔
69
    }
70

71
    pub fn size(&self) -> usize {
1,087,014✔
72
        // TODO: optimize size calculation
73
        let metadata_serialized: Vec<u8> =
2,174,028✔
74
            postcard::to_stdvec(&Metadata { is_leaf: true }).unwrap();
3,261,042✔
75
        let element_count_serialized: Vec<u8> = postcard::to_stdvec(&self.element_count).unwrap();
5,435,070✔
76
        metadata_serialized.len() + element_count_serialized.len() + self.total_element_size
3,261,042✔
77
    }
78
}
79

80
/// Header that is serialized with postcard in front of a stored node.
#[derive(Serialize, Deserialize, Clone, Hash)]
81
pub struct Metadata {
82
    /// `true` when the node holds values directly; `false` when its entries are references
    /// to child trees (this is how `load_node` picks the node type to decode).
    pub is_leaf: bool,
83
}
84

85
pub async fn store_node<Key: Serialize + Ord, Value: NodeValue>(
150✔
86
    store_tree: &dyn StoreTree,
87
    node: &sorted_tree::Node<Key, Value>,
88
    metadata: &Metadata,
89
) -> Result<BlobDigest, StoreError> {
90
    let metadata_serialized =
150✔
91
        postcard::to_stdvec(metadata).expect("serializing metadata should always succeed");
600✔
92
    crate::sorted_tree::store_node(store_tree, node, &bytes::Bytes::from(metadata_serialized)).await
600✔
93
}
94

95
/// Result of decoding a stored node: either a leaf or an internal node.
#[derive(Debug, PartialEq)]
96
pub enum EitherNodeType<Key: Serialize + Ord, Value: NodeValue> {
97
    /// Node whose entries map keys to values.
    Leaf(sorted_tree::Node<Key, Value>),
98
    /// Node whose entries map keys to references to child trees.
    Internal(sorted_tree::Node<Key, TreeReference>),
99
}
100

101
/// Errors produced by `load_node` before the node payload is decoded.
#[derive(Debug, PartialEq, Eq, Clone)]
102
pub enum DeserializationError {
103
    /// The loader returned nothing for the requested digest.
    MissingTree(BlobDigest),
104
    /// The loaded tree could not be hashed (`hash()` returned `None`) for the requested digest.
    TreeHashMismatch(BlobDigest),
105
}
106

107
// The human-readable form is simply the `Debug` representation.
impl std::fmt::Display for DeserializationError {
108
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
×
109
        write!(f, "{self:?}")
×
110
    }
111
}
112

113
// Allows the error to be boxed as `Box<dyn std::error::Error>`.
impl std::error::Error for DeserializationError {}
114

115
pub async fn load_node<
69✔
116
    Key: Serialize + DeserializeOwned + PartialEq + Ord,
117
    Value: NodeValue + Clone,
118
>(
119
    load_tree: &dyn LoadTree,
120
    root: &BlobDigest,
121
) -> Result<EitherNodeType<Key, Value>, Box<dyn std::error::Error>> {
122
    let loaded = match load_tree.load_tree(root).await {
345✔
123
        Some(loaded) => loaded,
138✔
124
        None => return Err(DeserializationError::MissingTree(*root).into()),
×
125
    };
126
    let hashed = match loaded.hash() {
138✔
127
        Some(hashed) => hashed,
138✔
128
        None => return Err(DeserializationError::TreeHashMismatch(*root).into()),
×
129
    };
130
    let tree = hashed.tree();
207✔
131
    let (metadata, sorted_tree_data) =
138✔
132
        postcard::take_from_bytes::<Metadata>(tree.blob().as_slice())?;
138✔
133
    if metadata.is_leaf {
69✔
134
        let node = sorted_tree::node_from_tree::<Key, Value>(
135
            tree,
69✔
136
            tree.blob().as_slice().len() - sorted_tree_data.len(),
207✔
137
        );
138
        Ok(EitherNodeType::Leaf(node))
69✔
139
    } else {
140
        let node = sorted_tree::node_from_tree::<Key, TreeReference>(
141
            tree,
×
142
            tree.blob().as_slice().len() - sorted_tree_data.len(),
×
143
        );
144
        Ok(EitherNodeType::Internal(node))
×
145
    }
146
}
147

148
/// A tree node that is either still a reference to stored data or already loaded for editing.
#[derive(Debug, Clone)]
149
pub enum EditableNode<Key: std::cmp::Ord + Clone, Value: Clone> {
150
    /// Not loaded yet; `require_loaded` replaces this variant with `Loaded` on demand.
    Reference(TreeReference),
151
    /// Node content that is held in memory and can be modified.
    Loaded(EditableLoadedNode<Key, Value>),
152
}
153

154
// The default node is the same as `EditableNode::new()`: a loaded leaf without entries.
impl<
155
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
156
        Value: NodeValue + Clone,
157
    > Default for EditableNode<Key, Value>
158
{
159
    fn default() -> Self {
1✔
160
        Self::new()
1✔
161
    }
162
}
163

164
impl<
165
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
166
        Value: NodeValue + Clone,
167
    > EditableNode<Key, Value>
168
{
169
    pub fn new() -> Self {
34✔
170
        EditableNode::Loaded(EditableLoadedNode::Leaf(EditableLeafNode {
34✔
171
            entries: BTreeMap::new(),
34✔
172
        }))
173
    }
174

175
    pub async fn require_loaded(
3,112,924✔
176
        &mut self,
177
        load_tree: &dyn LoadTree,
178
    ) -> Result<&mut EditableLoadedNode<Key, Value>, Box<dyn std::error::Error>> {
179
        match self {
3,112,924✔
180
            EditableNode::Reference(tree_ref) => {
9✔
181
                let loaded: EitherNodeType<Key, Value> =
18✔
182
                    load_node(load_tree, tree_ref.reference()).await?;
27✔
183
                *self = EditableNode::Loaded(EditableLoadedNode::new(loaded));
18✔
184
            }
185
            EditableNode::Loaded(_loaded_node) => {}
6,225,830✔
186
        };
187
        let loaded = match self {
6,225,848✔
188
            EditableNode::Loaded(loaded_node) => loaded_node,
6,225,848✔
189
            _ => unreachable!(),
190
        };
191
        Ok(loaded)
3,112,924✔
192
    }
193

194
    pub async fn insert(
4,386✔
195
        &mut self,
196
        key: Key,
197
        value: Value,
198
        load_tree: &dyn LoadTree,
199
    ) -> Result<(), Box<dyn std::error::Error>> {
200
        let (self_top_key, nodes_split) = self.insert_impl(key, value, load_tree).await?;
30,702✔
201
        if nodes_split.is_empty() {
8,772✔
202
            return Ok(());
4,379✔
203
        }
204
        let mut entries = BTreeMap::new();
14✔
205
        entries.insert(self_top_key, self.clone());
35✔
206
        for node in nodes_split {
21✔
207
            entries.insert(
21✔
208
                node.top_key().expect("Node cannot be empty here").clone(),
28✔
209
                EditableNode::Loaded(node),
7✔
210
            );
211
        }
212
        *self = EditableNode::Loaded(EditableLoadedNode::Internal(EditableInternalNode {
14✔
213
            entries,
7✔
214
        }));
215
        Ok(())
7✔
216
    }
217

218
    pub async fn insert_impl(
7,366✔
219
        &mut self,
220
        key: Key,
221
        value: Value,
222
        load_tree: &dyn LoadTree,
223
    ) -> Result<(Key, Vec<EditableLoadedNode<Key, Value>>), Box<dyn std::error::Error>> {
224
        let loaded = self.require_loaded(load_tree).await?;
29,464✔
225
        let nodes_split = Box::pin(loaded.insert(key, value, load_tree)).await?;
51,562✔
226
        Ok((
7,366✔
227
            loaded.top_key().expect("Node cannot be empty here").clone(),
29,464✔
228
            nodes_split,
7,366✔
229
        ))
230
    }
231

232
    pub async fn remove(
1,014✔
233
        &mut self,
234
        key: &Key,
235
        load_tree: &dyn LoadTree,
236
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
237
        let (maybe_top_key, maybe_removed) = self.remove_impl(key, load_tree).await?;
6,084✔
238
        if maybe_top_key.is_none() {
2,029✔
239
            *self = EditableNode::Loaded(EditableLoadedNode::Leaf(EditableLeafNode {
1✔
240
                entries: BTreeMap::new(),
1✔
241
            }));
242
        } else {
243
            let loaded = self.require_loaded(load_tree).await?;
4,052✔
244
            if let Some(simplified) = loaded.simplify() {
1,015✔
245
                *self = simplified;
1✔
246
            }
247
        }
248
        Ok(maybe_removed)
1,014✔
249
    }
250

251
    pub async fn remove_impl(
1,901✔
252
        &mut self,
253
        key: &Key,
254
        load_tree: &dyn LoadTree,
255
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
256
        let loaded = self.require_loaded(load_tree).await?;
7,604✔
257
        let result = loaded.remove(key, load_tree).await?;
9,505✔
258
        Ok(result)
1,901✔
259
    }
260

261
    pub async fn find(
3,057,003✔
262
        &mut self,
263
        key: &Key,
264
        load_tree: &dyn LoadTree,
265
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
266
        let loaded = self.require_loaded(load_tree).await?;
12,228,012✔
267
        loaded.find(key, load_tree).await
12,228,012✔
268
    }
269

270
    pub async fn count(
21,928✔
271
        &mut self,
272
        load_tree: &dyn LoadTree,
273
    ) -> Result<u64, Box<dyn std::error::Error>> {
274
        let loaded = self.require_loaded(load_tree).await?;
87,712✔
275
        Box::pin(loaded.count(load_tree)).await
87,712✔
276
    }
277

278
    pub async fn save(
163✔
279
        &mut self,
280
        store_tree: &dyn StoreTree,
281
    ) -> Result<BlobDigest, Box<dyn std::error::Error>> {
282
        match self {
163✔
283
            EditableNode::Reference(tree_ref) => Ok(*tree_ref.reference()),
28✔
284
            EditableNode::Loaded(loaded_node) => loaded_node.save(store_tree).await,
596✔
285
        }
286
    }
287

288
    pub async fn load(
60✔
289
        digest: &BlobDigest,
290
        load_tree: &dyn LoadTree,
291
    ) -> Result<Self, Box<dyn std::error::Error>> {
292
        let loaded: EitherNodeType<Key, Value> = load_node(load_tree, digest).await?;
300✔
293
        Ok(EditableNode::Loaded(EditableLoadedNode::new(loaded)))
60✔
294
    }
295

296
    pub async fn verify_integrity(
5,684✔
297
        &mut self,
298
        expected_top_key: Option<&Key>,
299
        load_tree: &dyn LoadTree,
300
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
301
        let loaded = self.require_loaded(load_tree).await?;
22,736✔
302
        if loaded.top_key() != expected_top_key {
5,684✔
303
            return Ok(IntegrityCheckResult::Corrupted(
×
304
                "Top key mismatch".to_string(),
×
305
            ));
306
        }
307
        Box::pin(loaded.verify_integrity(load_tree)).await
22,736✔
308
    }
309

310
    pub async fn merge(
109✔
311
        &mut self,
312
        other: Self,
313
        load_tree: &dyn LoadTree,
314
    ) -> Result<(Key, Vec<EditableLoadedNode<Key, Value>>), Box<dyn std::error::Error>> {
315
        let loaded = self.require_loaded(load_tree).await?;
436✔
316
        let other_loaded = match other {
218✔
317
            EditableNode::Reference(tree_ref) => {
×
318
                let loaded: EitherNodeType<Key, Value> =
×
319
                    load_node(load_tree, tree_ref.reference()).await?;
×
320
                EditableLoadedNode::new(loaded)
×
321
            }
322
            EditableNode::Loaded(loaded_node) => loaded_node,
218✔
323
        };
324
        match (loaded, other_loaded) {
218✔
325
            (EditableLoadedNode::Leaf(self_leaf), EditableLoadedNode::Leaf(other_leaf)) => {
218✔
326
                for (key, value) in other_leaf.entries {
27,172✔
327
                    self_leaf.entries.insert(key, value);
27,063✔
328
                }
329
                let split_nodes = self_leaf.check_split();
327✔
330
                Ok((
×
331
                    self_leaf
218✔
332
                        .top_key()
218✔
333
                        .expect("Leaf cannot be empty here")
109✔
334
                        .clone(),
109✔
335
                    split_nodes
109✔
336
                        .into_iter()
109✔
337
                        .map(|n| EditableLoadedNode::Leaf(n))
166✔
338
                        .collect(),
109✔
339
                ))
340
            }
341
            (
342
                EditableLoadedNode::Internal(self_internal),
×
343
                EditableLoadedNode::Internal(other_internal),
×
344
            ) => {
×
345
                for (key, child_node) in other_internal.entries {
×
346
                    let previous_entry = self_internal.entries.insert(key, child_node);
×
347
                    if let Some(_existing_child) = previous_entry {
×
348
                        return Err(Box::new(std::io::Error::other("Merge node key collision")));
×
349
                    }
350
                }
351
                let split_nodes = self_internal.check_split();
×
352
                Ok((
×
353
                    self_internal
×
354
                        .top_key()
×
355
                        .expect("Internal node cannot be empty here")
×
356
                        .clone(),
×
357
                    split_nodes
×
358
                        .into_iter()
×
359
                        .map(|n| EditableLoadedNode::Internal(n))
×
360
                        .collect(),
×
361
                ))
362
            }
363
            _ => unreachable!(),
364
        }
365
    }
366

367
    pub async fn is_naturally_split(
17,915✔
368
        &mut self,
369
        load_tree: &dyn LoadTree,
370
    ) -> Result<bool, Box<dyn std::error::Error>> {
371
        let loaded = self.require_loaded(load_tree).await?;
71,660✔
372
        Ok(loaded.is_naturally_split())
17,915✔
373
    }
374
}
375

376
/// In-memory leaf node: an ordered map from keys to values.
#[derive(Debug, Clone)]
377
pub struct EditableLeafNode<Key, Value> {
378
    /// Entries of this leaf, ordered by key.
    entries: BTreeMap<Key, Value>,
379
}
380

381
impl<Key: std::cmp::Ord + Clone + Serialize, Value: Clone + NodeValue>
382
    EditableLeafNode<Key, Value>
383
{
384
    pub fn create(entries: BTreeMap<Key, Value>) -> Option<Self> {
4,629✔
385
        if entries.is_empty() {
9,258✔
386
            None
2✔
387
        } else {
388
            Some(EditableLeafNode { entries })
4,627✔
389
        }
390
    }
391

392
    /// Inserts (or overwrites) `key` and re-splits the leaf.
    ///
    /// Returns the leaves that were split off; this leaf keeps the first chunk.
    pub async fn insert(&mut self, key: Key, value: Value) -> Vec<EditableLeafNode<Key, Value>> {
8,772✔
393
        self.entries.insert(key, value);
17,544✔
394
        self.check_split()
8,772✔
395
    }
396

397
    pub async fn remove(
1,014✔
398
        &mut self,
399
        key: &Key,
400
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
401
        let removed = self.entries.remove(key);
4,056✔
402
        let top_key = self.top_key().cloned();
4,056✔
403
        Ok((top_key, removed))
1,014✔
404
    }
405

406
    fn check_split(&mut self) -> Vec<EditableLeafNode<Key, Value>> {
4,495✔
407
        let mut result = Vec::new();
8,990✔
408
        let mut current_node = BTreeMap::new();
8,990✔
409
        let mut current_node_size_tracker = SizeTracker::new();
8,990✔
410
        for entry in self.entries.iter() {
520,218✔
411
            current_node_size_tracker.add_entry(entry.0, &entry.1.to_content());
2,044,912✔
412
            current_node.insert(entry.0.clone(), entry.1.clone());
3,067,368✔
413
            if is_split_after_key(entry.0, current_node_size_tracker.size()) {
2,047,003✔
414
                result.push(
6,273✔
415
                    EditableLeafNode::create(current_node)
6,273✔
416
                        .expect("Must succeed because list is not empty"),
6,273✔
417
                );
418
                current_node = BTreeMap::new();
4,182✔
419
                current_node_size_tracker = SizeTracker::new();
2,091✔
420
            }
421
        }
422
        if !current_node.is_empty() {
7,030✔
423
            result.push(
7,605✔
424
                EditableLeafNode::create(current_node)
7,605✔
425
                    .expect("Must succeed because list is not empty"),
2,535✔
426
            );
427
        }
428
        *self = result.remove(0);
13,485✔
429
        result
4,495✔
430
    }
431

432
    /// Returns the greatest key in this leaf, or `None` when the leaf is empty.
    pub fn top_key(&self) -> Option<&Key> {
        self.entries.last_key_value().map(|(key, _)| key)
    }
435

436
    /// Returns a clone of the value stored under `key`, or `None` when the key is absent.
    pub fn find(&mut self, key: &Key) -> Option<Value> {
1,549,689✔
437
        self.entries.get(key).cloned()
6,198,756✔
438
    }
439

440
    pub fn verify_integrity(&mut self) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
4,790✔
441
        let mut size_tracker = SizeTracker::new();
9,580✔
442
        for (index, (key, value)) in self.entries.iter().enumerate() {
1,648,281✔
443
            size_tracker.add_entry(key, &value.to_content());
2,178,548✔
444
            let is_split = is_split_after_key(key, size_tracker.size());
2,723,185✔
445
            if (index < self.entries.len() - 1) && is_split {
1,629,123✔
446
                return Ok(IntegrityCheckResult::Corrupted(format!(
×
447
                    "Leaf node integrity check failed: Key at index {} indicates split but node is not final (number of keys: {})",
×
448
                    index, self.entries.len()
×
449
                )));
450
            }
451
        }
452
        Ok(IntegrityCheckResult::Valid { depth: 0 })
4,790✔
453
    }
454

455
    pub fn is_naturally_split(&self) -> bool {
17,915✔
456
        let mut size_tracker = SizeTracker::new();
35,830✔
457
        for entry in self.entries.iter() {
4,296,518✔
458
            size_tracker.add_entry(entry.0, &entry.1.to_content());
8,521,376✔
459
        }
460
        is_split_after_key(
461
            self.entries.keys().last().expect("leaf node is not empty"),
71,660✔
462
            size_tracker.size(),
35,830✔
463
        )
464
    }
465

466
    /// Read-only access to the entries of this leaf.
    pub fn entries(&self) -> &BTreeMap<Key, Value> {
4✔
467
        &self.entries
4✔
468
    }
469
}
470

471
/// In-memory internal node: an ordered map from keys to child nodes.
#[derive(Debug, Clone)]
472
pub struct EditableInternalNode<Key: std::cmp::Ord + Clone, Value: Clone> {
473
    /// Children of this node; the code that maintains this map keys each child by its
    /// top key.
    entries: BTreeMap<Key, EditableNode<Key, Value>>,
474
}
475

476
impl<
477
        Key: Serialize + DeserializeOwned + PartialEq + Ord + Clone + Debug,
478
        Value: NodeValue + Clone,
479
    > EditableInternalNode<Key, Value>
480
{
481
    pub fn create(entries: BTreeMap<Key, EditableNode<Key, Value>>) -> Option<Self> {
2,980✔
482
        if entries.is_empty() {
5,960✔
483
            None
×
484
        } else {
485
            Some(EditableInternalNode { entries })
2,980✔
486
        }
487
    }
488

489
    pub async fn insert(
2,980✔
490
        &mut self,
491
        key: Key,
492
        value: Value,
493
        load_tree: &dyn LoadTree,
494
    ) -> Result<Vec<EditableInternalNode<Key, Value>>, Box<dyn std::error::Error>> {
495
        let last_index = self.entries.len() - 1;
5,960✔
496
        // TODO: optimize search
497
        for (index, (entry_key, entry_value)) in self.entries.iter_mut().enumerate() {
35,280✔
498
            if (index == last_index) || (key <= *entry_key) {
16,574✔
499
                let (updated_key, split_nodes) =
5,960✔
500
                    entry_value.insert_impl(key, value, load_tree).await?;
14,900✔
501
                if updated_key != *entry_key {
2,980✔
502
                    let old_key = entry_key.clone();
855✔
503
                    let old_value = self.entries.remove(&old_key).expect("key must exist");
1,710✔
504
                    let previous_entry = self.entries.insert(updated_key, old_value);
1,425✔
505
                    assert!(previous_entry.is_none(), "Split node key collision");
506
                }
507
                for node in split_nodes {
3,047✔
508
                    let previous_entry = self.entries.insert(
201✔
509
                        node.top_key().expect("Node cannot be empty here").clone(),
268✔
510
                        EditableNode::Loaded(node),
67✔
511
                    );
512
                    assert!(previous_entry.is_none(), "Split node key collision");
513
                }
514
                self.update_chunk_boundaries(load_tree).await?;
8,940✔
515
                break;
2,980✔
516
            }
517
        }
518
        Ok(self.check_split())
2,980✔
519
    }
520

521
    pub async fn remove(
887✔
522
        &mut self,
523
        key: &Key,
524
        load_tree: &dyn LoadTree,
525
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
526
        // TODO: optimize search
527
        for (entry_key, entry_value) in self.entries.iter_mut() {
6,608✔
528
            if key <= entry_key {
2,417✔
529
                let (maybe_new_top_key, maybe_removed) =
1,774✔
530
                    Box::pin(entry_value.remove_impl(key, load_tree)).await?;
4,435✔
531
                match maybe_new_top_key {
887✔
532
                    Some(new_top_key) => {
887✔
533
                        if new_top_key != *entry_key {
894✔
534
                            let entry_key = entry_key.clone();
28✔
535
                            let entry_value_removed = self
21✔
536
                                .entries
14✔
537
                                .remove(&entry_key)
21✔
538
                                .expect("Must exist because we just iterated over the entries");
539
                            self.entries.insert(new_top_key, entry_value_removed);
21✔
540
                        }
541
                    }
542
                    None => {
×
543
                        let entry_key = entry_key.clone();
×
544
                        self.entries.remove(&entry_key);
×
545
                    }
546
                }
547
                let top_key = self.top_key().cloned();
3,548✔
548
                match maybe_removed {
887✔
549
                    Some(removed) => {
887✔
550
                        if !self.entries.is_empty() {
887✔
551
                            self.update_chunk_boundaries(load_tree).await?;
2,661✔
552
                        }
553
                        return Ok((top_key, Some(removed)));
887✔
554
                    }
555
                    None => {
×
556
                        return Ok((top_key, None));
×
557
                    }
558
                }
559
            }
560
        }
561
        Ok((self.top_key().cloned(), None))
×
562
    }
563

564
    async fn update_chunk_boundaries(
3,867✔
565
        &mut self,
566
        load_tree: &dyn LoadTree,
567
    ) -> Result<(), Box<dyn std::error::Error>> {
568
        loop {
×
569
            let merge_candidates = self.find_merge_candidates(load_tree).await?;
15,904✔
570
            match merge_candidates {
3,976✔
571
                Some((low_key, high_key)) => {
218✔
572
                    let mut low = self.entries.remove(&low_key).expect("key must exist");
654✔
573
                    let high = self.entries.remove(&high_key).expect("key must exist");
654✔
574
                    let (low_top_key, split_nodes) = low.merge(high, load_tree).await?;
654✔
575
                    assert!(split_nodes.is_empty() || low.is_naturally_split(load_tree).await?);
576
                    assert_ne!(low_key, low_top_key, "Merge did not change low key");
577
                    let previous_entry = self.entries.insert(low_top_key, low);
545✔
578
                    assert!(previous_entry.is_none(), "Merge node key collision");
579
                    let split_nodes_len = split_nodes.len();
327✔
580
                    for (index, node) in split_nodes.into_iter().enumerate() {
441✔
581
                        assert!((index == split_nodes_len - 1) || node.is_naturally_split());
582
                        let previous_entry = self.entries.insert(
171✔
583
                            node.top_key().expect("Node cannot be empty here").clone(),
228✔
584
                            EditableNode::Loaded(node),
57✔
585
                        );
586
                        assert!(previous_entry.is_none(), "Merge node key collision");
587
                    }
588
                }
589
                None => break,
3,867✔
590
            }
591
        }
592
        Ok(())
3,867✔
593
    }
594

595
    async fn find_merge_candidates(
3,976✔
596
        &mut self,
597
        load_tree: &dyn LoadTree,
598
    ) -> Result<Option<(Key, Key)>, Box<dyn std::error::Error>> {
599
        let last_index = self.entries.len() - 1;
7,952✔
600
        let mut needs_merge: Option<&Key> = None;
11,928✔
601
        // TODO: optimize search
602
        for (index, (entry_key, entry_value)) in self.entries.iter_mut().enumerate() {
65,829✔
603
            if let Some(merge_value) = needs_merge.take() {
18,076✔
604
                return Ok(Some((merge_value.clone(), entry_key.clone())));
327✔
605
            }
606
            let is_split = entry_value.is_naturally_split(load_tree).await?;
71,432✔
607
            if (index != last_index) && !is_split {
31,958✔
608
                needs_merge = Some(entry_key);
109✔
609
            }
610
        }
611
        Ok(None)
3,867✔
612
    }
613

614
    fn check_split(&mut self) -> Vec<EditableInternalNode<Key, Value>> {
2,980✔
615
        let mut result = Vec::new();
5,960✔
616
        let mut current_node = BTreeMap::new();
5,960✔
617
        let mut current_node_size_tracker = SizeTracker::new();
5,960✔
618
        for entry in self.entries.iter() {
19,193✔
619
            current_node_size_tracker.add_entry(
26,466✔
620
                entry.0,
13,233✔
621
                &TreeReference::new(BlobDigest::new(&[0; 64])).to_content(),
26,466✔
622
            );
623
            current_node.insert(entry.0.clone(), entry.1.clone());
79,398✔
624
            if is_split_after_key(entry.0, current_node_size_tracker.size()) {
52,932✔
625
                result.push(
×
626
                    EditableInternalNode::create(current_node)
×
627
                        .expect("Must succeed because list is not empty"),
×
628
                );
629
                current_node = BTreeMap::new();
×
630
                current_node_size_tracker = SizeTracker::new();
×
631
            }
632
        }
633
        if !current_node.is_empty() {
5,960✔
634
            result.push(
8,940✔
635
                EditableInternalNode::create(current_node)
8,940✔
636
                    .expect("Must succeed because list is not empty"),
2,980✔
637
            );
638
        }
639
        *self = result.remove(0);
8,940✔
640
        result
2,980✔
641
    }
642

643
    /// Returns the greatest child key of this node, or `None` when it has no children.
    pub fn top_key(&self) -> Option<&Key> {
        self.entries.last_key_value().map(|(key, _)| key)
    }
646

647
    pub async fn find(
1,507,314✔
648
        &mut self,
649
        key: &Key,
650
        _load_tree: &dyn LoadTree,
651
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
652
        // TODO: optimize search
653
        for (entry_key, entry_value) in self.entries.iter_mut() {
11,578,646✔
654
            if key <= entry_key {
4,282,009✔
655
                return Box::pin(entry_value.find(key, _load_tree)).await;
7,536,495✔
656
            }
657
        }
658
        Ok(None)
15✔
659
    }
660

661
    pub async fn verify_integrity(
894✔
662
        &mut self,
663
        load_tree: &dyn LoadTree,
664
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
665
        if self.entries.is_empty() {
1,788✔
666
            return Ok(IntegrityCheckResult::Corrupted(
×
667
                "Internal node integrity check failed: Node has no entries".to_string(),
×
668
            ));
669
        }
670
        let mut child_depth = None;
1,788✔
671
        for (index, (key, value)) in self.entries.iter_mut().enumerate() {
15,429✔
672
            match value.verify_integrity(Some(key), load_tree).await? {
16,996✔
673
                IntegrityCheckResult::Valid { depth } => {
4,249✔
674
                    if let Some(existing_depth) = child_depth {
7,604✔
675
                        if existing_depth != depth {
3,355✔
676
                            return Ok(IntegrityCheckResult::Corrupted(format!(
×
677
                                "Internal node integrity check failed at index {}: Child node depth mismatch (expected {}, found {})",
×
678
                                index, existing_depth, depth
×
679
                            )));
680
                        }
681
                    } else {
682
                        child_depth = Some(depth);
894✔
683
                    }
684
                }
685
                IntegrityCheckResult::Corrupted(reason) => {
×
686
                    return Ok(IntegrityCheckResult::Corrupted(format!(
×
687
                        "Internal node integrity check failed at index {}: {}",
×
688
                        index, reason
×
689
                    )));
690
                }
691
            }
692
        }
693
        Ok(IntegrityCheckResult::Valid {
894✔
694
            depth: child_depth.expect("Internal node has to have at least one child") + 1,
1,788✔
695
        })
696
    }
697

698
    pub fn is_naturally_split(&self) -> bool {
×
699
        let last_key = self
×
700
            .entries
×
701
            .keys()
702
            .last()
703
            .expect("internal node is not empty");
704
        let mut size_tracker = SizeTracker::new();
×
705
        for entry in self.entries.iter() {
×
706
            size_tracker.add_entry(
×
707
                entry.0,
×
708
                &TreeReference::new(BlobDigest::new(&[0; 64])).to_content(),
×
709
            );
710
        }
711
        is_split_after_key(last_key, size_tracker.size())
×
712
    }
713
}
714

715
/// A node whose content is held in memory: either a leaf or an internal node.
#[derive(Debug, Clone)]
716
pub enum EditableLoadedNode<Key: std::cmp::Ord + Clone, Value: Clone> {
717
    /// Node that maps keys to values.
    Leaf(EditableLeafNode<Key, Value>),
718
    /// Node that maps keys to child nodes.
    Internal(EditableInternalNode<Key, Value>),
719
}
720

721
impl<Key: Serialize + DeserializeOwned + Ord + Clone + Debug, Value: NodeValue + Clone>
722
    EditableLoadedNode<Key, Value>
723
{
724
    pub fn new(loaded: EitherNodeType<Key, Value>) -> Self {
69✔
725
        match loaded {
69✔
726
            EitherNodeType::Leaf(leaf_node) => {
69✔
727
                let mut entries = BTreeMap::new();
138✔
728
                for (key, value) in leaf_node.entries {
1,761✔
729
                    entries.insert(key, value);
1,692✔
730
                }
731
                EditableLoadedNode::Leaf(EditableLeafNode { entries })
69✔
732
            }
733
            EitherNodeType::Internal(internal_node) => {
×
734
                let mut entries = BTreeMap::new();
×
735
                for (key, child_node) in internal_node.entries {
×
736
                    entries.insert(key, EditableNode::Reference(child_node));
×
737
                }
738
                EditableLoadedNode::Internal(EditableInternalNode { entries })
×
739
            }
740
        }
741
    }
742

743
    pub async fn insert(
7,366✔
744
        &mut self,
745
        key: Key,
746
        value: Value,
747
        load_tree: &dyn LoadTree,
748
    ) -> Result<Vec<EditableLoadedNode<Key, Value>>, Box<dyn std::error::Error>> {
749
        match self {
7,366✔
750
            EditableLoadedNode::Leaf(leaf_node) => {
4,386✔
751
                let split_nodes = leaf_node.insert(key, value).await;
21,930✔
752
                Ok(split_nodes
4,386✔
753
                    .into_iter()
4,386✔
754
                    .map(|node| EditableLoadedNode::Leaf(node))
4,460✔
755
                    .collect())
4,386✔
756
            }
757
            EditableLoadedNode::Internal(internal_node) => {
2,980✔
758
                let split_nodes = internal_node.insert(key, value, load_tree).await?;
17,880✔
759
                Ok(split_nodes
2,980✔
760
                    .into_iter()
2,980✔
761
                    .map(|node| EditableLoadedNode::Internal(node))
2,980✔
762
                    .collect())
2,980✔
763
            }
764
        }
765
    }
766

767
    pub async fn remove(
1,901✔
768
        &mut self,
769
        key: &Key,
770
        load_tree: &dyn LoadTree,
771
    ) -> Result<(Option<Key>, Option<Value>), Box<dyn std::error::Error>> {
772
        match self {
1,901✔
773
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.remove(key).await,
4,056✔
774
            EditableLoadedNode::Internal(internal_node) => {
887✔
775
                internal_node.remove(key, load_tree).await
3,548✔
776
            }
777
        }
778
    }
779

780
    pub fn simplify(&mut self) -> Option<EditableNode<Key, Value>> {
1,013✔
781
        match self {
1,013✔
782
            EditableLoadedNode::Internal(internal_node) => {
887✔
783
                if internal_node.entries.len() == 1 {
887✔
784
                    let (_, only_child) = internal_node
3✔
785
                        .entries
2✔
786
                        .iter_mut()
787
                        .next()
788
                        .expect("internal node has one entry");
789
                    Some(only_child.clone())
1✔
790
                } else {
791
                    None
886✔
792
                }
793
            }
794
            EditableLoadedNode::Leaf(_) => None,
126✔
795
        }
796
    }
797

798
    pub async fn find(
3,057,003✔
799
        &mut self,
800
        key: &Key,
801
        load_tree: &dyn LoadTree,
802
    ) -> Result<Option<Value>, Box<dyn std::error::Error>> {
803
        match self {
3,057,003✔
804
            EditableLoadedNode::Leaf(leaf_node) => Ok(leaf_node.find(key)),
4,649,067✔
805
            EditableLoadedNode::Internal(internal_node) => internal_node.find(key, load_tree).await,
7,536,570✔
806
        }
807
    }
808

809
    pub fn top_key(&self) -> Option<&Key> {
13,181✔
810
        match self {
13,181✔
811
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.top_key(),
27,921✔
812
            EditableLoadedNode::Internal(internal_node) => internal_node.top_key(),
11,622✔
813
        }
814
    }
815

816
    pub async fn count(
21,928✔
817
        &mut self,
818
        load_tree: &dyn LoadTree,
819
    ) -> Result<u64, Box<dyn std::error::Error>> {
820
        match self {
21,928✔
821
            EditableLoadedNode::Leaf(leaf_node) => Ok(leaf_node.entries.len() as u64),
36,528✔
822
            EditableLoadedNode::Internal(internal_node) => {
3,664✔
823
                let mut total_count = 0;
7,328✔
824
                for child_node in internal_node.entries.values_mut() {
24,356✔
825
                    total_count += child_node.count(load_tree).await?;
51,084✔
826
                }
827
                Ok(total_count)
3,664✔
828
            }
829
        }
830
    }
831

832
    pub async fn save(
150✔
833
        &mut self,
834
        store_tree: &dyn StoreTree,
835
    ) -> Result<BlobDigest, Box<dyn std::error::Error>> {
836
        match self {
150✔
837
            EditableLoadedNode::Leaf(leaf_node) => {
133✔
838
                let mut new_node = crate::sorted_tree::Node {
839
                    entries: Vec::new(),
133✔
840
                };
841
                for (key, value) in &leaf_node.entries {
21,919✔
842
                    new_node.entries.push((key.clone(), value.clone()));
36,310✔
843
                }
844
                let digest = store_node(store_tree, &new_node, &Metadata { is_leaf: true }).await?;
665✔
845
                Ok(digest)
133✔
846
            }
847
            EditableLoadedNode::Internal(internal_node) => {
17✔
848
                let mut new_node = crate::sorted_tree::Node {
849
                    entries: Vec::new(),
17✔
850
                };
851
                for (key, child_node) in &mut internal_node.entries {
153✔
852
                    let child_digest = Box::pin(child_node.save(store_tree)).await?;
340✔
853
                    new_node
68✔
854
                        .entries
68✔
855
                        .push((key.clone(), TreeReference::new(child_digest)));
272✔
856
                }
857
                let digest =
17✔
858
                    store_node(store_tree, &new_node, &Metadata { is_leaf: false }).await?;
68✔
859
                Ok(digest)
17✔
860
            }
861
        }
862
    }
863

864
    pub async fn verify_integrity(
5,684✔
865
        &mut self,
866
        load_tree: &dyn LoadTree,
867
    ) -> Result<IntegrityCheckResult, Box<dyn std::error::Error>> {
868
        match self {
5,684✔
869
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.verify_integrity(),
14,370✔
870
            EditableLoadedNode::Internal(internal_node) => {
894✔
871
                internal_node.verify_integrity(load_tree).await
2,682✔
872
            }
873
        }
874
    }
875

876
    pub fn is_naturally_split(&self) -> bool {
17,915✔
877
        match self {
17,915✔
878
            EditableLoadedNode::Leaf(leaf_node) => leaf_node.is_naturally_split(),
53,745✔
879
            EditableLoadedNode::Internal(internal_node) => internal_node.is_naturally_split(),
×
880
        }
881
    }
882
}
883

884
pub struct Iterator<
885
    't,
886
    Key: Serialize + DeserializeOwned + Ord + Clone + Debug,
887
    Value: NodeValue + Clone,
888
> {
889
    next: Vec<&'t mut EditableNode<Key, Value>>,
890
    leaf_iterator: Option<std::collections::btree_map::Iter<'t, Key, Value>>,
891
    load_tree: &'t dyn LoadTree,
892
}
893

894
impl<'t, Key, Value> Iterator<'t, Key, Value>
895
where
896
    Key: Serialize + DeserializeOwned + Ord + Clone + Debug,
897
    Value: NodeValue + Clone,
898
{
899
    pub fn new(node: &'t mut EditableNode<Key, Value>, load_tree: &'t dyn LoadTree) -> Self {
3✔
900
        Iterator {
901
            next: vec![node],
9✔
902
            leaf_iterator: None,
903
            load_tree,
904
        }
905
    }
906

907
    pub async fn next(&mut self) -> Result<Option<(Key, Value)>, Box<dyn std::error::Error>> {
1,012✔
908
        loop {
×
909
            if let Some(current_node) = self.leaf_iterator.as_mut() {
1,018✔
910
                match current_node.next() {
507✔
911
                    Some((key, value)) => return Ok(Some((key.clone(), value.clone()))),
2,515✔
912
                    None => {
4✔
913
                        self.leaf_iterator = None;
4✔
914
                    }
915
                }
916
            }
917
            match self.next.pop() {
8✔
918
                Some(next_node) => {
5✔
919
                    let loaded = next_node.require_loaded(self.load_tree).await?;
20✔
920
                    match loaded {
5✔
921
                        EditableLoadedNode::Leaf(leaf_node) => {
4✔
922
                            self.leaf_iterator = Some(leaf_node.entries().iter());
4✔
923
                            continue;
4✔
924
                        }
925
                        EditableLoadedNode::Internal(internal_node) => {
1✔
926
                            internal_node
1✔
927
                                .entries
1✔
928
                                .values_mut()
929
                                .rev()
930
                                .for_each(|child_node| {
3✔
931
                                    self.next.push(child_node);
6✔
932
                                });
933
                        }
934
                    };
935
                }
936
                None => {
×
937
                    return Ok(None);
3✔
938
                }
939
            }
940
        }
941
    }
942
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc