Source File: /webgraph/src/graphs/bvgraph/comp/impls.rs

/*
 * SPDX-FileCopyrightText: 2023 Inria
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
 *
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
 */

use crate::prelude::*;
use anyhow::{ensure, Context, Result};
use dsi_bitstream::prelude::*;
use dsi_progress_logger::prelude::*;
use lender::prelude::*;
use rayon::ThreadPool;

use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::path::{Path, PathBuf};

/// A queue that pulls jobs with ids in a contiguous initial segment of the
/// natural numbers from an iterator out of order and implements an iterator in
/// which they can be pulled in order.
///
/// Jobs must be ordered by their job id, and must implement [`Eq`] with a
/// [`usize`] using their job id.
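///
/// # Examples
///
/// A minimal sketch of the reordering behavior; the `Toy` item type is
/// hypothetical, introduced here only for illustration:
///
/// ```ignore
/// struct Toy(usize);
/// impl JobId for Toy {
///     fn id(&self) -> usize {
///         self.0
///     }
/// }
///
/// // Items arrive out of order, but are yielded in job-id order.
/// let queue = TaskQueue::new(vec![Toy(2), Toy(0), Toy(1)].into_iter());
/// let ids: Vec<usize> = queue.map(|t| t.0).collect();
/// assert_eq!(ids, vec![0, 1, 2]);
/// ```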
struct TaskQueue<I: Iterator> {
    iter: I,
    jobs: Vec<Option<I::Item>>,
    next_id: usize,
}

trait JobId {
    fn id(&self) -> usize;
}

impl<I: Iterator> TaskQueue<I> {
    fn new(iter: I) -> Self {
        Self {
            iter,
            jobs: vec![],
            next_id: 0,
        }
    }
}

impl<I: Iterator> Iterator for TaskQueue<I>
where
    I::Item: JobId,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(item) = self.jobs.get_mut(self.next_id) {
                if item.is_some() {
                    self.next_id += 1;
                    return item.take();
                }
            }
            if let Some(item) = self.iter.next() {
                let id = item.id();
                if id >= self.jobs.len() {
                    self.jobs.resize_with(id + 1, || None);
                }
                self.jobs[id] = Some(item);
            } else {
                return None;
            }
        }
    }
}

/// A compression job.
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
struct Job {
    job_id: usize,
    first_node: usize,
    last_node: usize,
    chunk_graph_path: PathBuf,
    written_bits: u64,
    chunk_offsets_path: PathBuf,
    offsets_written_bits: u64,
    num_arcs: u64,
}

impl JobId for Job {
    fn id(&self) -> usize {
        self.job_id
    }
}

impl BvComp<()> {
    /// Compresses a [`NodeLabelsLender`] and returns the length in bits of the
    /// graph bitstream.
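    ///
    /// # Examples
    ///
    /// A minimal sketch under assumed names: `graph` stands for any sequential
    /// graph whose lender yields `(node, successors)` pairs, and the paths are
    /// placeholders.
    ///
    /// ```ignore
    /// let bits = BvComp::single_thread::<BE, _>(
    ///     "/tmp/graph-copy",        // basename for the output files
    ///     graph.iter(),             // a lender of (node, successors) pairs
    ///     CompFlags::default(),
    ///     true,                     // also build the .offsets file
    ///     Some(graph.num_nodes()),  // used only for progress and a sanity check
    /// )?;
    /// ```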
    pub fn single_thread<E, L>(
        basename: impl AsRef<Path>,
        iter: L,
        compression_flags: CompFlags,
        build_offsets: bool,
        num_nodes: Option<usize>,
    ) -> Result<u64>
    where
        E: Endianness,
        L: IntoLender,
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
    {
        let basename = basename.as_ref();
        let graph_path = basename.with_extension(GRAPH_EXTENSION);

        // Compress the graph
        let bit_write = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(
            File::create(&graph_path)
                .with_context(|| format!("Could not create {}", graph_path.display()))?,
        )));

        let comp_flags = CompFlags {
            ..Default::default()
        };

        let codes_writer = DynCodesEncoder::new(bit_write, &comp_flags)?;

        let mut bvcomp = BvComp::new(
            codes_writer,
            compression_flags.compression_window,
            compression_flags.max_ref_count,
            compression_flags.min_interval_length,
            0,
        );

        let mut pl = progress_logger!(
            display_memory = true,
            item_name = "node",
            expected_updates = num_nodes,
        );
        pl.start("Compressing successors...");
        let mut bitstream_len = 0;

        let mut real_num_nodes = 0;
        if build_offsets {
            let offsets_path = basename.with_extension(OFFSETS_EXTENSION);
            let file = std::fs::File::create(&offsets_path)
                .with_context(|| format!("Could not create {}", offsets_path.display()))?;
            // create a bit writer on the file
            let mut writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
                BufWriter::with_capacity(1 << 20, file),
            ));

            writer
                .write_gamma(0)
                .context("Could not write initial delta")?;
            for_! ( (_node_id, successors) in iter {
                let delta = bvcomp.push(successors).context("Could not push successors")?;
                bitstream_len += delta;
                writer.write_gamma(delta).context("Could not write delta")?;
                pl.update();
                real_num_nodes += 1;
            });
        } else {
            for_! ( (_node_id, successors) in iter {
                bitstream_len += bvcomp.push(successors).context("Could not push successors")?;
                pl.update();
                real_num_nodes += 1;
            });
        }
        pl.done();

        if let Some(num_nodes) = num_nodes {
            if num_nodes != real_num_nodes {
                log::warn!(
                    "The expected number of nodes is {num_nodes} but the actual number of nodes is {real_num_nodes}"
                );
            }
        }
        let num_arcs = bvcomp.arcs;
        bitstream_len += bvcomp.flush().context("Could not flush bvcomp")? as u64;

        log::info!("Writing the .properties file");
        let properties = compression_flags
            .to_properties::<E>(real_num_nodes, num_arcs, bitstream_len)
            .context("Could not serialize properties")?;
        let properties_path = basename.with_extension(PROPERTIES_EXTENSION);
        std::fs::write(&properties_path, properties)
            .with_context(|| format!("Could not write {}", properties_path.display()))?;

        Ok(bitstream_len)
    }

    /// A wrapper over [`parallel_graph`](Self::parallel_graph) that takes the
    /// endianness as a string.
    ///
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
    ///
    /// A given endianness is enabled only if the corresponding feature is
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
    /// neither feature is enabled.
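    ///
    /// # Examples
    ///
    /// A minimal sketch under assumed names (`graph` and the paths are
    /// placeholders); the endianness string is taken from the constants
    /// themselves rather than hard-coded:
    ///
    /// ```ignore
    /// let threads = rayon::ThreadPoolBuilder::new().build()?;
    /// let bits = BvComp::parallel_endianness(
    ///     "/tmp/graph-copy",
    ///     &graph,
    ///     graph.num_nodes(),
    ///     CompFlags::default(),
    ///     &threads,
    ///     "/tmp/graph-tmp",
    ///     BE::NAME, // or LE::NAME, if the matching feature is enabled
    /// )?;
    /// ```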
    pub fn parallel_endianness<P: AsRef<Path>, G: SplitLabeling + SequentialGraph>(
        basename: impl AsRef<Path> + Send + Sync,
        graph: &G,
        num_nodes: usize,
        compression_flags: CompFlags,
        threads: &ThreadPool,
        tmp_dir: P,
        endianness: &str,
    ) -> Result<u64>
    where
        for<'a> <G as SplitLabeling>::SplitLender<'a>: Send + Sync,
    {
        match endianness {
            #[cfg(any(
                feature = "be_bins",
                not(any(feature = "be_bins", feature = "le_bins"))
            ))]
            BE::NAME => {
                // compress the transposed graph
                Self::parallel_iter::<BigEndian, _>(
                    basename,
                    graph.split_iter(threads.current_num_threads()).into_iter(),
                    num_nodes,
                    compression_flags,
                    threads,
                    tmp_dir,
                )
            }
            #[cfg(any(
                feature = "le_bins",
                not(any(feature = "be_bins", feature = "le_bins"))
            ))]
            LE::NAME => {
                // compress the transposed graph
                Self::parallel_iter::<LittleEndian, _>(
                    basename,
                    graph.split_iter(threads.current_num_threads()).into_iter(),
                    num_nodes,
                    compression_flags,
                    threads,
                    tmp_dir,
                )
            }
            x => anyhow::bail!("Unknown endianness {}", x),
        }
    }

    /// Compresses a graph in parallel and returns the length in bits of the graph bitstream.
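    ///
    /// # Examples
    ///
    /// A minimal sketch under assumed names (`graph` and the paths are
    /// placeholders); the graph is split into one lender per thread and the
    /// resulting chunks are glued back together in order:
    ///
    /// ```ignore
    /// let threads = rayon::ThreadPoolBuilder::new().num_threads(4).build()?;
    /// let bits = BvComp::parallel_graph::<BE>(
    ///     "/tmp/graph-copy",
    ///     &graph,
    ///     CompFlags::default(),
    ///     &threads,
    ///     "/tmp/graph-tmp",
    /// )?;
    /// ```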
    pub fn parallel_graph<E: Endianness>(
        basename: impl AsRef<Path> + Send + Sync,
        graph: &(impl SequentialGraph + SplitLabeling),
        compression_flags: CompFlags,
        threads: &ThreadPool,
        tmp_dir: impl AsRef<Path>,
    ) -> Result<u64>
    where
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
    {
        Self::parallel_iter(
            basename,
            graph.split_iter(threads.current_num_threads()).into_iter(),
            graph.num_nodes(),
            compression_flags,
            threads,
            tmp_dir,
        )
    }

    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns the length in bits
    /// of the graph bitstream.
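    ///
    /// # Examples
    ///
    /// A minimal sketch mirroring what [`parallel_graph`](Self::parallel_graph)
    /// does internally (`graph`, `threads`, and the paths are placeholders):
    ///
    /// ```ignore
    /// let bits = BvComp::parallel_iter::<BE, _>(
    ///     "/tmp/graph-copy",
    ///     graph.split_iter(threads.current_num_threads()).into_iter(),
    ///     graph.num_nodes(),
    ///     CompFlags::default(),
    ///     &threads,
    ///     "/tmp/graph-tmp",
    /// )?;
    /// ```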
    pub fn parallel_iter<
        E: Endianness,
        L: Lender + for<'next> NodeLabelsLender<'next, Label = usize> + Send,
    >(
        basename: impl AsRef<Path> + Send + Sync,
        iter: impl Iterator<Item = L>,
        num_nodes: usize,
        compression_flags: CompFlags,
        threads: &ThreadPool,
        tmp_dir: impl AsRef<Path>,
    ) -> Result<u64>
    where
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
    {
        let tmp_dir = tmp_dir.as_ref();
        let basename = basename.as_ref();

        let graph_path = basename.with_extension(GRAPH_EXTENSION);
        let offsets_path = basename.with_extension(OFFSETS_EXTENSION);

        let (tx, rx) = std::sync::mpsc::channel();

        let thread_path = |thread_id: usize| tmp_dir.join(format!("{thread_id:016x}.bitstream"));

        let mut comp_pl = concurrent_progress_logger!(
            log_target = "webgraph::graphs::bvgraph::comp::impls::parallel_iter::comp",
            display_memory = true,
            item_name = "node",
            local_speed = true,
            expected_updates = Some(num_nodes),
        );
        comp_pl.start("Compressing successors in parallel...");
        threads.in_place_scope(|s| {
            let cp_flags = &compression_flags;

            for (thread_id, mut thread_lender) in iter.enumerate() {
                let tmp_path = thread_path(thread_id);
                let chunk_graph_path = tmp_path.with_extension(GRAPH_EXTENSION);
                let chunk_offsets_path = tmp_path.with_extension(OFFSETS_EXTENSION);
                let tx = tx.clone();
                let mut comp_pl = comp_pl.clone();
                // Spawn the thread
                s.spawn(move |_| {
                    log::debug!("Thread {thread_id} started");
                    let first_node;
                    let mut bvcomp;
                    let mut offsets_writer;
                    let mut written_bits;
                    let mut offsets_written_bits;

                    match thread_lender.next() {
                        None => return,
                        Some((node_id, successors)) => {
                            first_node = node_id;

                            offsets_writer = <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(
                                BufWriter::new(File::create(&chunk_offsets_path).unwrap()),
                            ));

                            let writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
                                BufWriter::new(File::create(&chunk_graph_path).unwrap()),
                            ));
                            let codes_encoder = <DynCodesEncoder<E, _>>::new(writer, cp_flags).unwrap();

                            bvcomp = BvComp::new(
                                codes_encoder,
                                cp_flags.compression_window,
                                cp_flags.max_ref_count,
                                cp_flags.min_interval_length,
                                node_id,
                            );
                            written_bits = bvcomp.push(successors).unwrap();
                            offsets_written_bits = offsets_writer.write_gamma(written_bits).unwrap() as u64;
                        }
                    };

                    let mut last_node = first_node;
                    let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
                    for_! ( (_, succ) in iter_nodes {
                        let node_bits = bvcomp.push(succ.into_iter()).unwrap();
                        written_bits += node_bits;
                        offsets_written_bits += offsets_writer.write_gamma(node_bits).unwrap() as u64;
                    });

                    let num_arcs = bvcomp.arcs;
                    bvcomp.flush().unwrap();
                    offsets_writer.flush().unwrap();
                    comp_pl.update_with_count(last_node - first_node + 1);

                    log::debug!(
                        "Finished compression thread {thread_id} and wrote {written_bits} bits for the graph and {offsets_written_bits} bits for the offsets",
                    );
                    tx.send(Job {
                        job_id: thread_id,
                        first_node,
                        last_node,
                        chunk_graph_path,
                        written_bits,
                        chunk_offsets_path,
                        offsets_written_bits,
                        num_arcs,
                    })
                    .unwrap()
                });
            }

            drop(tx);

            let mut copy_pl = progress_logger!(
                log_target = "webgraph::graphs::bvgraph::comp::impls::parallel_iter::copy",
                display_memory = true,
                item_name = "node",
                local_speed = true,
                expected_updates = Some(num_nodes),
            );
            copy_pl.start("Copying compressed successors to final graph");

            let file = File::create(&graph_path)
                .with_context(|| format!("Could not create graph {}", graph_path.display()))?;
            let mut graph_writer =
                <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));

            let file = File::create(&offsets_path)
                .with_context(|| format!("Could not create offsets {}", offsets_path.display()))?;
            let mut offsets_writer =
                <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
            offsets_writer.write_gamma(0)?;

            let mut total_written_bits: u64 = 0;
            let mut total_offsets_written_bits: u64 = 0;
            let mut total_arcs: u64 = 0;

            let mut next_node = 0;
            // Glue together the bitstreams as they finish; this allows us to
            // do task pipelining for better performance.
            for Job {
                job_id,
                first_node,
                last_node,
                chunk_graph_path,
                written_bits,
                chunk_offsets_path,
                offsets_written_bits,
                num_arcs,
            } in TaskQueue::new(rx.iter())
            {
                ensure!(
                    first_node == next_node,
                    "Non-adjacent lenders: lender {} has first node {} instead of {}",
                    job_id,
                    first_node,
                    next_node
                );

                next_node = last_node + 1;
                total_arcs += num_arcs;
                log::debug!(
                    "Copying {} [{}..{}) bits from {} to {}",
                    written_bits,
                    total_written_bits,
                    total_written_bits + written_bits,
                    chunk_graph_path.display(),
                    graph_path.display()
                );
                total_written_bits += written_bits;

                let mut reader =
                    <BufBitReader<E, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
                        File::open(&chunk_graph_path)
                            .with_context(|| format!("Could not open {}", chunk_graph_path.display()))?,
                    )));
                graph_writer
                    .copy_from(&mut reader, written_bits)
                    .with_context(|| {
                        format!(
                            "Could not copy from {} to {}",
                            chunk_graph_path.display(),
                            graph_path.display()
                        )
                    })?;

                log::debug!(
                    "Copying offsets {} [{}..{}) bits from {} to {}",
                    offsets_written_bits,
                    total_offsets_written_bits,
                    total_offsets_written_bits + offsets_written_bits,
                    chunk_offsets_path.display(),
                    offsets_path.display()
                );
                total_offsets_written_bits += offsets_written_bits;

                let mut reader =
                    <BufBitReader<BigEndian, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
                        File::open(&chunk_offsets_path)
                            .with_context(|| format!("Could not open {}", chunk_offsets_path.display()))?,
                    )));
                offsets_writer
                    .copy_from(&mut reader, offsets_written_bits)
                    .with_context(|| {
                        format!(
                            "Could not copy from {} to {}",
                            chunk_offsets_path.display(),
                            offsets_path.display()
                        )
                    })?;

                copy_pl.update_with_count(last_node - first_node + 1);
            }

            log::info!("Flushing the merged bitstreams");
            graph_writer.flush()?;
            offsets_writer.flush()?;

            comp_pl.done();
            copy_pl.done();

            log::info!("Writing the .properties file");
            let properties = compression_flags
                .to_properties::<E>(num_nodes, total_arcs, total_written_bits)
                .context("Could not serialize properties")?;
            let properties_path = basename.with_extension(PROPERTIES_EXTENSION);
            std::fs::write(&properties_path, properties).with_context(|| {
                format!(
                    "Could not write properties to {}",
                    properties_path.display()
                )
            })?;

            log::info!(
                "Compressed {} arcs into {} bits for {:.4} bits/arc",
                total_arcs,
                total_written_bits,
                total_written_bits as f64 / total_arcs as f64
            );
            log::info!(
                "Created offsets file with {} bits for {:.4} bits/node",
                total_offsets_written_bits,
                total_offsets_written_bits as f64 / num_nodes as f64
            );

            // cleanup the temp files
            std::fs::remove_dir_all(tmp_dir).with_context(|| {
                format!("Could not clean temporary directory {}", tmp_dir.display())
            })?;
            Ok(total_written_bits)
        })
    }
}