
vigna / webgraph-rs / 18430102467

11 Oct 2025 01:33PM UTC coverage: 47.997% (+0.2%) from 47.835%

Pull Request #152: Introduce BatchCodec to substitute BatchIterator
Merge ee4f83205 into 761fcb7c1

104 of 226 new or added lines in 9 files covered (46.02%).
7 existing lines in 3 files are now uncovered.
4001 of 8336 relevant lines covered (48.0%).
21830783.16 hits per line.

Source File: /webgraph/src/utils/par_sort_graph.rs (0.0%)

/*
 * SPDX-FileCopyrightText: 2025 Inria
 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
 *
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
 */

#![allow(clippy::type_complexity)]

//! Facilities to sort externally, in parallel, pairs of nodes with an
//! associated label returned by a [`ParallelIterator`].

use core::num::NonZeroUsize;
use std::path::Path;
use std::sync::Mutex;

use anyhow::{ensure, Context, Result};
use dsi_progress_logger::{concurrent_progress_logger, ProgressLog};
use rayon::prelude::*;

use super::sort_pairs::KMergeIters;
use super::MemoryUsage;
use crate::utils::{BatchCodec, CodecIter, DefaultBatchCodec};

/// Takes a parallel iterator of pairs as input and returns them as a vector
/// of sorted iterators (which can be flattened into a single iterator),
/// suitable for
/// [`BvComp::parallel_iter`](crate::graphs::bvgraph::BvComp::parallel_iter).
///
/// Note that batches will be memory-mapped. If you encounter OS-level errors
/// using this class (e.g., `ENOMEM: Out of memory` under Linux), please review
/// the limitations of your OS regarding memory-mapping (e.g.,
/// `/proc/sys/vm/max_map_count` under Linux).
///
/// ```ignore TODO
/// use std::num::NonZeroUsize;
///
/// use dsi_bitstream::traits::BigEndian;
/// use lender::Lender;
/// use rayon::prelude::*;
/// use webgraph::traits::SequentialLabeling;
/// use webgraph::graphs::bvgraph::{BvComp, CompFlags};
/// use webgraph::graphs::arc_list_graph::Iter;
/// use webgraph::utils::par_sort_graph::ParSortGraph;
///
/// let num_partitions = 2;
/// let num_nodes: usize = 5;
/// let num_nodes_per_partition = num_nodes.div_ceil(num_partitions);
/// let unsorted_pairs = vec![(1, 3), (3, 2), (2, 1), (1, 0), (0, 4)];
///
/// let pair_sorter = ParSortGraph::new(num_nodes)?
///     .expected_num_pairs(unsorted_pairs.len())
///     .num_partitions(NonZeroUsize::new(num_partitions).unwrap());
///
/// assert_eq!(
///     pair_sorter.sort(
///         unsorted_pairs.par_iter().copied()
///     )?
///         .into_iter()
///         .map(|partition| partition.into_iter().collect::<Vec<_>>())
///         .collect::<Vec<_>>(),
///     vec![
///         vec![(0, 4), (1, 0), (1, 3), (2, 1)], // nodes 0, 1, and 2 are in partition 0
///         vec![(3, 2)], // nodes 3 and 4 are in partition 1
///     ],
/// );
///
/// let bvcomp_tmp_dir = tempfile::tempdir()?;
/// let bvcomp_out_dir = tempfile::tempdir()?;
///
/// BvComp::parallel_iter::<BigEndian, _>(
///     &bvcomp_out_dir.path().join("graph"),
///     pair_sorter.sort(
///         unsorted_pairs.par_iter().copied()
///     )?
///         .into_iter()
///         .enumerate()
///         .map(|(partition_id, partition)| {
///             webgraph::prelude::LeftIterator(Iter::<(), _>::try_new_from(
///                 num_nodes_per_partition,
///                 partition.into_iter().map(|(src, dst)| (src, dst, ())),
///                 partition_id*num_nodes_per_partition,
///             ).unwrap())
///         }),
///     num_nodes,
///     CompFlags::default(),
///     &rayon::ThreadPoolBuilder::default().build()?,
///     bvcomp_tmp_dir.path(),
/// )?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
pub struct ParSortGraph {
    num_nodes: usize,
    expected_num_pairs: Option<usize>,
    num_partitions: NonZeroUsize,
    memory_usage: MemoryUsage,
}

impl ParSortGraph {
    /// See [`try_sort`](ParSortGraph::try_sort).
    pub fn sort(
        &self,
        pairs: impl IntoIterator<
            Item: IntoIterator<Item = (usize, usize), IntoIter: Send + Sync> + Send + Sync,
            IntoIter: ExactSizeIterator + Send + Sync,
        >,
    ) -> Result<Vec<impl IntoIterator<Item = (usize, usize), IntoIter: Clone + Send + Sync>>> {
        self.try_sort::<std::convert::Infallible>(pairs)
    }

    /// Sorts the output of the provided parallel iterator,
    /// returning a vector of sorted iterators, one per partition.
    pub fn try_sort<E: Into<anyhow::Error>>(
        &self,
        pairs: impl IntoIterator<
            Item: IntoIterator<Item = (usize, usize), IntoIter: Send + Sync> + Send + Sync,
            IntoIter: ExactSizeIterator + Send + Sync,
        >,
    ) -> Result<Vec<impl IntoIterator<Item = (usize, usize), IntoIter: Clone + Send + Sync>>> {
        Ok(<ParSortGraph>::try_sort_labeled::<_, E>(
            self,
            DefaultBatchCodec::default(),
            pairs
                .into_iter()
                .map(|iter| iter.into_iter().map(|(src, dst)| (src, dst, ()))),
        )?
        .into_iter()
        .map(|iter| iter.into_iter().map(|(src, dst, ())| (src, dst)))
        .collect())
    }
}

impl ParSortGraph {
    pub fn new(num_nodes: usize) -> Result<Self> {
        Ok(Self {
            num_nodes,
            expected_num_pairs: None,
            num_partitions: NonZeroUsize::new(num_cpus::get()).context("zero CPUs")?,
            memory_usage: MemoryUsage::default(),
        })
    }

    /// Approximate number of pairs to be sorted.
    ///
    /// Used only for progress reporting.
    pub fn expected_num_pairs(self, expected_num_pairs: usize) -> Self {
        Self {
            expected_num_pairs: Some(expected_num_pairs),
            ..self
        }
    }

    /// How many partitions to split the nodes into.
    ///
    /// Defaults to `num_cpus::get()`.
    pub fn num_partitions(self, num_partitions: NonZeroUsize) -> Self {
        Self {
            num_partitions,
            ..self
        }
    }

    /// How much memory to use for in-memory sorts.
    ///
    /// Using the `MemoryUsage::MemorySize` variant you will set the overall
    /// memory size. The batch size will be determined by dividing the overall
    /// memory size by the pair size and by `num_partitions * num_threads`.
    /// This is usually the best option.
    ///
    /// Using the `MemoryUsage::BatchSize` variant you will set the exact size
    /// of each batch to sort in memory. The overall number of elements will be
    /// `batch_size * num_partitions * num_threads`. This option is useful for
    /// fine-tuning the memory usage, in particular when the number of threads
    /// and partitions is known in advance.
    ///
    /// Larger values yield faster merges (by reducing logarithmically the
    /// number of batches to merge) but consume linearly more memory. We suggest
    /// setting this parameter as large as possible, depending on the available
    /// memory.
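    ///
    /// A minimal usage sketch (not compiled here; it assumes `MemoryUsage` is
    /// re-exported from `webgraph::utils`):
    ///
    /// ```ignore
    /// use webgraph::utils::MemoryUsage;
    /// use webgraph::utils::par_sort_graph::ParSortGraph;
    ///
    /// // Give the sorter an overall budget of 1 GiB; the per-buffer batch size
    /// // is derived from the number of threads and partitions.
    /// let sorter = ParSortGraph::new(1_000_000)?
    ///     .memory_usage(MemoryUsage::MemorySize(1 << 30));
    ///
    /// // Alternatively, fix the number of pairs per in-memory batch directly.
    /// let sorter = ParSortGraph::new(1_000_000)?
    ///     .memory_usage(MemoryUsage::BatchSize(10_000_000));
    /// ```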
    pub fn memory_usage(self, memory_usage: MemoryUsage) -> Self {
        Self {
            memory_usage,
            ..self
        }
    }

    /// See [`try_sort_labeled`](ParSortGraph::try_sort_labeled).
    ///
    /// This is a convenience method for parallel iterators that cannot fail.
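    ///
    /// A minimal sketch of a call (not compiled here; the re-export paths are
    /// assumptions, and the labels are `()` as in
    /// [`try_sort`](ParSortGraph::try_sort); a custom [`BatchCodec`] would be
    /// used to carry real labels):
    ///
    /// ```ignore
    /// use webgraph::utils::DefaultBatchCodec;
    /// use webgraph::utils::par_sort_graph::ParSortGraph;
    ///
    /// // Two blocks of (src, dst, label) triples, e.g. one per worker.
    /// let blocks = vec![
    ///     vec![(1, 3, ()), (3, 2, ())],
    ///     vec![(2, 1, ()), (0, 4, ())],
    /// ];
    /// let partitions = ParSortGraph::new(5)?
    ///     .sort_labeled(DefaultBatchCodec::default(), blocks)?;
    /// for partition in partitions {
    ///     for (src, dst, _label) in partition {
    ///         // triples are sorted within each partition
    ///     }
    /// }
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```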
    pub fn sort_labeled<C: BatchCodec>(
        &self,
        batch_codec: C,
        pairs: impl IntoIterator<
            Item: IntoIterator<Item = (usize, usize, C::Label), IntoIter: Send + Sync>
                      + Send
                      + Sync,
            IntoIter: ExactSizeIterator + Send + Sync,
        >,
    ) -> Result<
        Vec<impl IntoIterator<Item = (usize, usize, C::Label), IntoIter: Clone + Send + Sync>>,
    > {
        self.try_sort_labeled::<C, std::convert::Infallible>(batch_codec, pairs)
    }

    /// Sorts the output of the provided parallel iterator,
    /// returning a vector of sorted iterators, one per partition.
    ///
    /// This method accepts as type parameter a [`BatchCodec`] that is used to
    /// serialize and deserialize the labels.
    ///
    /// The codec is shared by reference among worker threads, and each flushed
    /// batch gets its own decoding iterator ([`CodecIter`]), as the decoder
    /// might be stateful.
    pub fn try_sort_labeled<C: BatchCodec, E: Into<anyhow::Error>>(
        &self,
        batch_codec: C,
        pairs: impl IntoIterator<
            Item: IntoIterator<Item = (usize, usize, C::Label), IntoIter: Send + Sync>
                      + Send
                      + Sync,
            IntoIter: ExactSizeIterator + Send + Sync,
        >,
    ) -> Result<
        Vec<impl IntoIterator<Item = (usize, usize, C::Label), IntoIter: Clone + Send + Sync>>,
    > {
        let unsorted_pairs = pairs;

        let num_partitions = self.num_partitions.into();
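        // Number of pairs per in-memory buffer: either given directly, or
        // derived by splitting the memory budget among one buffer per
        // (thread, partition) combination.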
        let batch_size = match self.memory_usage {
            MemoryUsage::MemorySize(num_bytes) => {
                let pair_size = size_of::<(usize, usize, C::Label)>();
                let num_buffers = rayon::current_num_threads() * num_partitions;
                num_bytes / (pair_size * num_buffers)
            }
            MemoryUsage::BatchSize(batch_size) => batch_size,
        };
        let num_nodes_per_partition = self.num_nodes.div_ceil(num_partitions);

        let mut pl = concurrent_progress_logger!(
            display_memory = true,
            item_name = "pair",
            local_speed = true,
            expected_updates = self.expected_num_pairs,
        );
        pl.start("Reading and sorting pairs");
        pl.info(format_args!("Per-processor batch size: {}", batch_size));

        let presort_tmp_dir =
            tempfile::tempdir().context("Could not create temporary directory")?;

        let unsorted_pairs = unsorted_pairs.into_iter();
        let num_blocks = unsorted_pairs.len();

        let partitioned_presorted_pairs =
            Mutex::new((0..num_blocks).map(|_| Vec::new()).collect::<Vec<_>>());

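        // Spawn one scoped thread per input block: each thread partitions its
        // pairs by source node into per-partition buffers and flushes full
        // buffers to disk through `flush_buffer`.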
        std::thread::scope(|s| {
            let partitioned_presorted_pairs = &partitioned_presorted_pairs;
            let presort_tmp_dir = &presort_tmp_dir;
            for (block_id, pair) in unsorted_pairs.enumerate() {
                let mut pl = pl.clone();
                let batch_codec = &batch_codec;
                s.spawn(move || {
                    let mut unsorted_buffers = (0..num_partitions)
                        .map(|_| Vec::with_capacity(batch_size))
                        .collect::<Vec<_>>();
                    let mut sorted_pairs =
                        (0..num_partitions).map(|_| Vec::new()).collect::<Vec<_>>();

                    for (src, dst, label) in pair {
                        /* ensure!(
                            src < self.num_nodes,
                            "Expected {}, but got {src}",
                            self.num_nodes
                        ); */
                        let partition_id = src / num_nodes_per_partition;

                        let sorted_pairs = &mut sorted_pairs[partition_id];
                        let buf = &mut unsorted_buffers[partition_id];
                        if buf.len() >= buf.capacity() {
                            let buf_len = buf.len();
                            flush_buffer(
                                presort_tmp_dir.path(),
                                batch_codec,
                                block_id,
                                partition_id,
                                sorted_pairs,
                                buf,
                            )
                            .context("Could not flush buffer")
                            .unwrap();
                            assert!(buf.is_empty(), "flush_buffer did not empty the buffer");
                            pl.update_with_count(buf_len);
                        }

                        buf.push((src, dst, label));
                    }

                    for (partition_id, (mut pairs, mut buf)) in sorted_pairs
                        .iter_mut()
                        .zip(unsorted_buffers.into_iter())
                        .enumerate()
                    {
                        let buf_len = buf.len();
                        flush_buffer(
                            presort_tmp_dir.path(),
                            batch_codec,
                            block_id,
                            partition_id,
                            &mut pairs,
                            &mut buf,
                        )
                        .context("Could not flush buffer at the end")
                        .unwrap();
                        assert!(buf.is_empty(), "flush_buffer did not empty the buffer");
                        pl.update_with_count(buf_len);
                    }

                    // TODO: ugly
                    partitioned_presorted_pairs.lock().unwrap()[block_id] = sorted_pairs;
                });
            }
        });

        // At this point, the iterator could be collected into
        // {worker_id -> {partition_id -> [iterators]}},
        // i.e., Vec<Vec<Vec<CodecIter<C>>>>.
        //
        // Let's merge the {partition_id -> [iterators]} maps of each worker.
        let partitioned_presorted_pairs = partitioned_presorted_pairs
            .into_inner()
            .unwrap()
            .into_par_iter()
            .reduce(
                || (0..num_partitions).map(|_| Vec::new()).collect(),
                |mut pair_partitions1: Vec<Vec<CodecIter<C>>>,
                 pair_partitions2: Vec<Vec<CodecIter<C>>>|
                 -> Vec<Vec<CodecIter<C>>> {
                    assert_eq!(pair_partitions1.len(), num_partitions);
                    assert_eq!(pair_partitions2.len(), num_partitions);
                    for (partition1, partition2) in pair_partitions1
                        .iter_mut()
                        .zip(pair_partitions2.into_iter())
                    {
                        partition1.extend(partition2.into_iter());
                    }
                    pair_partitions1
                },
            );
        // At this point, the iterator was turned into
        // {partition_id -> [iterators]},
        // i.e., Vec<Vec<CodecIter<C>>>.
        pl.done();

        Ok(partitioned_presorted_pairs
            .into_iter()
            .map(|partition| {
                // 'partition' contains N iterators that are not sorted with respect to each other.
                // We merge them and turn them into a single sorted iterator.
                KMergeIters::new(partition)
            })
            .collect())
    }
}

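/// Writes the contents of `buf` to a new batch file under `tmp_dir` using
/// `batch_codec`, pushes an iterator over the encoded batch onto
/// `sorted_pairs`, and empties the buffer.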
fn flush_buffer<C: BatchCodec>(
    tmp_dir: &Path,
    batch_codec: &C,
    worker_id: usize,
    partition_id: usize,
    sorted_pairs: &mut Vec<CodecIter<C>>,
    buf: &mut Vec<(usize, usize, C::Label)>,
) -> Result<()> {
    let path = tmp_dir.join(format!(
        "sorted_batch_{worker_id}_{partition_id}_{}",
        sorted_pairs.len()
    ));

    // Safety check. It's not foolproof (TOCTOU) but should catch most programming errors.
    ensure!(
        !path.exists(),
        "Can't create temporary file {}, it already exists",
        path.display()
    );
    batch_codec
        .encode_batch(&path, buf)
        .with_context(|| format!("Could not write sorted batch to {}", path.display()))?;
    sorted_pairs.push(
        batch_codec
            .decode_batch(&path)
            .with_context(|| format!("Could not read sorted batch from {}", path.display()))?
            .into_iter(),
    );
    buf.clear();
    Ok(())
}