• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 18573498435

16 Oct 2025 08:11PM UTC coverage: 48.064% (+0.04%) from 48.029%
18573498435

push

github

vigna
Docs consistency fix; minimal README.md for CLI and algo

8 of 12 new or added lines in 3 files covered. (66.67%)

433 existing lines in 14 files now uncovered.

3972 of 8264 relevant lines covered (48.06%)

22128303.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/webgraph/src/utils/par_sort_graph.rs
1
/*
2
 * SPDX-FileCopyrightText: 2025 Inria
3
 * SPDX-FileCopyrightText: 2025 Sebastiano Vigna
4
 *
5
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6
 */
7

8
#![allow(clippy::type_complexity)]
9

10
//! Facilities to externally sort, in parallel, pairs of nodes with an
//! associated label, returned by a sequence of iterators.
12

13
use std::marker::PhantomData;
14
use std::num::NonZeroUsize;
15
use std::sync::Mutex;
16

17
use anyhow::{Context, Result};
18
use dsi_bitstream::traits::NE;
19
use dsi_progress_logger::{concurrent_progress_logger, ProgressLog};
20
use rayon::prelude::*;
21

22
use super::sort_pairs::{BatchIterator, BitReader, BitWriter, KMergeIters, Triple};
23
use super::MemoryUsage;
24
use crate::traits::{BitDeserializer, BitSerializer};
25
use crate::utils::SplitIters;
26

27
/// Takes a parallel iterator of pairs as input, and returns them into a vector
28
/// of sorted iterators (which can be flattened into a single iterator),
29
/// suitable for
30
/// [`BvComp::parallel_iter`](crate::graphs::bvgraph::BvComp::parallel_iter).
31
///
32
/// Note that batches will be memory-mapped. If you encounter OS-level errors
33
/// using this class (e.g., `ENOMEM: Out of memory` under Linux), please review
34
/// the limitations of your OS regarding memory-mapping (e.g.,
35
/// `/proc/sys/vm/max_map_count` under Linux).
36
///
37
/// ```
38
/// use std::num::NonZeroUsize;
39
///
40
/// use dsi_bitstream::traits::BigEndian;
41
/// use rayon::prelude::*;
42
/// use webgraph::prelude::*;
43
/// use webgraph::graphs::bvgraph::{BvComp, CompFlags};
44
/// use webgraph::traits::{SequentialLabeling, SplitLabeling};
45
/// use webgraph::utils::par_sort_graph::ParSortGraph;
46
///
47
/// // Build a small VecGraph
48
/// let g = VecGraph::from_arcs([
49
///     (0, 4),
50
///     (1, 0),
51
///     (1, 3),
52
///     (2, 1),
53
///     (3, 2),
54
/// ]);
55
///
56
/// let num_nodes = g.num_nodes();
57
/// let num_partitions = 2;
58
///
59
/// // Split the graph into lenders and convert each to pairs
60
/// let pairs: Vec<_> = g
61
///     .split_iter(num_partitions)
62
///     .into_iter()
63
///     .map(|(_start_node, lender)| lender.into_pairs())
64
///     .collect();
65
///
66
/// // Sort the pairs using ParSortGraph
67
/// let pair_sorter = ParSortGraph::new(num_nodes)?
68
///     .num_partitions(NonZeroUsize::new(num_partitions).unwrap());
69
///
70
/// let sorted = pair_sorter.sort(pairs)?;
71
///
72
/// // Convert to (node, lender) pairs using From trait
73
/// let pairs: Vec<_> = sorted.into();
74
///
75
/// // Compress in parallel using BvComp::parallel_iter
76
/// let bvcomp_tmp_dir = tempfile::tempdir()?;
77
/// let bvcomp_out_dir = tempfile::tempdir()?;
78
///
79
/// BvComp::parallel_iter::<BigEndian, _>(
80
///     &bvcomp_out_dir.path().join("graph"),
81
///     pairs.into_iter(),
82
///     num_nodes,
83
///     CompFlags::default(),
84
///     &rayon::ThreadPoolBuilder::default().build()?,
85
///     bvcomp_tmp_dir.path(),
86
/// )?;
87
/// # Ok::<(), Box<dyn std::error::Error>>(())
88
/// ```
89
pub struct ParSortGraph<L = ()> {
    // Total number of nodes; used to split the node space into partitions.
    num_nodes: usize,
    // Approximate number of pairs to sort; used only for progress reporting.
    expected_num_pairs: Option<usize>,
    // How many node partitions the sorted output is split into.
    num_partitions: NonZeroUsize,
    // Memory budget driving the size of the in-memory sort batches.
    memory_usage: MemoryUsage,
    // Zero-sized marker tying this sorter to its label type `L`.
    marker: PhantomData<L>,
}
96

97
impl ParSortGraph<()> {
98
    /// See [`try_sort`](ParSortGraph::try_sort).
UNCOV
99
    pub fn sort(
×
100
        &self,
101
        pairs: impl IntoIterator<
102
            Item: IntoIterator<Item = (usize, usize), IntoIter: Send> + Send,
103
            IntoIter: ExactSizeIterator,
104
        >,
105
    ) -> Result<SplitIters<impl IntoIterator<Item = (usize, usize), IntoIter: Send + Sync>>> {
106
        self.try_sort::<std::convert::Infallible>(pairs)
×
107
    }
108

109
    /// Sorts the output of the provided parallel iterator,
110
    /// returning a [`SplitIters`] structure.
UNCOV
111
    pub fn try_sort<E: Into<anyhow::Error>>(
×
112
        &self,
113
        pairs: impl IntoIterator<
114
            Item: IntoIterator<Item = (usize, usize), IntoIter: Send> + Send,
115
            IntoIter: ExactSizeIterator,
116
        >,
117
    ) -> Result<SplitIters<impl IntoIterator<Item = (usize, usize), IntoIter: Send + Sync>>> {
118
        let split = <ParSortGraph<()>>::try_sort_labeled::<(), (), E>(
UNCOV
119
            self,
×
UNCOV
120
            &(),
×
121
            (),
UNCOV
122
            pairs
×
UNCOV
123
                .into_iter()
×
UNCOV
124
                .map(|iter| iter.into_iter().map(|(src, dst)| (src, dst, ()))),
×
125
        )?;
126

127
        let iters_without_labels: Vec<_> = split
×
UNCOV
128
            .iters
×
129
            .into_vec()
130
            .into_iter()
131
            .map(|iter| iter.into_iter().map(|(src, dst, ())| (src, dst)))
×
132
            .collect();
133

134
        Ok(SplitIters::new(
×
135
            split.boundaries,
×
UNCOV
136
            iters_without_labels.into_boxed_slice(),
×
137
        ))
138
    }
139
}
140

141
impl<L> ParSortGraph<L> {
    /// Creates a new sorter for a graph with `num_nodes` nodes.
    ///
    /// Defaults to one partition per CPU and the default [`MemoryUsage`].
    ///
    /// # Errors
    ///
    /// Returns an error if `num_cpus::get()` reports zero CPUs.
    pub fn new(num_nodes: usize) -> Result<Self> {
        Ok(Self {
            num_nodes,
            expected_num_pairs: None,
            num_partitions: NonZeroUsize::new(num_cpus::get()).context("zero CPUs")?,
            memory_usage: MemoryUsage::default(),
            marker: PhantomData,
        })
    }

    /// Approximate number of pairs to be sorted.
    ///
    /// Used only for progress reporting.
    pub fn expected_num_pairs(self, expected_num_pairs: usize) -> Self {
        Self {
            expected_num_pairs: Some(expected_num_pairs),
            ..self
        }
    }

    /// How many partitions to split the nodes into.
    ///
    /// Defaults to `num_cpus::get()`.
    pub fn num_partitions(self, num_partitions: NonZeroUsize) -> Self {
        Self {
            num_partitions,
            ..self
        }
    }

    /// How much memory to use for in-memory sorts.
    ///
    /// Larger values yield faster merges (by reducing logarithmically the
    /// number of batches to merge) but consume linearly more memory. We suggest
    /// to set this parameter as large as possible, depending on the available
    /// memory.
    pub fn memory_usage(self, memory_usage: MemoryUsage) -> Self {
        Self {
            memory_usage,
            ..self
        }
    }

    /// See [`try_sort_labeled`](ParSortGraph::try_sort_labeled).
    ///
    /// This is a convenience method for parallel iterators that cannot fail.
    pub fn sort_labeled<S, D>(
        &self,
        serializer: &S,
        deserializer: D,
        pairs: impl IntoIterator<
            Item: IntoIterator<Item = (usize, usize, L), IntoIter: Send> + Send,
            IntoIter: ExactSizeIterator,
        >,
    ) -> Result<
        SplitIters<
            impl IntoIterator<
                Item = (
                    usize,
                    usize,
                    <D as BitDeserializer<NE, BitReader>>::DeserType,
                ),
                IntoIter: Send + Sync,
            >,
        >,
    >
    where
        L: Copy + Send + Sync,
        S: Sync + BitSerializer<NE, BitWriter, SerType = L>,
        D: Clone + Send + Sync + BitDeserializer<NE, BitReader, DeserType: Copy + Send + Sync>,
    {
        // `Infallible` marks the input iterators as never failing.
        self.try_sort_labeled::<S, D, std::convert::Infallible>(serializer, deserializer, pairs)
    }

    /// Sorts the output of the provided parallel iterator,
    /// returning a [`SplitIters`] structure.
    ///
    /// This method accepts as type parameters a [`BitSerializer`] and a
    /// [`BitDeserializer`] that are used to serialize and deserialize the labels.
    ///
    /// The bit deserializer must be [`Clone`] because we need one for each
    /// [`BatchIterator`], and there are possible scenarios in which the
    /// deserializer might be stateful.
    pub fn try_sort_labeled<S, D, E: Into<anyhow::Error>>(
        &self,
        serializer: &S,
        deserializer: D,
        pairs: impl IntoIterator<
            Item: IntoIterator<Item = (usize, usize, L), IntoIter: Send> + Send,
            IntoIter: ExactSizeIterator,
        >,
    ) -> Result<
        SplitIters<
            impl IntoIterator<
                Item = (
                    usize,
                    usize,
                    <D as BitDeserializer<NE, BitReader>>::DeserType,
                ),
                IntoIter: Send + Sync,
            >,
        >,
    >
    where
        L: Copy + Send + Sync,
        S: Sync + BitSerializer<NE, BitWriter, SerType = L>,
        D: Clone + Send + Sync + BitDeserializer<NE, BitReader, DeserType: Copy + Send + Sync>,
    {
        let unsorted_pairs = pairs;

        let num_partitions = self.num_partitions.into();
        // One buffer per (thread, partition) pair: split the memory budget
        // evenly across all of them.
        let num_buffers = rayon::current_num_threads() * num_partitions;
        let batch_size = self
            .memory_usage
            .batch_size::<Triple<L>>()
            .div_ceil(num_buffers);
        let num_nodes_per_partition = self.num_nodes.div_ceil(num_partitions);

        let mut pl = concurrent_progress_logger!(
            display_memory = true,
            item_name = "pair",
            local_speed = true,
            expected_updates = self.expected_num_pairs,
        );
        pl.start("Reading and sorting pairs");
        pl.info(format_args!("Per-processor batch size: {}", batch_size));

        // Sorted batches are spilled to disk here; the directory (and its
        // contents) is removed when this handle is dropped.
        let presort_tmp_dir =
            tempfile::tempdir().context("Could not create temporary directory")?;

        let unsorted_pairs = unsorted_pairs.into_iter();
        let num_blocks = unsorted_pairs.len();

        // For each input block, the per-partition batch iterators produced by
        // the thread that consumed that block.
        let partitioned_presorted_pairs = Mutex::new(vec![Vec::new(); num_blocks]);

        // One scoped thread per input block; scoping lets the threads borrow
        // the locals above without `'static` bounds.
        std::thread::scope(|s| {
            let partitioned_presorted_pairs = &partitioned_presorted_pairs;
            let presort_tmp_dir = &presort_tmp_dir;
            for (block_id, pair) in unsorted_pairs.enumerate() {
                let deserializer = deserializer.clone();
                let mut pl = pl.clone();
                s.spawn(move || {
                    // In-memory staging buffers, one per partition.
                    let mut unsorted_buffers = (0..num_partitions)
                        .map(|_| Vec::with_capacity(batch_size))
                        .collect::<Vec<_>>();
                    // Batch iterators produced so far, one list per partition.
                    let mut sorted_pairs =
                        (0..num_partitions).map(|_| Vec::new()).collect::<Vec<_>>();

                    for (src, dst, label) in pair {
                        // Route each triple to the partition owning its source
                        // node. NOTE(review): assumes src < num_nodes — an
                        // out-of-range source panics on the indexing below.
                        let partition_id = src / num_nodes_per_partition;

                        let sorted_pairs = &mut sorted_pairs[partition_id];
                        let buf = &mut unsorted_buffers[partition_id];
                        // Spill a full buffer to disk before pushing, so the
                        // push below never reallocates.
                        if buf.len() >= buf.capacity() {
                            let buf_len = buf.len();
                            super::par_sort_pairs::flush_buffer(
                                presort_tmp_dir.path(),
                                serializer,
                                deserializer.clone(),
                                block_id,
                                partition_id,
                                sorted_pairs,
                                buf,
                            )
                            .context("Could not flush buffer")
                            .unwrap();
                            assert!(buf.is_empty(), "flush_buffer did not empty the buffer");
                            pl.update_with_count(buf_len);
                        }

                        buf.push(Triple {
                            pair: [src, dst],
                            label,
                        });
                    }

                    // Flush whatever is left in each partial buffer.
                    for (partition_id, (pairs, mut buf)) in sorted_pairs
                        .iter_mut()
                        .zip(unsorted_buffers.into_iter())
                        .enumerate()
                    {
                        let buf_len = buf.len();
                        super::par_sort_pairs::flush_buffer(
                            presort_tmp_dir.path(),
                            serializer,
                            deserializer.clone(),
                            block_id,
                            partition_id,
                            pairs,
                            &mut buf,
                        )
                        .context("Could not flush buffer at the end")
                        .unwrap();
                        assert!(buf.is_empty(), "flush_buffer did not empty the buffer");
                        pl.update_with_count(buf_len);
                    }

                    // TODO: ugly
                    // Publish this block's per-partition iterators; each
                    // thread writes a distinct slot, the mutex only guards the
                    // shared Vec.
                    partitioned_presorted_pairs.lock().unwrap()[block_id] = sorted_pairs;
                });
            }
        });

        // At this point, the iterator could be collected into
        // {worker_id -> {partition_id -> [iterators]}}
        // ie. Vec<Vec<Vec<BatchIterator>>>>.
        //
        // Let's merge the {partition_id -> [iterators]} maps of each worker
        let partitioned_presorted_pairs = partitioned_presorted_pairs
            .into_inner()
            .unwrap()
            .into_par_iter()
            .reduce(
                // Identity: one empty iterator list per partition.
                || (0..num_partitions).map(|_| Vec::new()).collect(),
                |mut pair_partitions1: Vec<Vec<BatchIterator<D>>>,
                 pair_partitions2: Vec<Vec<BatchIterator<D>>>|
                 -> Vec<Vec<BatchIterator<D>>> {
                    assert_eq!(pair_partitions1.len(), num_partitions);
                    assert_eq!(pair_partitions2.len(), num_partitions);
                    // Concatenate the two blocks' iterator lists partitionwise.
                    for (partition1, partition2) in pair_partitions1
                        .iter_mut()
                        .zip(pair_partitions2.into_iter())
                    {
                        partition1.extend(partition2.into_iter());
                    }
                    pair_partitions1
                },
            );
        // At this point, the iterator was turned into
        // {partition_id -> [iterators]}
        // ie. Vec<Vec<BatchIterator>>>.
        pl.done();

        // Build boundaries array: [0, nodes_per_partition, 2*nodes_per_partition, ..., num_nodes]
        let boundaries: Vec<usize> = (0..=num_partitions)
            .map(|i| (i * num_nodes_per_partition).min(self.num_nodes))
            .collect();

        // Build iterators array
        let iters: Vec<_> = partitioned_presorted_pairs
            .into_iter()
            .map(|partition| {
                // 'partition' contains N iterators that are not sorted with respect to each other.
                // We merge them and turn them into a single sorted iterator.
                KMergeIters::new(partition)
            })
            .collect();

        Ok(SplitIters::new(
            boundaries.into_boxed_slice(),
            iters.into_boxed_slice(),
        ))
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc