• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 18414008817

10 Oct 2025 05:37PM UTC coverage: 47.835% (-0.1%) from 47.965%
18414008817

push

github

vigna
Unlabeled methods

3955 of 8268 relevant lines covered (47.84%)

22056616.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/webgraph/src/utils/par_sort_graph.rs
1
/*
2
 * SPDX-FileCopyrightText: 2025 Inria
3
 *
4
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
5
 */
6

7
#![allow(clippy::type_complexity)]
8

9
//! Facilities to sort in parallel externally pairs of nodes with an associated
10
//! label returned by a [`ParallelIterator`].
11

12
use std::marker::PhantomData;
13
use std::num::NonZeroUsize;
14
use std::path::Path;
15
use std::sync::Mutex;
16

17
use anyhow::{ensure, Context, Result};
18
use dsi_bitstream::traits::NE;
19
use dsi_progress_logger::{concurrent_progress_logger, ProgressLog};
20
use rayon::prelude::*;
21
use rdst::RadixSort;
22

23
use super::sort_pairs::{BatchIterator, BitReader, BitWriter, KMergeIters, Triple};
24
use super::MemoryUsage;
25
use crate::traits::{BitDeserializer, BitSerializer};
26

27
/// Takes a parallel iterator of pairs as input, and returns them into a vector
/// of sorted iterators (which can be flattened into a single iterator),
/// suitable for
/// [`BvComp::parallel_iter`](crate::graphs::bvgraph::BvComp::parallel_iter).
///
/// Note that batches will be memory-mapped. If you encounter OS-level errors
/// using this class (e.g., `ENOMEM: Out of memory` under Linux), please review
/// the limitations of your OS regarding memory-mapping (e.g.,
/// `/proc/sys/vm/max_map_count` under Linux).
///
/// ```ignore TODO
/// use std::num::NonZeroUsize;
///
/// use dsi_bitstream::traits::BigEndian;
/// use lender::Lender;
/// use rayon::prelude::*;
/// use webgraph::traits::SequentialLabeling;
/// use webgraph::graphs::bvgraph::{BvComp, CompFlags};
/// use webgraph::graphs::arc_list_graph::Iter;
/// use webgraph::utils::par_sort_graph::ParSortGraph;
///
/// let num_partitions = 2;
/// let num_nodes: usize = 5;
/// let num_nodes_per_partition = num_nodes.div_ceil(num_partitions);
/// let unsorted_pairs = vec![(1, 3), (3, 2), (2, 1), (1, 0), (0, 4)];
///
/// let pair_sorter = ParSortGraph::new(num_nodes)?
///     .expected_num_pairs(unsorted_pairs.len())
///     .num_partitions(NonZeroUsize::new(num_partitions).unwrap());
///
/// assert_eq!(
///     pair_sorter.sort(
///         unsorted_pairs.par_iter().copied()
///     )?
///         .into_iter()
///         .map(|partition| partition.into_iter().collect::<Vec<_>>())
///         .collect::<Vec<_>>(),
///     vec![
///         vec![(0, 4), (1, 0), (1, 3), (2, 1)], // nodes 0, 1, and 2 are in partition 0
///         vec![(3, 2)], // nodes 3 and 4 are in partition 1
///     ],
/// );
///
/// let bvcomp_tmp_dir = tempfile::tempdir()?;
/// let bvcomp_out_dir = tempfile::tempdir()?;
///
/// BvComp::parallel_iter::<BigEndian, _>(
///     &bvcomp_out_dir.path().join("graph"),
///     pair_sorter.sort(
///         unsorted_pairs.par_iter().copied()
///     )?
///         .into_iter()
///         .enumerate()
///         .map(|(partition_id, partition)| {
///             webgraph::prelude::LeftIterator(Iter::<(), _>::try_new_from(
///                 num_nodes_per_partition,
///                 partition.into_iter().map(|(src, dst)| (src, dst, ())),
///                 partition_id*num_nodes_per_partition,
///             ).unwrap())
///         }),
///     num_nodes,
///     CompFlags::default(),
///     &rayon::ThreadPoolBuilder::default().build()?,
///     bvcomp_tmp_dir.path(),
/// )?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
pub struct ParSortGraph<L = ()> {
    // Total number of nodes; sources are partitioned over `0..num_nodes`.
    num_nodes: usize,
    // Approximate number of pairs; used only for progress reporting.
    expected_num_pairs: Option<usize>,
    // Number of node partitions (defaults to the number of CPUs).
    num_partitions: NonZeroUsize,
    // Memory budget governing the size of in-memory sort batches.
    memory_usage: MemoryUsage,
    // Zero-sized marker tying the label type `L` to the builder.
    marker: PhantomData<L>,
}
102

103
impl ParSortGraph<()> {
    /// See [`try_sort`](ParSortGraph::try_sort).
    ///
    /// Convenience wrapper for iterators that cannot fail: the error type is
    /// fixed to [`std::convert::Infallible`].
    pub fn sort(
        &self,
        pairs: impl IntoIterator<
            Item: IntoIterator<Item = (usize, usize), IntoIter: Send> + Send,
            IntoIter: ExactSizeIterator,
        >,
    ) -> Result<Vec<impl IntoIterator<Item = (usize, usize), IntoIter: Send + Sync>>> {
        self.try_sort::<std::convert::Infallible>(pairs)
    }

    /// Sorts the output of the provided parallel iterator,
    /// returning a vector of sorted iterators, one per partition.
    pub fn try_sort<E: Into<anyhow::Error>>(
        &self,
        pairs: impl IntoIterator<
            Item: IntoIterator<Item = (usize, usize), IntoIter: Send> + Send,
            IntoIter: ExactSizeIterator,
        >,
    ) -> Result<Vec<impl IntoIterator<Item = (usize, usize), IntoIter: Send + Sync>>> {
        // Delegate to the labeled implementation with the unit label `()`:
        // attach a dummy label to each pair on the way in, and strip it from
        // every triple on the way out.
        Ok(<ParSortGraph<()>>::try_sort_labeled::<(), (), E>(
            self,
            &(),
            (),
            pairs
                .into_iter()
                .map(|iter| iter.into_iter().map(|(src, dst)| (src, dst, ()))),
        )?
        .into_iter()
        .map(|iter| iter.into_iter().map(|(src, dst, ())| (src, dst)))
        .collect())
    }
}
137

138
impl<L> ParSortGraph<L> {
139
    pub fn new(num_nodes: usize) -> Result<Self> {
×
140
        Ok(Self {
×
141
            num_nodes,
×
142
            expected_num_pairs: None,
×
143
            num_partitions: NonZeroUsize::new(num_cpus::get()).context("zero CPUs")?,
×
144
            memory_usage: MemoryUsage::default(),
×
145
            marker: PhantomData,
×
146
        })
147
    }
148

149
    /// Approximate number of pairs to be sorted.
150
    ///
151
    /// Used only for progress reporting.
152
    pub fn expected_num_pairs(self, expected_num_pairs: usize) -> Self {
×
153
        Self {
154
            expected_num_pairs: Some(expected_num_pairs),
×
155
            ..self
156
        }
157
    }
158

159
    /// How many partitions to split the nodes into.
160
    ///
161
    /// Defaults to `num_cpus::get()`.
162
    pub fn num_partitions(self, num_partitions: NonZeroUsize) -> Self {
×
163
        Self {
164
            num_partitions,
165
            ..self
166
        }
167
    }
168

169
    /// How much memory to use for in-memory sorts.
170
    ///
171
    /// Using the `MemoryUsage::MemorySize` variant you will set the overall
172
    /// memory size. The batch size will be determined dividing the
173
    /// overall memory size by `num_partitions * num_threads`. This
174
    /// is usually the best option.
175
    ///
176
    /// Using the `MemoryUsage::BatchSize` variant you will
177
    /// set the exact size of each batch to sort in memory. The overall
178
    /// number of elements will be `batch_size * num_partitions * num_threads`.
179
    /// This option is useful for fine tuning the memory usage, in particular
180
    /// when the number of threads and partitions is known in advance.
181
    ///
182
    /// Larger values yield faster merges (by reducing logarithmically the
183
    /// number of batches to merge) but consume linearly more memory. We suggest
184
    /// to set this parameter as large as possible, depending on the available
185
    /// memory.
186
    pub fn memory_usage(self, memory_usage: MemoryUsage) -> Self {
×
187
        Self {
188
            memory_usage,
189
            ..self
190
        }
191
    }
192

193
    /// See [`try_sort_labeled`](ParSortGraph::try_sort_labeled).
194
    ///
195
    /// This is a convenience method for parallel iterators that cannot fail.
196
    pub fn sort_labeled<S, D>(
×
197
        &self,
198
        serializer: &S,
199
        deserializer: D,
200
        pairs: impl IntoIterator<
201
            Item: IntoIterator<Item = (usize, usize, L), IntoIter: Send> + Send,
202
            IntoIter: ExactSizeIterator,
203
        >,
204
    ) -> Result<
205
        Vec<
206
            impl IntoIterator<
207
                Item = (
208
                    usize,
209
                    usize,
210
                    <D as BitDeserializer<NE, BitReader>>::DeserType,
211
                ),
212
                IntoIter: Send + Sync,
213
            >,
214
        >,
215
    >
216
    where
217
        L: Copy + Send + Sync,
218
        S: Sync + BitSerializer<NE, BitWriter, SerType = L>,
219
        D: Clone + Send + Sync + BitDeserializer<NE, BitReader, DeserType: Copy + Send + Sync>,
220
    {
221
        self.try_sort_labeled::<S, D, std::convert::Infallible>(serializer, deserializer, pairs)
×
222
    }
223

224
    /// Sorts the output of the provided parallel iterator,
225
    /// returning a vector of sorted iterators, one per partition.
226
    ///
227
    /// This  method accept as type parameter a [`BitSerializer`] and a
228
    /// [`BitDeserializer`] that are used to serialize and deserialize the labels.
229
    ///
230
    /// The bit deserializer must be [`Clone`] because we need one for each
231
    /// [`BatchIterator`], and there are possible scenarios in which the
232
    /// deserializer might be stateful.
233
    pub fn try_sort_labeled<S, D, E: Into<anyhow::Error>>(
×
234
        &self,
235
        serializer: &S,
236
        deserializer: D,
237
        pairs: impl IntoIterator<
238
            Item: IntoIterator<Item = (usize, usize, L), IntoIter: Send> + Send,
239
            IntoIter: ExactSizeIterator,
240
        >,
241
    ) -> Result<
242
        Vec<
243
            impl IntoIterator<
244
                Item = (
245
                    usize,
246
                    usize,
247
                    <D as BitDeserializer<NE, BitReader>>::DeserType,
248
                ),
249
                IntoIter: Send + Sync,
250
            >,
251
        >,
252
    >
253
    where
254
        L: Copy + Send + Sync,
255
        S: Sync + BitSerializer<NE, BitWriter, SerType = L>,
256
        D: Clone + Send + Sync + BitDeserializer<NE, BitReader, DeserType: Copy + Send + Sync>,
257
    {
258
        let unsorted_pairs = pairs;
×
259

260
        let num_partitions = self.num_partitions.into();
×
261
        let batch_size = match self.memory_usage {
×
262
            MemoryUsage::MemorySize(num_bytes) => {
×
263
                let pair_size = size_of::<usize>() * 2 + size_of::<L>();
×
264
                dbg!(rayon::max_num_threads(), num_partitions, pair_size);
×
265
                let num_buffers = rayon::current_num_threads() * num_partitions;
×
266
                num_bytes / (pair_size * num_buffers)
×
267
            }
268
            MemoryUsage::BatchSize(batch_size) => batch_size,
×
269
        };
270
        let num_nodes_per_partition = self.num_nodes.div_ceil(num_partitions);
×
271

272
        let mut pl = concurrent_progress_logger!(
×
273
            display_memory = true,
×
274
            item_name = "pair",
×
275
            local_speed = true,
×
276
            expected_updates = self.expected_num_pairs,
×
277
        );
278
        pl.start("Reading and sorting pairs");
×
279
        pl.info(format_args!("Per-processor batch size: {}", batch_size));
×
280

281
        let presort_tmp_dir =
×
282
            tempfile::tempdir().context("Could not create temporary directory")?;
×
283

284
        let unsorted_pairs = unsorted_pairs.into_iter();
×
285
        let num_blocks = unsorted_pairs.len();
×
286

287
        let partitioned_presorted_pairs = Mutex::new(vec![Vec::new(); num_blocks]);
×
288

289
        std::thread::scope(|s| {
×
290
            let partitioned_presorted_pairs = &partitioned_presorted_pairs;
×
291
            let presort_tmp_dir = &presort_tmp_dir;
×
292
            for (block_id, pair) in unsorted_pairs.enumerate() {
×
293
                let deserializer = deserializer.clone();
×
294
                let mut pl = pl.clone();
×
295
                s.spawn(move || {
×
296
                    let mut unsorted_buffers = (0..num_partitions)
×
297
                        .map(|_| Vec::with_capacity(batch_size))
×
298
                        .collect::<Vec<_>>();
×
299
                    let mut sorted_pairs =
×
300
                        (0..num_partitions).map(|_| Vec::new()).collect::<Vec<_>>();
×
301

302
                    for (src, dst, label) in pair {
×
303
                        /* ensure!(
304
                            src < self.num_nodes,
305
                            "Expected {}, but got {src}",
306
                            self.num_nodes
307
                        ); */
308
                        let partition_id = src / num_nodes_per_partition;
×
309

310
                        let sorted_pairs = &mut sorted_pairs[partition_id];
×
311
                        let buf = &mut unsorted_buffers[partition_id];
×
312
                        if buf.len() >= buf.capacity() {
×
313
                            let buf_len = buf.len();
×
314
                            flush_buffer(
×
315
                                presort_tmp_dir.path(),
×
316
                                serializer,
×
317
                                deserializer.clone(),
×
318
                                block_id,
×
319
                                partition_id,
×
320
                                sorted_pairs,
×
321
                                buf,
×
322
                            )
323
                            .context("Could not flush buffer")
×
324
                            .unwrap();
×
325
                            assert!(buf.is_empty(), "flush_buffer did not empty the buffer");
×
326
                            pl.update_with_count(buf_len);
×
327
                        }
328

329
                        buf.push(Triple {
×
330
                            pair: [src, dst],
×
331
                            label,
×
332
                        });
333
                    }
334

335
                    for (partition_id, (mut pairs, mut buf)) in sorted_pairs
×
336
                        .iter_mut()
×
337
                        .zip(unsorted_buffers.into_iter())
×
338
                        .enumerate()
×
339
                    {
340
                        let buf_len = buf.len();
×
341
                        flush_buffer(
×
342
                            presort_tmp_dir.path(),
×
343
                            serializer,
×
344
                            deserializer.clone(),
×
345
                            block_id,
×
346
                            partition_id,
×
347
                            &mut pairs,
×
348
                            &mut buf,
×
349
                        )
350
                        .context("Could not flush buffer at the end")
×
351
                        .unwrap();
×
352
                        assert!(buf.is_empty(), "flush_buffer did not empty the buffer");
×
353
                        pl.update_with_count(buf_len);
×
354
                    }
355

356
                    // TODO: ugly
357
                    partitioned_presorted_pairs.lock().unwrap()[block_id] = sorted_pairs;
×
358
                });
359
            }
360
        });
361

362
        // At this point, the iterator could be collected into
363
        // {worker_id -> {partition_id -> [iterators]}}
364
        // ie. Vec<Vec<Vec<BatchIterator>>>>.
365
        //
366
        // Let's merge the {partition_id -> [iterators]} maps of each worker
367
        let partitioned_presorted_pairs = partitioned_presorted_pairs
×
368
            .into_inner()
369
            .unwrap()
370
            .into_par_iter()
371
            .reduce(
372
                || (0..num_partitions).map(|_| Vec::new()).collect(),
×
373
                |mut pair_partitions1: Vec<Vec<BatchIterator<D>>>,
×
374
                 pair_partitions2: Vec<Vec<BatchIterator<D>>>|
×
375
                 -> Vec<Vec<BatchIterator<D>>> {
×
376
                    assert_eq!(pair_partitions1.len(), num_partitions);
×
377
                    assert_eq!(pair_partitions2.len(), num_partitions);
×
378
                    for (partition1, partition2) in pair_partitions1
×
379
                        .iter_mut()
×
380
                        .zip(pair_partitions2.into_iter())
×
381
                    {
382
                        partition1.extend(partition2.into_iter());
×
383
                    }
384
                    pair_partitions1
×
385
                },
386
            );
387
        // At this point, the iterator was turned into
388
        // {partition_id -> [iterators]}
389
        // ie. Vec<Vec<BatchIterator>>>.
390
        pl.done();
×
391

392
        Ok(partitioned_presorted_pairs
×
393
            .into_iter()
×
394
            .map(|partition| {
×
395
                // 'partition' contains N iterators that are not sorted with respect to each other.
396
                // We merge them and turn them into a single sorted iterator.
397
                KMergeIters::new(partition)
×
398
            })
399
            .collect())
×
400
    }
401
}
402

403
fn flush_buffer<
×
404
    L: Copy + Send + Sync,
405
    S: BitSerializer<NE, BitWriter, SerType = L>,
406
    D: BitDeserializer<NE, BitReader>,
407
>(
408
    tmp_dir: &Path,
409
    serializer: &S,
410
    deserializer: D,
411
    worker_id: usize,
412
    partition_id: usize,
413
    sorted_pairs: &mut Vec<BatchIterator<D>>,
414
    buf: &mut Vec<Triple<L>>,
415
) -> Result<()> {
416
    buf.radix_sort_unstable();
×
417

418
    let path = tmp_dir.join(format!(
×
419
        "sorted_batch_{worker_id}_{partition_id}_{}",
×
420
        sorted_pairs.len()
×
421
    ));
422

423
    // Safety check. It's not foolproof (TOCTOU) but should catch most programming errors.
424
    ensure!(
×
425
        !path.exists(),
×
426
        "Can't create temporary file {}, it already exists",
×
427
        path.display()
×
428
    );
429
    sorted_pairs.push(
×
430
        BatchIterator::new_from_vec_sorted_labeled(&path, buf, serializer, deserializer)
×
431
            .with_context(|| format!("Could not write sorted batch to {}", path.display()))?,
×
432
    );
433
    buf.clear();
×
434
    Ok(())
×
435
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc