• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 18095608314

29 Sep 2025 11:36AM UTC coverage: 49.48% (-0.5%) from 49.968%
18095608314

Pull #132

github

web-flow
Merge 4a86214ae into 9de0f5a5a
Pull Request #132: Optimize ArcListGraph::from_iter when the underlying iterator is already truncated

2 of 7 new or added lines in 1 file covered. (28.57%)

764 existing lines in 27 files now uncovered.

3854 of 7789 relevant lines covered (49.48%)

23517346.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.12
/webgraph/src/transform/simplify.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 *
4
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
5
 */
6

7
use crate::graphs::{
8
    arc_list_graph, no_selfloops_graph::NoSelfLoopsGraph, union_graph::UnionGraph,
9
};
10
use crate::labels::Left;
11
use crate::traits::{LenderIntoIter, SequentialGraph, SortedIterator, SortedLender, SplitLabeling};
12
use crate::utils::sort_pairs::{BatchIterator, KMergeIters, SortPairs};
13
use anyhow::{Context, Result};
14
use dsi_progress_logger::prelude::*;
15
use itertools::Itertools;
16
use lender::*;
17
use rayon::ThreadPool;
18
use tempfile::Builder;
19

20
use super::transpose;
21

22
/// Returns a simplified (i.e., undirected and loopless) version of the provided
23
/// sorted (both on nodes and successors) graph as a [sequential
24
/// graph](crate::traits::SequentialGraph).
25
///
26
/// This method exploits the fact that the input graph is already sorted,
27
/// sorting half the number of arcs of
28
/// [`simplify`](crate::transform::simplify::simplify).
29
#[allow(clippy::type_complexity)]
30
pub fn simplify_sorted<G: SequentialGraph>(
1✔
31
    graph: G,
32
    batch_size: usize,
33
) -> Result<
34
    NoSelfLoopsGraph<
35
        UnionGraph<G, Left<arc_list_graph::ArcListGraph<KMergeIters<BatchIterator<()>, ()>>>>,
36
    >,
37
>
38
where
39
    for<'a> G::Lender<'a>: SortedLender,
40
    for<'a, 'b> LenderIntoIter<'a, G::Lender<'b>>: SortedIterator,
41
{
42
    let transpose = transpose(&graph, batch_size).context("Could not transpose the graph")?;
5✔
43
    Ok(NoSelfLoopsGraph(UnionGraph(graph, transpose)))
×
44
}
45

46
/// Returns a simplified (i.e., undirected and loopless) version of the provided
47
/// graph as a [sequential graph](crate::traits::SequentialGraph).
48
///
49
/// Note that if the graph is sorted (both on nodes and successors), it is
50
/// recommended to use [`simplify_sorted`](crate::transform::simplify::simplify_sorted).
51
///
52
/// For the meaning of the additional parameter, see
53
/// [`SortPairs`](crate::prelude::sort_pairs::SortPairs).
54
#[allow(clippy::type_complexity)]
55
pub fn simplify(
×
56
    graph: &impl SequentialGraph,
57
    batch_size: usize,
58
) -> Result<
59
    Left<
60
        arc_list_graph::ArcListGraph<
61
            impl Iterator<Item = (usize, usize, ())> + Clone + Send + Sync + 'static,
62
        >,
63
    >,
64
> {
UNCOV
65
    let dir = Builder::new().prefix("simplify_").tempdir()?;
×
UNCOV
66
    let mut sorted = SortPairs::new(batch_size, dir.path())?;
×
67

UNCOV
68
    let mut pl = ProgressLogger::default();
×
UNCOV
69
    pl.item_name("node")
×
UNCOV
70
        .expected_updates(Some(graph.num_nodes()));
×
UNCOV
71
    pl.start("Creating batches...");
×
72
    // create batches of sorted edges
UNCOV
73
    let mut iter = graph.iter();
×
UNCOV
74
    while let Some((src, succ)) = iter.next() {
×
UNCOV
75
        for dst in succ {
×
76
            if src != dst {
×
77
                sorted.push(src, dst)?;
×
UNCOV
78
                sorted.push(dst, src)?;
×
79
            }
80
        }
81
        pl.light_update();
×
82
    }
83
    // merge the batches
84
    let map: fn((usize, usize, ())) -> (usize, usize) = |(src, dst, _)| (src, dst);
×
85
    let filter: fn(&(usize, usize)) -> bool = |(src, dst)| src != dst;
×
86
    let iter = Itertools::dedup(sorted.iter()?.map(map).filter(filter));
×
87
    let sorted = arc_list_graph::ArcListGraph::new(graph.num_nodes(), iter);
×
88
    pl.done();
×
89

UNCOV
90
    Ok(sorted)
×
91
}
92

93
/// Returns a simplified (i.e., undirected and loopless) version of the provided
94
/// graph as a [sequential graph](crate::traits::SequentialGraph).
95
///
96
/// This method uses splitting to sort in parallel different parts of the graph.
97
///
98
/// For the meaning of the additional parameter, see
99
/// [`SortPairs`](crate::prelude::sort_pairs::SortPairs).
100
#[allow(clippy::type_complexity)]
101
pub fn simplify_split<S>(
4✔
102
    graph: &S,
103
    batch_size: usize,
104
    threads: &ThreadPool,
105
) -> Result<Left<arc_list_graph::ArcListGraph<itertools::Dedup<KMergeIters<BatchIterator<()>, ()>>>>>
106
where
107
    S: SequentialGraph + SplitLabeling,
108
{
109
    let num_threads = threads.current_num_threads();
12✔
110
    let (tx, rx) = std::sync::mpsc::channel();
12✔
111

112
    let mut dirs = vec![];
8✔
113

114
    threads.in_place_scope(|scope| {
12✔
115
        let mut thread_id = 0;
8✔
116
        #[allow(clippy::explicit_counter_loop)] // enumerate requires some extra bounds here
117
        for iter in graph.split_iter(num_threads) {
28✔
UNCOV
118
            let tx = tx.clone();
×
UNCOV
119
            let dir = Builder::new()
×
UNCOV
120
                .prefix(&format!("simplify_split_{thread_id}_"))
×
UNCOV
121
                .tempdir()
×
UNCOV
122
                .expect("Could not create a temporary directory");
×
UNCOV
123
            let dir_path = dir.path().to_path_buf();
×
UNCOV
124
            dirs.push(dir);
×
125
            scope.spawn(move |_| {
16✔
126
                log::debug!("Spawned thread {thread_id}");
16✔
127
                let mut sorted = SortPairs::new(batch_size / num_threads, dir_path).unwrap();
80✔
128
                for_!( (src, succ) in iter {
1,302,244✔
129
                    for dst in succ {
27,031,444✔
130
                        if src != dst {
12,514,840✔
131
                            sorted.push(src, dst).unwrap();
75,089,040✔
132
                            sorted.push(dst, src).unwrap();
50,059,360✔
133
                        }
134
                    }
135
                });
136
                let result = sorted.iter().context("Could not read arcs").unwrap();
80✔
137
                tx.send(result).expect("Could not send the sorted pairs");
80✔
138
                log::debug!("Thread {thread_id} finished");
16✔
139
            });
UNCOV
140
            thread_id += 1;
×
141
        }
142
    });
143
    drop(tx);
8✔
144

145
    // get a graph on the sorted data
146
    log::debug!("Waiting for threads to finish");
4✔
147
    let edges: KMergeIters<BatchIterator> = rx.iter().sum();
20✔
148
    let edges = edges.dedup();
12✔
149
    log::debug!("All threads finished");
4✔
150
    let sorted = arc_list_graph::ArcListGraph::new_labeled(graph.num_nodes(), edges);
20✔
151

152
    drop(dirs);
8✔
153
    Ok(Left(sorted))
4✔
154
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc