vigna / webgraph-rs / build 18575567968 (push, via GitHub)

16 Oct 2025 09:40PM UTC coverage: 48.064% (remained the same).

vigna: Changed name, fixed doc test

0 of 2 new or added lines in 1 file covered (0.0%).
2 existing lines in 2 files now uncovered.
3972 of 8264 relevant lines covered (48.06%).
22122959.79 hits per line.

Source file: /webgraph/src/utils/par_sort_graph.rs
/*
 * SPDX-FileCopyrightText: 2025 Inria
 * SPDX-FileCopyrightText: 2025 Sebastiano Vigna
 *
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
 */

#![allow(clippy::type_complexity)]

//! Facilities to sort in parallel externally (labelled) pairs of nodes
//! returned by a sequence of iterators.
//!
//! The typical use of [`ParSortIters`] is to sort pairs of nodes with an
//! associated label representing a graph; the resulting [`SplitIters`]
//! structure can then be used to build a compressed representation of the
//! graph using, for example,
//! [`BvComp::parallel_iter`](crate::graphs::bvgraph::BvComp::parallel_iter).
//!
//! If your pairs are emitted by a single parallel iterator, consider using
//! [`ParSortPairs`](crate::utils::par_sort_pairs::ParSortPairs) instead.

use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::sync::Mutex;

use anyhow::{Context, Result};
use dsi_bitstream::traits::NE;
use dsi_progress_logger::{concurrent_progress_logger, ProgressLog};
use rayon::prelude::*;

use super::sort_pairs::{BatchIterator, BitReader, BitWriter, KMergeIters, Triple};
use super::MemoryUsage;
use crate::traits::{BitDeserializer, BitSerializer};
use crate::utils::SplitIters;

/// Takes a sequence of iterators of (labelled) pairs as input, and turns them
/// into a [`SplitIters`] structure which is suitable for
/// [`BvComp::parallel_iter`](crate::graphs::bvgraph::BvComp::parallel_iter).
///
/// Note that batches will be memory-mapped. If you encounter OS-level errors
/// using this structure (e.g., `ENOMEM: Out of memory` under Linux), please
/// review the limitations of your OS regarding memory-mapping (e.g.,
/// `/proc/sys/vm/max_map_count` under Linux).
///
/// # Examples
///
/// In this example we transpose a graph in parallel by splitting it, exchanging
/// the source and destination of each arc, sorting the resulting pairs in
/// parallel using [`ParSortIters`], and then compressing the result using
/// [`BvComp::parallel_iter`](crate::graphs::bvgraph::BvComp::parallel_iter):
///
/// ```
/// use std::num::NonZeroUsize;
///
/// use dsi_bitstream::traits::BigEndian;
/// use rayon::prelude::*;
/// use webgraph::prelude::*;
/// use webgraph::graphs::bvgraph::{BvComp, CompFlags};
/// use webgraph::traits::{SequentialLabeling, SplitLabeling};
/// use webgraph::utils::par_sort_graph::ParSortIters;
///
/// // Build a small VecGraph
/// let g = VecGraph::from_arcs([
///     (0, 4),
///     (1, 0),
///     (1, 3),
///     (2, 1),
///     (3, 2),
/// ]);
///
/// let num_nodes = g.num_nodes();
/// let num_partitions = 2;
///
/// // Split the graph into lenders and convert each to pairs
/// let pairs: Vec<_> = g
///     .split_iter(num_partitions)
///     .into_iter()
///     .map(|(_start_node, lender)| lender.into_pairs().map(|(src, dst)| (dst, src)))
///     .collect();
///
/// // Sort the pairs using ParSortIters
/// let pair_sorter = ParSortIters::new(num_nodes)?
///     .num_partitions(NonZeroUsize::new(num_partitions).unwrap());
///
/// let sorted = pair_sorter.sort(pairs)?;
///
/// // Convert to (node, lender) pairs using From trait
/// let pairs: Vec<_> = sorted.into();
///
/// // Compress in parallel using BvComp::parallel_iter
/// let bvcomp_tmp_dir = tempfile::tempdir()?;
/// let bvcomp_out_dir = tempfile::tempdir()?;
///
/// BvComp::parallel_iter::<BigEndian, _>(
///     &bvcomp_out_dir.path().join("graph"),
///     pairs.into_iter(),
///     num_nodes,
///     CompFlags::default(),
///     &rayon::ThreadPoolBuilder::default().build()?,
///     bvcomp_tmp_dir.path(),
/// )?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
pub struct ParSortIters<L = ()> {
    num_nodes: usize,
    expected_num_pairs: Option<usize>,
    num_partitions: NonZeroUsize,
    memory_usage: MemoryUsage,
    marker: PhantomData<L>,
}

impl ParSortIters<()> {
    /// See [`try_sort`](ParSortIters::try_sort).
    pub fn sort(
        &self,
        pairs: impl IntoIterator<
            Item: IntoIterator<Item = (usize, usize), IntoIter: Send> + Send,
            IntoIter: ExactSizeIterator,
        >,
    ) -> Result<SplitIters<impl IntoIterator<Item = (usize, usize), IntoIter: Send + Sync>>> {
        self.try_sort::<std::convert::Infallible>(pairs)
    }

    /// Sorts the output of the provided sequence of iterators,
    /// returning a [`SplitIters`] structure.
    pub fn try_sort<E: Into<anyhow::Error>>(
        &self,
        pairs: impl IntoIterator<
            Item: IntoIterator<Item = (usize, usize), IntoIter: Send> + Send,
            IntoIter: ExactSizeIterator,
        >,
    ) -> Result<SplitIters<impl IntoIterator<Item = (usize, usize), IntoIter: Send + Sync>>> {
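        // Delegate to the labeled version, attaching the unit label `()` to
        // each pair and using the unit (de)serializer.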
        let split = <ParSortIters<()>>::try_sort_labeled::<(), (), E>(
            self,
            &(),
            (),
            pairs
                .into_iter()
                .map(|iter| iter.into_iter().map(|(src, dst)| (src, dst, ()))),
        )?;

        let iters_without_labels: Vec<_> = split
            .iters
            .into_vec()
            .into_iter()
            .map(|iter| iter.into_iter().map(|(src, dst, ())| (src, dst)))
            .collect();

        Ok(SplitIters::new(
            split.boundaries,
            iters_without_labels.into_boxed_slice(),
        ))
    }
}

impl<L> ParSortIters<L> {
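    /// Creates a new [`ParSortIters`] for a graph with the given number of
    /// nodes, with default settings: one partition per CPU and the default
    /// [`MemoryUsage`].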
    pub fn new(num_nodes: usize) -> Result<Self> {
        Ok(Self {
            num_nodes,
            expected_num_pairs: None,
            num_partitions: NonZeroUsize::new(num_cpus::get()).context("zero CPUs")?,
            memory_usage: MemoryUsage::default(),
            marker: PhantomData,
        })
    }

    /// Approximate number of pairs to be sorted.
    ///
    /// Used only for progress reporting.
    pub fn expected_num_pairs(self, expected_num_pairs: usize) -> Self {
        Self {
            expected_num_pairs: Some(expected_num_pairs),
            ..self
        }
    }

    /// How many partitions to split the nodes into.
    ///
    /// Defaults to `num_cpus::get()`.
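    ///
    /// Each partition corresponds to one entry of the resulting
    /// [`SplitIters`], so it can be compressed independently by
    /// [`BvComp::parallel_iter`](crate::graphs::bvgraph::BvComp::parallel_iter).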
    pub fn num_partitions(self, num_partitions: NonZeroUsize) -> Self {
        Self {
            num_partitions,
            ..self
        }
    }

    /// How much memory to use for in-memory sorts.
    ///
    /// Larger values yield faster merges (by reducing logarithmically the
    /// number of batches to merge) but consume linearly more memory. We
    /// suggest setting this parameter as large as possible, depending on the
    /// available memory.
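    ///
    /// # Examples
    ///
    /// A minimal sketch of configuring the sorter with the builder methods
    /// above (here `num_nodes` and `num_arcs` are assumed to be in scope):
    ///
    /// ```ignore
    /// let sorter = ParSortIters::<()>::new(num_nodes)?
    ///     .memory_usage(MemoryUsage::default())
    ///     .num_partitions(NonZeroUsize::new(8).unwrap())
    ///     .expected_num_pairs(num_arcs);
    /// ```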
    pub fn memory_usage(self, memory_usage: MemoryUsage) -> Self {
        Self {
            memory_usage,
            ..self
        }
    }

    /// See [`try_sort_labeled`](ParSortIters::try_sort_labeled).
    ///
    /// This is a convenience method for sequences of iterators that cannot
    /// fail.
    pub fn sort_labeled<S, D>(
        &self,
        serializer: &S,
        deserializer: D,
        pairs: impl IntoIterator<
            Item: IntoIterator<Item = (usize, usize, L), IntoIter: Send> + Send,
            IntoIter: ExactSizeIterator,
        >,
    ) -> Result<
        SplitIters<
            impl IntoIterator<
                Item = (
                    usize,
                    usize,
                    <D as BitDeserializer<NE, BitReader>>::DeserType,
                ),
                IntoIter: Send + Sync,
            >,
        >,
    >
    where
        L: Copy + Send + Sync,
        S: Sync + BitSerializer<NE, BitWriter, SerType = L>,
        D: Clone + Send + Sync + BitDeserializer<NE, BitReader, DeserType: Copy + Send + Sync>,
    {
        self.try_sort_labeled::<S, D, std::convert::Infallible>(serializer, deserializer, pairs)
    }

    /// Sorts the output of the provided sequence of iterators,
    /// returning a [`SplitIters`] structure.
    ///
    /// This method accepts as type parameters a [`BitSerializer`] and a
    /// [`BitDeserializer`] that are used to serialize and deserialize the
    /// labels.
    ///
    /// The bit deserializer must be [`Clone`] because we need one for each
    /// [`BatchIterator`], and there are possible scenarios in which the
    /// deserializer might be stateful.
    pub fn try_sort_labeled<S, D, E: Into<anyhow::Error>>(
        &self,
        serializer: &S,
        deserializer: D,
        pairs: impl IntoIterator<
            Item: IntoIterator<Item = (usize, usize, L), IntoIter: Send> + Send,
            IntoIter: ExactSizeIterator,
        >,
    ) -> Result<
        SplitIters<
            impl IntoIterator<
                Item = (
                    usize,
                    usize,
                    <D as BitDeserializer<NE, BitReader>>::DeserType,
                ),
                IntoIter: Send + Sync,
            >,
        >,
    >
    where
        L: Copy + Send + Sync,
        S: Sync + BitSerializer<NE, BitWriter, SerType = L>,
        D: Clone + Send + Sync + BitDeserializer<NE, BitReader, DeserType: Copy + Send + Sync>,
    {
        let unsorted_pairs = pairs;

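        // Split the memory budget evenly across the sort buffers: each
        // worker thread keeps one buffer per partition, and the number of
        // concurrently live buffers is estimated as threads * partitions.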
        let num_partitions = self.num_partitions.into();
        let num_buffers = rayon::current_num_threads() * num_partitions;
        let batch_size = self
            .memory_usage
            .batch_size::<Triple<L>>()
            .div_ceil(num_buffers);
        let num_nodes_per_partition = self.num_nodes.div_ceil(num_partitions);

        let mut pl = concurrent_progress_logger!(
            display_memory = true,
            item_name = "pair",
            local_speed = true,
            expected_updates = self.expected_num_pairs,
        );
        pl.start("Reading and sorting pairs");
        pl.info(format_args!("Per-processor batch size: {}", batch_size));

        let presort_tmp_dir =
            tempfile::tempdir().context("Could not create temporary directory")?;

        let unsorted_pairs = unsorted_pairs.into_iter();
        let num_blocks = unsorted_pairs.len();

        let partitioned_presorted_pairs = Mutex::new(vec![Vec::new(); num_blocks]);

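        // Spawn one thread per input iterator ("block"); each thread routes
        // its pairs to per-partition buffers and flushes every full buffer
        // to disk as a sorted batch.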
        std::thread::scope(|s| {
            let partitioned_presorted_pairs = &partitioned_presorted_pairs;
            let presort_tmp_dir = &presort_tmp_dir;
            for (block_id, pair) in unsorted_pairs.enumerate() {
                let deserializer = deserializer.clone();
                let mut pl = pl.clone();
                s.spawn(move || {
                    let mut unsorted_buffers = (0..num_partitions)
                        .map(|_| Vec::with_capacity(batch_size))
                        .collect::<Vec<_>>();
                    let mut sorted_pairs =
                        (0..num_partitions).map(|_| Vec::new()).collect::<Vec<_>>();

                    for (src, dst, label) in pair {
                        let partition_id = src / num_nodes_per_partition;

                        let sorted_pairs = &mut sorted_pairs[partition_id];
                        let buf = &mut unsorted_buffers[partition_id];
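                        // Flush before pushing: once the buffer reaches
                        // capacity it is sorted and written to disk, so the
                        // push below never grows the allocation.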
                        if buf.len() >= buf.capacity() {
                            let buf_len = buf.len();
                            super::par_sort_pairs::flush_buffer(
                                presort_tmp_dir.path(),
                                serializer,
                                deserializer.clone(),
                                block_id,
                                partition_id,
                                sorted_pairs,
                                buf,
                            )
                            .context("Could not flush buffer")
                            .unwrap();
                            assert!(buf.is_empty(), "flush_buffer did not empty the buffer");
                            pl.update_with_count(buf_len);
                        }

                        buf.push(Triple {
                            pair: [src, dst],
                            label,
                        });
                    }

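                    // Flush whatever is left in each per-partition buffer.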
                    for (partition_id, (pairs, mut buf)) in sorted_pairs
                        .iter_mut()
                        .zip(unsorted_buffers.into_iter())
                        .enumerate()
                    {
                        let buf_len = buf.len();
                        super::par_sort_pairs::flush_buffer(
                            presort_tmp_dir.path(),
                            serializer,
                            deserializer.clone(),
                            block_id,
                            partition_id,
                            pairs,
                            &mut buf,
                        )
                        .context("Could not flush buffer at the end")
                        .unwrap();
                        assert!(buf.is_empty(), "flush_buffer did not empty the buffer");
                        pl.update_with_count(buf_len);
                    }

                    // TODO: ugly
                    partitioned_presorted_pairs.lock().unwrap()[block_id] = sorted_pairs;
                });
            }
        });

        // At this point, the iterators could be collected into
        // {worker_id -> {partition_id -> [iterators]}},
        // i.e., `Vec<Vec<Vec<BatchIterator>>>`.
        //
        // Let's merge the {partition_id -> [iterators]} maps of each worker.
        let partitioned_presorted_pairs = partitioned_presorted_pairs
            .into_inner()
            .unwrap()
            .into_par_iter()
            .reduce(
                || (0..num_partitions).map(|_| Vec::new()).collect(),
                |mut pair_partitions1: Vec<Vec<BatchIterator<D>>>,
                 pair_partitions2: Vec<Vec<BatchIterator<D>>>|
                 -> Vec<Vec<BatchIterator<D>>> {
                    assert_eq!(pair_partitions1.len(), num_partitions);
                    assert_eq!(pair_partitions2.len(), num_partitions);
                    for (partition1, partition2) in pair_partitions1
                        .iter_mut()
                        .zip(pair_partitions2.into_iter())
                    {
                        partition1.extend(partition2.into_iter());
                    }
                    pair_partitions1
                },
            );
        // At this point, the iterators have been turned into
        // {partition_id -> [iterators]},
        // i.e., `Vec<Vec<BatchIterator>>`.
        pl.done();

        // Build the boundaries array:
        // [0, nodes_per_partition, 2*nodes_per_partition, ..., num_nodes].
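        // E.g., with num_nodes = 5 and num_partitions = 2 (as in the doc
        // example above), num_nodes_per_partition = 3 and the boundaries
        // are [0, 3, 5].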
        let boundaries: Vec<usize> = (0..=num_partitions)
            .map(|i| (i * num_nodes_per_partition).min(self.num_nodes))
            .collect();

        // Build the iterators array.
        let iters: Vec<_> = partitioned_presorted_pairs
            .into_iter()
            .map(|partition| {
                // `partition` contains N iterators that are not sorted with
                // respect to each other. We merge them and turn them into a
                // single sorted iterator.
                KMergeIters::new(partition)
            })
            .collect();

        Ok(SplitIters::new(
            boundaries.into_boxed_slice(),
            iters.into_boxed_slice(),
        ))
    }
}