• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 23365769388

20 Mar 2026 10:52PM UTC coverage: 68.228% (-3.0%) from 71.245%
23365769388

push

github

vigna
No le_bins,be_bins for webgraph

6655 of 9754 relevant lines covered (68.23%)

46582760.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.3
/webgraph/src/utils/sort_pairs.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
5
 *
6
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
7
 */
8

9
//! Facilities to sort externally pairs of nodes with an associated label.
10

11
use crate::{
12
    traits::SortedIterator,
13
    utils::{BatchCodec, CodecIter, DefaultBatchCodec, MemoryUsage},
14
};
15
use anyhow::{Context, anyhow};
16
use dary_heap::PeekMut;
17
use std::path::{Path, PathBuf};
18

19
/// A struct that provides external sorting for pairs of nodes with an
20
/// associated label.
21
///
22
/// An instance of this structure ingests pairs of nodes with an associated
23
/// label, sorts them in chunks of `batch_size` pairs, and dumps them to disk.
24
/// Then, a call to [`iter`](SortPairs::iter) returns an iterator that merges
25
/// the batches on disk on the fly, returning the pairs sorted by
26
/// lexicographical order of the pairs of nodes.
27
///
28
/// A batch should be as large as possible, given the available memory.
29
/// Small batches are inefficient because they require significantly
30
/// more I/O, and more effort during the merge phase.
31
///
32
/// Note that batches will be memory-mapped. If you encounter OS-level errors
33
/// using this structure (e.g., `ENOMEM: Out of memory` under Linux), please review
34
/// the limitations of your OS regarding memory-mapping (e.g.,
35
/// `/proc/sys/vm/max_map_count` under Linux).
36
///
37
/// The structure accepts as type parameter a [`BatchCodec`] that is used to
38
/// serialize and deserialize the labels.
39
///
40
/// You can use this structure in two ways: either create an instance with
41
/// [`new_labeled`](SortPairs::new_labeled) and add labeled pairs using
42
/// [`push_labeled`](SortPairs::push_labeled), and then iterate over the sorted
43
/// pairs using [`iter`](SortPairs::iter), or create a new instance and
44
/// immediately sort an iterator of pairs using
45
/// [`sort_labeled`](SortPairs::sort_labeled) or
46
/// [`try_sort_labeled`](SortPairs::try_sort_labeled).
47
///
48
/// A `SortPairs` over unlabeled pairs (i.e., whose codec has label type `()`) has convenience [`new`](SortPairs::new),
49
/// [`push`](SortPairs::push), [`sort`](SortPairs::sort), and
50
/// [`try_sort`](SortPairs::try_sort) methods without labels. Note however that
51
/// the [resulting iterator](SortPairs::iter) is labeled, and returns pairs
52
/// labeled with `()`. Use [`Left`](crate::prelude::proj::Left) to project away
53
/// the labels if needed.
54
///
55
/// If `DEDUP` is `true`, the iterators returned by [`iter`](SortPairs::iter),
56
/// [`sort`](SortPairs::sort), [`sort_labeled`](SortPairs::sort_labeled),
57
/// and their fallible variants will skip consecutive elements sharing
58
/// the same pair of nodes, keeping only the first occurrence. Use
59
/// [`new_dedup`](SortPairs::new_dedup) or
60
/// [`new_labeled_dedup`](SortPairs::new_labeled_dedup) to enable
61
/// deduplication.
62
pub struct SortPairs<C: BatchCodec = DefaultBatchCodec, const DEDUP: bool = false> {
63
    /// The batch size, that is, the number of labeled pairs accumulated in
    /// memory before the current batch is dumped to disk.
64
    batch_size: usize,
65
    /// The directory where the batches are stored; each batch is a file named
    /// after its index in zero-padded hexadecimal.
66
    tmp_dir: PathBuf,
67
    /// A potentially stateful codec used to encode batches to disk and to
    /// decode them back
68
    /// when iterating over the sorted pairs.
69
    batch_codec: C,
70
    /// Keeps track of how many batches we created.
71
    num_batches: usize,
72
    /// The length of the last batch dumped, which might be smaller than [`SortPairs::batch_size`].
73
    last_batch_len: usize,
74
    /// The batch of labeled pairs we are currently building.
75
    batch: Vec<((usize, usize), C::Label)>,
76
}
77

78
impl<C: BatchCodec, const DEDUP: bool> SortPairs<C, DEDUP> {
79
    /// Creates a new `SortPairs` with the given codec.
80
    fn create<P: AsRef<Path>>(
128✔
81
        memory_usage: MemoryUsage,
82
        dir: P,
83
        batch_codec: C,
84
    ) -> anyhow::Result<Self> {
85
        let dir = dir.as_ref();
384✔
86
        let mut dir_entries =
128✔
87
            std::fs::read_dir(dir).with_context(|| format!("Could not list {}", dir.display()))?;
384✔
88
        if dir_entries.next().is_some() {
256✔
89
            Err(anyhow!("{} is not empty", dir.display()))
2✔
90
        } else {
91
            let batch_size = memory_usage.batch_size::<(usize, usize, C::Label)>();
381✔
92
            Ok(SortPairs {
127✔
93
                batch_size,
254✔
94
                batch_codec,
254✔
95
                tmp_dir: dir.to_owned(),
381✔
96
                num_batches: 0,
127✔
97
                last_batch_len: 0,
127✔
98
                batch: Vec::with_capacity(batch_size),
127✔
99
            })
100
        }
101
    }
102
}
103

104
impl SortPairs {
105
    /// Creates a new `SortPairs` without labels.
106
    ///
107
    /// The `tmp_dir` must be empty, and in particular it must not be shared
108
    /// with other `SortPairs` instances.
109
    ///
110
    /// We suggest to use the [`tempfile`](https://crates.io/crates/tempfile)
111
    /// crate to obtain a suitable temporary directory, as it will be
112
    /// automatically deleted when no longer needed, but be careful to not pass
113
    /// the directory obtained directly, but rather its path (i.e., use
114
    /// `dir.path()`) because otherwise [the directory will be deleted too
115
    /// soon](https://github.com/Stebalien/tempfile/issues/115).
116
    pub fn new<P: AsRef<Path>>(memory_usage: MemoryUsage, tmp_dir: P) -> anyhow::Result<Self> {
13✔
117
        Self::create(memory_usage, tmp_dir, DefaultBatchCodec::default())
52✔
118
    }
119

120
    /// Creates a new `SortPairs` without labels that deduplicates the result.
121
    ///
122
    /// See [`new`](SortPairs::new) for details. When deduplication is enabled,
123
    /// the returned iterators will skip consecutive elements sharing the same
124
    /// pair of nodes, keeping only the first occurrence. Duplicates are also
125
    /// eliminated during batch serialization, reducing I/O and disk usage.
126
    pub fn new_dedup<P: AsRef<Path>>(
10✔
127
        memory_usage: MemoryUsage,
128
        tmp_dir: P,
129
    ) -> anyhow::Result<SortPairs<DefaultBatchCodec<true>, true>> {
130
        SortPairs::create(memory_usage, tmp_dir, <DefaultBatchCodec<true>>::default())
40✔
131
    }
132
}
133

134
impl<C: BatchCodec> SortPairs<C> {
135
    /// Creates a new `SortPairs` with labels.
136
    ///
137
    /// The `dir` must be empty, and in particular it must not be shared
138
    /// with other `SortPairs` instances. Please use the
139
    /// [`tempfile`](https://crates.io/crates/tempfile) crate to obtain
140
    /// a suitable directory.
141
    pub fn new_labeled<P: AsRef<Path>>(
105✔
142
        memory_usage: MemoryUsage,
143
        dir: P,
144
        batch_codec: C,
145
    ) -> anyhow::Result<Self> {
146
        Self::create(memory_usage, dir, batch_codec)
420✔
147
    }
148

149
    /// Creates a new `SortPairs` with labels that deduplicates the result.
150
    ///
151
    /// See [`new_labeled`](SortPairs::new_labeled) for details. When
152
    /// deduplication is enabled, the returned iterators will skip consecutive
153
    /// elements sharing the same pair of nodes, keeping only the first
154
    /// occurrence.
155
    pub fn new_labeled_dedup<P: AsRef<Path>>(
×
156
        memory_usage: MemoryUsage,
157
        dir: P,
158
        batch_codec: C,
159
    ) -> anyhow::Result<SortPairs<C, true>> {
160
        SortPairs::create(memory_usage, dir, batch_codec)
×
161
    }
162
}
163

164
impl<C: BatchCodec<Label = ()>, const DEDUP: bool> SortPairs<C, DEDUP> {
165
    /// Adds an unlabeled pair to the graph.
166
    pub fn push(&mut self, x: usize, y: usize) -> anyhow::Result<()> {
1,099✔
167
        self.push_labeled(x, y, ())
5,495✔
168
    }
169

170
    /// Takes an iterator of pairs, pushes all elements, and returns an iterator
171
    /// over the sorted pairs.
172
    ///
173
    /// This is a convenience method that combines multiple
174
    /// [`push`](SortPairs::push) calls with [`iter`](SortPairs::iter).
175
    pub fn sort(
3✔
176
        &mut self,
177
        pairs: impl IntoIterator<Item = (usize, usize)>,
178
    ) -> anyhow::Result<KMergeIters<CodecIter<C>, (), DEDUP>> {
179
        self.try_sort::<std::convert::Infallible>(pairs.into_iter().map(Ok))
15✔
180
    }
181

182
    /// Takes an iterator of fallible pairs, pushes all elements, and returns an
183
    /// iterator over the sorted pairs.
184
    ///
185
    /// This is a convenience method that combines multiple
186
    /// [`push`](SortPairs::push) calls with [`iter`](SortPairs::iter).
187
    pub fn try_sort<E: Into<anyhow::Error>>(
4✔
188
        &mut self,
189
        pairs: impl IntoIterator<Item = Result<(usize, usize), E>>,
190
    ) -> anyhow::Result<KMergeIters<CodecIter<C>, (), DEDUP>> {
191
        for pair in pairs {
21✔
192
            let (x, y) = pair.map_err(Into::into)?;
68✔
193
            self.push(x, y)?;
68✔
194
        }
195
        self.iter()
8✔
196
    }
197
}
198

199
impl<C: BatchCodec, const DEDUP: bool> SortPairs<C, DEDUP> {
200
    /// Adds a labeled pair to the graph.
201
    pub fn push_labeled(&mut self, x: usize, y: usize, t: C::Label) -> anyhow::Result<()> {
        self.batch.push(((x, y), t));
        // Once the in-memory batch reaches the batch size, it is dumped to disk.
        let batch_is_full = self.batch.len() >= self.batch_size;
        if batch_is_full {
            self.dump()?;
        }
        Ok(())
    }
208

209
    /// Dumps the current batch to disk.
210
    fn dump(&mut self) -> anyhow::Result<()> {
162✔
211
        // This method must be idempotent as it is called by `iter`
212
        if self.batch.is_empty() {
324✔
213
            return Ok(());
3✔
214
        }
215

216
        let batch_path = self.tmp_dir.join(format!("{:06x}", self.num_batches));
477✔
217
        let start = std::time::Instant::now();
318✔
218
        let (bit_size, stats) = self.batch_codec.encode_batch(batch_path, &mut self.batch)?;
954✔
219
        log::info!(
159✔
220
            "Dumped batch {} with {} arcs ({} bits, {:.2} bits / arc) in {:.3} seconds, stats: {}",
×
221
            self.num_batches,
×
222
            self.batch.len(),
×
223
            bit_size,
×
224
            bit_size as f64 / self.batch.len() as f64,
×
225
            start.elapsed().as_secs_f64(),
×
226
            stats
×
227
        );
228
        self.last_batch_len = self.batch.len();
159✔
229
        self.batch.clear();
318✔
230
        self.num_batches += 1;
159✔
231
        Ok(())
159✔
232
    }
233

234
    /// Returns an iterator over the labeled pairs, lexicographically sorted.
235
    pub fn iter(&mut self) -> anyhow::Result<KMergeIters<CodecIter<C>, C::Label, DEDUP>> {
127✔
236
        self.dump()?;
254✔
237
        Ok(KMergeIters::new((0..self.num_batches).map(|batch_idx| {
540✔
238
            let batch_path = self.tmp_dir.join(format!("{batch_idx:06x}"));
477✔
239
            self.batch_codec
159✔
240
                .decode_batch(batch_path)
318✔
241
                .unwrap()
159✔
242
                .into_iter()
159✔
243
        })))
244
    }
245

246
    /// Takes an iterator of labeled pairs, pushes all elements, and returns an
247
    /// iterator over the sorted pairs.
248
    ///
249
    /// This is a convenience method that combines multiple
250
    /// [`push_labeled`](SortPairs::push_labeled) calls with
251
    /// [`iter`](SortPairs::iter).
252
    pub fn sort_labeled(
2✔
253
        &mut self,
254
        pairs: impl IntoIterator<Item = ((usize, usize), C::Label)>,
255
    ) -> anyhow::Result<KMergeIters<CodecIter<C>, C::Label, DEDUP>> {
256
        self.try_sort_labeled::<std::convert::Infallible>(pairs.into_iter().map(Ok))
10✔
257
    }
258

259
    /// Takes an iterator of fallible labeled pairs, pushes all elements, and
260
    /// returns an iterator over the sorted pairs.
261
    ///
262
    /// This is a convenience method that combines multiple
263
    /// [`push_labeled`](SortPairs::push_labeled) calls with
264
    /// [`iter`](SortPairs::iter).
265
    pub fn try_sort_labeled<E: Into<anyhow::Error>>(
3✔
266
        &mut self,
267
        pairs: impl IntoIterator<Item = Result<((usize, usize), C::Label), E>>,
268
    ) -> anyhow::Result<KMergeIters<CodecIter<C>, C::Label, DEDUP>> {
269
        for pair in pairs {
14✔
270
            let ((x, y), label) = pair.map_err(Into::into)?;
55✔
271
            self.push_labeled(x, y, label)?;
55✔
272
        }
273
        self.iter()
6✔
274
    }
275
}
276

277
/// Private struct that keeps the head of an iterator and its tail.
278
///
279
/// Note that we cannot use [`Peekable`](std::iter::Peekable) for the same
280
/// purpose because [`Peekable::peek`](std::iter::Peekable::peek) needs a
281
/// mutable reference, but we would be calling it inside
282
/// [`Ord::cmp`](std::cmp::Ord::cmp), which only has an immutable reference.
283
///
284
/// Comparison is implemented only on the pair of nodes and ignoring the label.
285
#[derive(Clone, Debug)]
struct HeadTail<T, I: Iterator<Item = ((usize, usize), T)>> {
    /// The next labeled pair of the sequence.
    head: ((usize, usize), T),
    /// The iterator returning the labeled pairs following the head.
    tail: I,
}

impl<T, I: Iterator<Item = ((usize, usize), T)>> PartialEq for HeadTail<T, I> {
    /// Two instances are equal if their heads have the same pair of nodes;
    /// labels and tails are ignored.
    #[inline(always)]
    fn eq(&self, other: &Self) -> bool {
        let ((this_pair, _), (that_pair, _)) = (&self.head, &other.head);
        this_pair == that_pair
    }
}

impl<T, I: Iterator<Item = ((usize, usize), T)>> Eq for HeadTail<T, I> {}

impl<T, I: Iterator<Item = ((usize, usize), T)>> PartialOrd for HeadTail<T, I> {
    #[inline(always)]
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl<T, I: Iterator<Item = ((usize, usize), T)>> Ord for HeadTail<T, I> {
    /// Compares the pairs of nodes of the heads in reverse order, so that the
    /// instance with the smallest pair is the greatest one.
    #[inline(always)]
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.head.0.cmp(&other.head.0).reverse()
    }
}
313

314
/// Builds the heap used by [`KMergeIters`] from a collection of sorted
315
/// iterators.
316
fn build_kmerge_heap<T, I: Iterator<Item = ((usize, usize), T)>>(
236✔
317
    iters: impl IntoIterator<Item = I>,
318
) -> dary_heap::QuaternaryHeap<HeadTail<T, I>> {
319
    let iters = iters.into_iter();
708✔
320
    let mut heap = dary_heap::QuaternaryHeap::with_capacity(iters.size_hint().1.unwrap_or(10));
944✔
321
    for mut iter in iters {
810✔
322
        if let Some((pair, label)) = iter.next() {
1,969✔
323
            heap.push(HeadTail {
1,395✔
324
                head: (pair, label),
465✔
325
                tail: iter,
465✔
326
            });
327
        }
328
    }
329
    heap
236✔
330
}
331

332
/// A structure using a [quaternary heap](dary_heap::QuaternaryHeap) to merge sorted iterators.
333
///
334
/// The iterators must be sorted by the pair of nodes, and the structure will return the labeled pairs
335
/// sorted by lexicographical order of the pairs of nodes.
336
///
337
/// The structure implements [`Iterator`] and returns labeled pairs of the form `((src, dst), label)`.
338
///
339
/// If `DEDUP` is `true`, the iterator will skip consecutive elements sharing
340
/// the same pair of nodes, keeping only the first occurrence. Use
341
/// [`new_dedup`](KMergeIters::new_dedup) to enable deduplication.
342
///
343
/// The structure implements [`Default`], [`core::iter::Sum`],
344
/// [`core::ops::AddAssign`], [`Extend`], and [`core::iter::FromIterator`]
345
/// so you can compute different KMergeIters / Iterators / IntoIterators in
346
/// parallel and then merge them using either `+=`, `sum()` or `collect()`:
347
/// ```rust
348
/// use webgraph::utils::sort_pairs::KMergeIters;
349
///
350
/// let (tx, rx) = std::sync::mpsc::channel();
351
///
352
/// std::thread::scope(|s| {
353
///     for _ in 0..10 {
354
///         let tx = tx.clone();
355
///         s.spawn(move || {
356
///             // create a dummy KMergeIters
357
///             tx.send(KMergeIters::new(vec![(0..10).map(|j| ((j, j), j + j))])).unwrap()
358
///         });
359
///     }
360
/// });
361
/// drop(tx);
362
/// // merge the KMergeIters
363
/// let merged = rx.iter().sum::<KMergeIters<core::iter::Map<core::ops::Range<usize>, _>, usize>>();
364
/// ```
365
/// or with plain iterators:
366
/// ```rust
367
/// use webgraph::utils::sort_pairs::KMergeIters;
368
///
369
/// let iter = vec![vec![((0, 0), 0), ((0, 1), 1)], vec![((1, 0), 1), ((1, 1), 2)]];
370
/// let merged = iter.into_iter().collect::<KMergeIters<_, usize>>();
371
/// ```
372
#[derive(Clone, Debug)]
373
pub struct KMergeIters<I: Iterator<Item = ((usize, usize), T)>, T = (), const DEDUP: bool = false> {
374
    /// The iterators being merged, each stored with its next item; iterators
    /// that are exhausted are removed from the heap.
    heap: dary_heap::QuaternaryHeap<HeadTail<T, I>>,
375
    /// The last pair returned, used for deduplication; it is updated only
    /// when `DEDUP` is `true`.
376
    last_pair: Option<(usize, usize)>,
377
}
378

379
impl<T, I: Iterator<Item = ((usize, usize), T)>, const DEDUP: bool> KMergeIters<I, T, DEDUP> {
380
    pub fn new(iters: impl IntoIterator<Item = I>) -> Self {
232✔
381
        KMergeIters {
382
            heap: build_kmerge_heap(iters),
464✔
383
            last_pair: None,
384
        }
385
    }
386
}
387

388
impl<T, I: Iterator<Item = ((usize, usize), T)>> KMergeIters<I, T> {
389
    /// Creates a new `KMergeIters` that deduplicates consecutive elements
390
    /// sharing the same pair of nodes.
391
    pub fn new_dedup(iters: impl IntoIterator<Item = I>) -> KMergeIters<I, T, true> {
4✔
392
        KMergeIters {
393
            heap: build_kmerge_heap(iters),
8✔
394
            last_pair: None,
395
        }
396
    }
397
}
398

399
// SAFETY: the merge of sorted iterators is itself sorted.
400
unsafe impl<T, I: Iterator<Item = ((usize, usize), T)> + SortedIterator, const DEDUP: bool>
401
    SortedIterator for KMergeIters<I, T, DEDUP>
402
{
    // No items to implement: the impl only asserts the ordering guarantee,
    // which relies on every merged iterator being itself a `SortedIterator`.
403
}
404

405
impl<T, I: Iterator<Item = ((usize, usize), T)>, const DEDUP: bool> Iterator
406
    for KMergeIters<I, T, DEDUP>
407
{
408
    type Item = ((usize, usize), T);
409

410
    fn next(&mut self) -> Option<Self::Item> {
46,859,849✔
411
        loop {
×
412
            let mut head_tail = self.heap.peek_mut()?;
140,630,196✔
413

414
            let result = match head_tail.tail.next() {
93,753,004✔
415
                None => PeekMut::pop(head_tail).head,
460✔
416
                Some((pair, label)) => std::mem::replace(&mut head_tail.head, (pair, label)),
234,380,210✔
417
            };
418

419
            if DEDUP {
46,876,502✔
420
                if self.last_pair == Some(result.0) {
27,406,635✔
421
                    continue;
16,883✔
422
                }
423
                self.last_pair = Some(result.0);
27,389,752✔
424
            }
425

426
            return Some(result);
46,859,619✔
427
        }
428
    }
429

430
    fn count(self) -> usize {
2✔
431
        if DEDUP {
2✔
432
            self.fold(0, |count, _| count + 1)
5✔
433
        } else {
434
            self.heap
1✔
435
                .into_iter()
436
                .map(|head_tail| 1 + head_tail.tail.count())
1✔
437
                .sum()
438
        }
439
    }
440
}
441

442
impl<T, I: Iterator<Item = ((usize, usize), T)> + ExactSizeIterator> ExactSizeIterator
443
    for KMergeIters<I, T>
444
{
445
    fn len(&self) -> usize {
1✔
446
        self.heap
1✔
447
            .iter()
448
            .map(|head_tail| {
3✔
449
                // The head is always a labeled pair, so we can count it
450
                1 + head_tail.tail.len()
2✔
451
            })
452
            .sum()
453
    }
454
}
455

456
impl<T, I: Iterator<Item = ((usize, usize), T)>, const DEDUP: bool> core::iter::FusedIterator
457
    for KMergeIters<I, T, DEDUP>
458
{
    // `next` returns `None` only when the heap is empty, and an iterator that
    // has returned `None` is removed from the heap and never polled again.
459
}
460

461
impl<T, I: Iterator<Item = ((usize, usize), T)>, const DEDUP: bool> core::default::Default
462
    for KMergeIters<I, T, DEDUP>
463
{
464
    fn default() -> Self {
4✔
465
        KMergeIters {
466
            heap: dary_heap::QuaternaryHeap::default(),
4✔
467
            last_pair: None,
468
        }
469
    }
470
}
471

472
impl<T, I: Iterator<Item = ((usize, usize), T)>, const DEDUP: bool> core::iter::Sum
473
    for KMergeIters<I, T, DEDUP>
474
{
475
    fn sum<J: Iterator<Item = Self>>(iter: J) -> Self {
3✔
476
        let mut heap = dary_heap::QuaternaryHeap::default();
6✔
477
        for mut kmerge in iter {
15✔
478
            heap.extend(kmerge.heap.drain());
18✔
479
        }
480
        KMergeIters {
481
            heap,
482
            last_pair: None,
483
        }
484
    }
485
}
486

487
impl<T, I: IntoIterator<Item = ((usize, usize), T)>, const DEDUP: bool> core::iter::Sum<I>
488
    for KMergeIters<I::IntoIter, T, DEDUP>
489
{
490
    fn sum<J: Iterator<Item = I>>(iter: J) -> Self {
1✔
491
        KMergeIters::new(iter.map(IntoIterator::into_iter))
3✔
492
    }
493
}
494

495
impl<T, I: Iterator<Item = ((usize, usize), T)>, const DEDUP: bool> core::iter::FromIterator<Self>
496
    for KMergeIters<I, T, DEDUP>
497
{
498
    fn from_iter<J: IntoIterator<Item = Self>>(iter: J) -> Self {
1✔
499
        iter.into_iter().sum()
3✔
500
    }
501
}
502

503
impl<T, I: IntoIterator<Item = ((usize, usize), T)>, const DEDUP: bool> core::iter::FromIterator<I>
504
    for KMergeIters<I::IntoIter, T, DEDUP>
505
{
506
    fn from_iter<J: IntoIterator<Item = I>>(iter: J) -> Self {
1✔
507
        KMergeIters::new(iter.into_iter().map(IntoIterator::into_iter))
4✔
508
    }
509
}
510

511
impl<T, I: IntoIterator<Item = ((usize, usize), T)>, const DEDUP: bool> core::ops::AddAssign<I>
512
    for KMergeIters<I::IntoIter, T, DEDUP>
513
{
514
    fn add_assign(&mut self, rhs: I) {
1✔
515
        let mut rhs = rhs.into_iter();
3✔
516
        if let Some((pair, label)) = rhs.next() {
4✔
517
            self.heap.push(HeadTail {
3✔
518
                head: (pair, label),
1✔
519
                tail: rhs,
1✔
520
            });
521
        }
522
    }
523
}
524

525
impl<T, I: Iterator<Item = ((usize, usize), T)>, const DEDUP: bool> core::ops::AddAssign
526
    for KMergeIters<I, T, DEDUP>
527
{
528
    fn add_assign(&mut self, mut rhs: Self) {
1✔
529
        self.heap.extend(rhs.heap.drain());
4✔
530
    }
531
}
532

533
impl<T, I: IntoIterator<Item = ((usize, usize), T)>, const DEDUP: bool> Extend<I>
534
    for KMergeIters<I::IntoIter, T, DEDUP>
535
{
536
    fn extend<J: IntoIterator<Item = I>>(&mut self, iter: J) {
2✔
537
        self.heap.extend(iter.into_iter().filter_map(|iter| {
14✔
538
            let mut iter = iter.into_iter();
12✔
539
            let (pair, label) = iter.next()?;
16✔
540
            Some(HeadTail {
4✔
541
                head: (pair, label),
4✔
542
                tail: iter,
4✔
543
            })
544
        }));
545
    }
546
}
547

548
impl<T, I: Iterator<Item = ((usize, usize), T)>, const DEDUP: bool> Extend<KMergeIters<I, T, DEDUP>>
549
    for KMergeIters<I, T, DEDUP>
550
{
551
    fn extend<J: IntoIterator<Item = KMergeIters<I, T, DEDUP>>>(&mut self, iter: J) {
1✔
552
        for mut kmerge in iter {
5✔
553
            self.heap.extend(kmerge.heap.drain());
6✔
554
        }
555
    }
556
}
557

558
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        traits::{BitDeserializer, BitSerializer},
        utils::{BitReader, BitWriter, gaps::GapsCodec},
    };
    use dsi_bitstream::prelude::*;
    use std::marker::PhantomData;

    /// A (de)serializer for `usize` labels based on the δ code.
    #[derive(Clone, Debug)]
    struct MyDessert<E: Endianness> {
        _marker: PhantomData<E>,
    }

    impl<E: Endianness> Default for MyDessert<E> {
        fn default() -> Self {
            Self {
                _marker: PhantomData,
            }
        }
    }

    impl<E: Endianness> BitDeserializer<E, BitReader<E>> for MyDessert<E>
    where
        BitReader<E>: BitRead<E> + CodesRead<E>,
    {
        type DeserType = usize;

        fn deserialize(
            &self,
            bitstream: &mut BitReader<E>,
        ) -> Result<Self::DeserType, <BitReader<E> as BitRead<E>>::Error> {
            let value = bitstream.read_delta()?;
            Ok(value as usize)
        }
    }

    impl<E: Endianness> BitSerializer<E, BitWriter<E>> for MyDessert<E>
    where
        BitWriter<E>: BitWrite<E> + CodesWrite<E>,
    {
        type SerType = usize;

        fn serialize(
            &self,
            value: &Self::SerType,
            bitstream: &mut BitWriter<E>,
        ) -> Result<usize, <BitWriter<E> as BitWrite<E>>::Error> {
            let value = *value as u64;
            bitstream.write_delta(value)
        }
    }

    /// Pushes labeled pairs across several batches and checks that both the
    /// merged iterator and a clone of it return consistent triples.
    #[test]
    fn test_sort_pairs() -> anyhow::Result<()> {
        use tempfile::Builder;

        let tmp = Builder::new().prefix("test_sort_pairs_").tempdir()?;
        let mut sorter = SortPairs::new_labeled(
            MemoryUsage::BatchSize(10),
            tmp.path(),
            GapsCodec::<BE, MyDessert<BE>, MyDessert<BE>>::default(),
        )?;

        let n = 25;
        (0..n).try_for_each(|i| sorter.push_labeled(i, i + 1, i + 2))?;

        let mut merged = sorter.iter()?;
        let mut merged_clone = merged.clone();

        // First the original iterator, then its clone.
        for sorted in [&mut merged, &mut merged_clone] {
            for _ in 0..n {
                let ((src, dst), label) = sorted.next().unwrap();
                println!("{} {} {}", src, dst, label);
                assert_eq!(src + 1, dst);
                assert_eq!(src + 2, label);
            }
        }
        Ok(())
    }

    /// Checks the `sort` and `sort_labeled` convenience methods.
    #[test]
    fn test_sort_and_sort_labeled() -> anyhow::Result<()> {
        use tempfile::Builder;

        // Unlabeled pairs
        let tmp = Builder::new().prefix("test_sort_").tempdir()?;
        let mut sorter = SortPairs::new(MemoryUsage::BatchSize(10), tmp.path())?;

        let unsorted = vec![(3, 4), (1, 2), (5, 6), (0, 1), (2, 3)];
        let sorted_pairs: Vec<(usize, usize)> = sorter
            .sort(unsorted)?
            .map(|((x, y), _)| (x, y))
            .collect();
        assert_eq!(sorted_pairs, vec![(0, 1), (1, 2), (2, 3), (3, 4), (5, 6)]);

        // Labeled pairs
        let labeled_tmp = Builder::new().prefix("test_sort_labeled_").tempdir()?;
        let mut labeled_sorter = SortPairs::new_labeled(
            MemoryUsage::BatchSize(5),
            labeled_tmp.path(),
            GapsCodec::<BE, MyDessert<BE>, MyDessert<BE>>::default(),
        )?;

        let unsorted_labeled = vec![
            ((3, 4), 7),
            ((1, 2), 5),
            ((5, 6), 9),
            ((0, 1), 4),
            ((2, 3), 6),
        ];
        let sorted_labeled: Vec<(usize, usize, usize)> = labeled_sorter
            .sort_labeled(unsorted_labeled)?
            .map(|((x, y), label)| (x, y, label))
            .collect();
        assert_eq!(
            sorted_labeled,
            vec![(0, 1, 4), (1, 2, 5), (2, 3, 6), (3, 4, 7), (5, 6, 9)]
        );

        Ok(())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc