• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 11837625908

12 Nov 2024 07:33PM UTC coverage: 53.594% (-0.04%) from 53.631%
11837625908

push

github

vigna
No more Borrow

5 of 13 new or added lines in 6 files covered. (38.46%)

2 existing lines in 2 files now uncovered.

2371 of 4424 relevant lines covered (53.59%)

23193450.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.92
/src/transform/simplify.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 *
4
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
5
 */
6

7
use crate::graphs::{
8
    arc_list_graph, no_selfloops_graph::NoSelfLoopsGraph, union_graph::UnionGraph,
9
};
10
use crate::labels::Left;
11
use crate::traits::{SequentialGraph, SortedIterator, SortedLender, SplitLabeling};
12
use crate::utils::sort_pairs::{BatchIterator, KMergeIters, SortPairs};
13
use anyhow::{Context, Result};
14
use dsi_progress_logger::prelude::*;
15
use itertools::{Dedup, Itertools};
16
use lender::*;
17
use rayon::ThreadPool;
18
use tempfile::Builder;
19

20
use super::transpose;
21

22
/// Returns a simplified (i.e., undirected and loopless) version of the provided
23
/// sorted (both on nodes and successors) graph as a [sequential
24
/// graph](crate::traits::SequentialGraph).
25
///
26
/// This method exploits the fact that the input graph is already sorted,
27
/// sorting half the number of arcs of
28
/// [`simplify`](crate::transform::simplify::simplify).
29
#[allow(clippy::type_complexity)]
30
pub fn simplify_sorted<G: SequentialGraph + SortedLender + SortedIterator>(
×
31
    graph: &G,
32
    batch_size: usize,
33
) -> Result<
34
    NoSelfLoopsGraph<
35
        UnionGraph<&G, Left<arc_list_graph::ArcListGraph<KMergeIters<BatchIterator<()>, ()>>>>,
36
    >,
37
> {
38
    Ok(NoSelfLoopsGraph(UnionGraph(
×
39
        graph,
×
40
        transpose(graph, batch_size).context("Could not transpose the graph")?,
×
41
    )))
42
}
43

44
/// Returns a simplified (i.e., undirected and loopless) version of the provided
45
/// graph as a [sequential graph](crate::traits::SequentialGraph).
46
///
47
/// Note that if the graph is sorted (both on nodes and successors), it is
48
/// recommended to use [`simplify_sorted`](crate::transform::simplify::simplify_sorted).
49
///
50
/// For the meaning of the additional parameter, see
51
/// [`SortPairs`](crate::prelude::sort_pairs::SortPairs).
52
#[allow(clippy::type_complexity)]
53
pub fn simplify(
2✔
54
    graph: &impl SequentialGraph,
55
    batch_size: usize,
56
) -> Result<
57
    Left<
58
        arc_list_graph::ArcListGraph<
59
            std::iter::Map<
60
                Dedup<
61
                    core::iter::Filter<
62
                        core::iter::Map<
63
                            KMergeIters<BatchIterator<()>>,
64
                            fn((usize, usize, ())) -> (usize, usize),
65
                        >,
66
                        fn(&(usize, usize)) -> bool,
67
                    >,
68
                >,
69
                fn((usize, usize)) -> (usize, usize, ()),
70
            >,
71
        >,
72
    >,
73
> {
74
    let dir = Builder::new().prefix("simplify_").tempdir()?;
4✔
75
    let mut sorted = SortPairs::new(batch_size, dir.path())?;
2✔
76

77
    let mut pl = ProgressLogger::default();
78
    pl.item_name("node")
79
        .expected_updates(Some(graph.num_nodes()));
80
    pl.start("Creating batches...");
81
    // create batches of sorted edges
82
    let mut iter = graph.iter();
83
    while let Some((src, succ)) = iter.next() {
1,302,230✔
84
        for dst in succ {
13,515,722✔
85
            if src != dst {
86
                sorted.push(src, dst)?;
6,257,420✔
87
                sorted.push(dst, src)?;
6,257,420✔
88
            }
89
        }
90
        pl.light_update();
651,114✔
91
    }
92
    // merge the batches
93
    let map: fn((usize, usize, ())) -> (usize, usize) = |(src, dst, _)| (src, dst);
22,472,232✔
94
    let filter: fn(&(usize, usize)) -> bool = |(src, dst)| src != dst;
22,472,232✔
95
    let iter = Itertools::dedup(sorted.iter()?.map(map).filter(filter));
6✔
96
    let sorted = arc_list_graph::ArcListGraph::new(graph.num_nodes(), iter);
2✔
97
    pl.done();
2✔
98

99
    Ok(Left(sorted))
2✔
100
}
101

102
/// Returns a simplified (i.e., undirected and loopless) version of the provided
103
/// graph as a [sequential graph](crate::traits::SequentialGraph).
104
///
105
/// This method uses splitting to sort in parallel different parts of the graph.
106
///
107
/// For the meaning of the additional parameter, see
108
/// [`SortPairs`](crate::prelude::sort_pairs::SortPairs).
109
#[allow(clippy::type_complexity)]
110
pub fn simplify_split<S>(
×
111
    graph: &S,
112
    batch_size: usize,
113
    threads: &ThreadPool,
114
) -> Result<Left<arc_list_graph::ArcListGraph<itertools::Dedup<KMergeIters<BatchIterator<()>, ()>>>>>
115
where
116
    S: SequentialGraph + SplitLabeling,
117
{
NEW
118
    let num_threads = threads.current_num_threads();
×
119
    let (tx, rx) = std::sync::mpsc::channel();
×
120

121
    let mut dirs = vec![];
×
122

NEW
123
    threads.in_place_scope(|scope| {
×
124
        let mut thread_id = 0;
×
125
        #[allow(clippy::explicit_counter_loop)] // enumerate requires some extra bounds here
126
        for iter in graph.split_iter(num_threads) {
×
127
            let tx = tx.clone();
×
128
            let dir = Builder::new()
×
129
                .prefix(&format!("simplify_split_{}_", thread_id))
×
130
                .tempdir()
×
131
                .expect("Could not create a temporary directory");
×
132
            let dir_path = dir.path().to_path_buf();
×
133
            dirs.push(dir);
×
134
            scope.spawn(move |_| {
×
135
                log::debug!("Spawned thread {}", thread_id);
×
136
                let mut sorted = SortPairs::new(batch_size / num_threads, dir_path).unwrap();
×
137
                for_!( (src, succ) in iter {
×
138
                    for dst in succ {
×
139
                        if src != dst {
×
140
                            sorted.push(src, dst).unwrap();
×
141
                            sorted.push(dst, src).unwrap();
×
142
                        }
143
                    }
144
                });
145
                let result = sorted.iter().context("Could not read arcs").unwrap();
×
146
                tx.send(result).expect("Could not send the sorted pairs");
×
147
                log::debug!("Thread {} finished", thread_id);
×
148
            });
149
            thread_id += 1;
×
150
        }
151
    });
152
    drop(tx);
×
153

154
    // get a graph on the sorted data
155
    log::debug!("Waiting for threads to finish");
×
156
    let edges: KMergeIters<BatchIterator> = rx.iter().sum();
×
157
    let edges = edges.dedup();
×
158
    log::debug!("All threads finished");
×
159
    let sorted = arc_list_graph::ArcListGraph::new_labeled(graph.num_nodes(), edges);
×
160

161
    drop(dirs);
×
162
    Ok(Left(sorted))
×
163
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc