• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 16988151144

15 Aug 2025 10:28AM UTC coverage: 49.936% (-0.03%) from 49.968%
16988151144

Pull #131

github

web-flow
Merge c1e9613e6 into dd7049ac5
Pull Request #131: Use dsi-progress-logger in BVComp::parallel_iter

13 of 30 new or added lines in 1 file covered. (43.33%)

1 existing line in 1 file now uncovered.

3897 of 7804 relevant lines covered (49.94%)

23684263.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.45
/webgraph/src/graphs/bvgraph/comp/impls.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 *
5
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6
 */
7

8
use crate::prelude::*;
9
use anyhow::{ensure, Context, Result};
10
use dsi_bitstream::prelude::*;
11
use dsi_progress_logger::prelude::*;
12
use lender::prelude::*;
13
use rayon::ThreadPool;
14

15
use std::fs::File;
16
use std::io::{BufReader, BufWriter};
17
use std::path::{Path, PathBuf};
18
use std::sync::{Arc, Mutex};
19

20
/// A queue that pulls jobs with ids in a contiguous initial segment of the
/// natural numbers from an iterator out of order and implements an iterator
/// in which they can be pulled in order.
///
/// Jobs must be ordered by their job id, and must implement [`Eq`] with a
/// [`usize`] using their job id.
struct TaskQueue<I: Iterator> {
    // The underlying, possibly out-of-order source of jobs.
    iter: I,
    // Buffer of jobs that arrived ahead of their turn, indexed by job id.
    // `None` marks a slot whose job has not arrived yet or was already
    // yielded.
    jobs: Vec<Option<I::Item>>,
    // Id of the next job to yield; jobs with higher ids are buffered until
    // this one has been returned.
    next_id: usize,
}
31

32
/// A job exposing a numeric identifier, used by [`TaskQueue`] to restore
/// the submission order of jobs completed out of order.
trait JobId {
    /// Returns the id of this job.
    fn id(&self) -> usize;
}
35

36
impl<I: Iterator> TaskQueue<I> {
37
    fn new(iter: I) -> Self {
17✔
38
        Self {
39
            iter,
40
            jobs: vec![],
17✔
41
            next_id: 0,
42
        }
43
    }
44
}
45

46
impl<I: Iterator> Iterator for TaskQueue<I>
47
where
48
    I::Item: JobId,
49
{
50
    type Item = I::Item;
51

52
    fn next(&mut self) -> Option<Self::Item> {
94✔
53
        loop {
×
54
            if let Some(item) = self.jobs.get_mut(self.next_id) {
432✔
55
                if item.is_some() {
×
56
                    self.next_id += 1;
77✔
57
                    return item.take();
154✔
58
                }
59
            }
60
            if let Some(item) = self.iter.next() {
171✔
61
                let id = item.id();
×
62
                if id >= self.jobs.len() {
×
63
                    self.jobs.resize_with(id + 1, || None);
198✔
64
                }
65
                self.jobs[id] = Some(item);
×
66
            } else {
67
                return None;
17✔
68
            }
69
        }
70
    }
71
}
72

73
/// A compression job.
///
/// Produced by a compression thread and sent back over a channel; carries
/// everything needed to splice the thread's temporary bitstreams into the
/// final graph and offsets files.
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
struct Job {
    // Identifier used by TaskQueue to restore submission order.
    job_id: usize,
    // First node compressed by this job (inclusive).
    first_node: usize,
    // Last node compressed by this job (inclusive).
    last_node: usize,
    // Temporary file holding this chunk's graph bitstream.
    chunk_graph_path: PathBuf,
    // Number of bits written to the chunk graph bitstream.
    written_bits: u64,
    // Temporary file holding this chunk's offsets bitstream.
    chunk_offsets_path: PathBuf,
    // Number of bits written to the chunk offsets bitstream.
    offsets_written_bits: u64,
    // Number of arcs compressed by this job.
    num_arcs: u64,
}
85

86
impl JobId for Job {
    /// Returns the job id, used by [`TaskQueue`] to yield jobs in
    /// submission order.
    fn id(&self) -> usize {
        self.job_id
    }
}
91

92
impl BvComp<()> {
93
    /// Compresses s [`NodeLabelsLender`] and returns the length in bits of the
94
    /// graph bitstream.
95
    pub fn single_thread<E, L>(
×
96
        basename: impl AsRef<Path>,
97
        iter: L,
98
        compression_flags: CompFlags,
99
        build_offsets: bool,
100
        num_nodes: Option<usize>,
101
    ) -> Result<u64>
102
    where
103
        E: Endianness,
104
        L: IntoLender,
105
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
106
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
107
    {
108
        let basename = basename.as_ref();
×
109
        let graph_path = basename.with_extension(GRAPH_EXTENSION);
×
110

111
        // Compress the graph
112
        let bit_write = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(
×
113
            File::create(&graph_path)
×
114
                .with_context(|| format!("Could not create {}", graph_path.display()))?,
×
115
        )));
116

117
        let comp_flags = CompFlags {
118
            ..Default::default()
119
        };
120

121
        let codes_writer = DynCodesEncoder::new(bit_write, &comp_flags)?;
×
122

123
        let mut bvcomp = BvComp::new(
124
            codes_writer,
×
125
            compression_flags.compression_window,
×
126
            compression_flags.max_ref_count,
×
127
            compression_flags.min_interval_length,
×
128
            0,
129
        );
130

NEW
131
        let mut pl = progress_logger!(
×
NEW
132
            display_memory = true,
×
NEW
133
            item_name = "node",
×
NEW
134
            expected_updates = num_nodes,
×
135
        );
136
        pl.start("Compressing successors...");
×
137
        let mut bitstream_len = 0;
×
138

139
        let mut real_num_nodes = 0;
×
140
        if build_offsets {
×
141
            let offsets_path = basename.with_extension(OFFSETS_EXTENSION);
×
142
            let file = std::fs::File::create(&offsets_path)
×
143
                .with_context(|| format!("Could not create {}", offsets_path.display()))?;
×
144
            // create a bit writer on the file
145
            let mut writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
×
146
                BufWriter::with_capacity(1 << 20, file),
×
147
            ));
148

149
            writer
×
150
                .write_gamma(0)
151
                .context("Could not write initial delta")?;
152
            for_! ( (_node_id, successors) in iter {
×
153
                let delta = bvcomp.push(successors).context("Could not push successors")?;
×
154
                bitstream_len += delta;
×
155
                writer.write_gamma(delta).context("Could not write delta")?;
×
156
                pl.update();
×
157
                real_num_nodes += 1;
×
158
            });
159
        } else {
160
            for_! ( (_node_id, successors) in iter {
×
161
                bitstream_len += bvcomp.push(successors).context("Could not push successors")?;
×
162
                pl.update();
×
163
                real_num_nodes += 1;
×
164
            });
165
        }
166
        pl.done();
×
167

168
        if let Some(num_nodes) = num_nodes {
×
169
            if num_nodes != real_num_nodes {
×
170
                log::warn!(
×
171
                    "The expected number of nodes is {num_nodes} but the actual number of nodes is {real_num_nodes}"
×
172
                );
173
            }
174
        }
175
        let num_arcs = bvcomp.arcs;
×
176
        bitstream_len += bvcomp.flush().context("Could not flush bvcomp")? as u64;
×
177

178
        log::info!("Writing the .properties file");
×
179
        let properties = compression_flags
×
180
            .to_properties::<BE>(real_num_nodes, num_arcs, bitstream_len)
×
181
            .context("Could not serialize properties")?;
182
        let properties_path = basename.with_extension(PROPERTIES_EXTENSION);
×
183
        std::fs::write(&properties_path, properties)
×
184
            .with_context(|| format!("Could not write {}", properties_path.display()))?;
×
185

186
        Ok(bitstream_len)
×
187
    }
188

189
    /// A wrapper over [`parallel_graph`](Self::parallel_graph) that takes the
190
    /// endianness as a string.
191
    ///
192
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
193
    ///
194
    ///  A given endianness is enabled only if the corresponding feature is
195
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
196
    /// neither features are enabled.
197
    pub fn parallel_endianness<P: AsRef<Path>, G: SplitLabeling + SequentialGraph>(
8✔
198
        basename: impl AsRef<Path> + Send + Sync,
199
        graph: &G,
200
        num_nodes: usize,
201
        compression_flags: CompFlags,
202
        threads: &ThreadPool,
203
        tmp_dir: P,
204
        endianness: &str,
205
    ) -> Result<u64>
206
    where
207
        for<'a> <G as SplitLabeling>::SplitLender<'a>: Send + Sync,
208
    {
209
        match endianness {
8✔
210
            #[cfg(any(
211
                feature = "be_bins",
212
                not(any(feature = "be_bins", feature = "le_bins"))
213
            ))]
214
            BE::NAME => {
8✔
215
                // compress the transposed graph
216
                Self::parallel_iter::<BigEndian, _>(
217
                    basename,
8✔
218
                    graph.split_iter(threads.current_num_threads()).into_iter(),
40✔
219
                    num_nodes,
8✔
220
                    compression_flags,
8✔
221
                    threads,
8✔
222
                    tmp_dir,
8✔
223
                )
224
            }
225
            #[cfg(any(
226
                feature = "le_bins",
227
                not(any(feature = "be_bins", feature = "le_bins"))
228
            ))]
229
            LE::NAME => {
×
230
                // compress the transposed graph
231
                Self::parallel_iter::<LittleEndian, _>(
232
                    basename,
×
233
                    graph.split_iter(threads.current_num_threads()).into_iter(),
×
234
                    num_nodes,
×
235
                    compression_flags,
×
236
                    threads,
×
237
                    tmp_dir,
×
238
                )
239
            }
240
            x => anyhow::bail!("Unknown endianness {}", x),
×
241
        }
242
    }
243

244
    /// Compresses a graph in parallel and returns the length in bits of the graph bitstream.
245
    pub fn parallel_graph<E: Endianness>(
9✔
246
        basename: impl AsRef<Path> + Send + Sync,
247
        graph: &(impl SequentialGraph + SplitLabeling),
248
        compression_flags: CompFlags,
249
        threads: &ThreadPool,
250
        tmp_dir: impl AsRef<Path>,
251
    ) -> Result<u64>
252
    where
253
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
254
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
255
    {
256
        Self::parallel_iter(
257
            basename,
9✔
258
            graph.split_iter(threads.current_num_threads()).into_iter(),
45✔
259
            graph.num_nodes(),
18✔
260
            compression_flags,
9✔
261
            threads,
9✔
262
            tmp_dir,
9✔
263
        )
264
    }
265

266
    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns the length in bits
267
    /// of the graph bitstream.
268
    pub fn parallel_iter<
17✔
269
        E: Endianness,
270
        L: Lender + for<'next> NodeLabelsLender<'next, Label = usize> + Send,
271
    >(
272
        basename: impl AsRef<Path> + Send + Sync,
273
        iter: impl Iterator<Item = L>,
274
        num_nodes: usize,
275
        compression_flags: CompFlags,
276
        threads: &ThreadPool,
277
        tmp_dir: impl AsRef<Path>,
278
    ) -> Result<u64>
279
    where
280
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
281
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
282
    {
283
        let tmp_dir = tmp_dir.as_ref();
51✔
284
        let basename = basename.as_ref();
51✔
285

286
        let graph_path = basename.with_extension(GRAPH_EXTENSION);
51✔
287
        let offsets_path = basename.with_extension(OFFSETS_EXTENSION);
51✔
288

289
        let (tx, rx) = std::sync::mpsc::channel();
51✔
290

291
        let thread_path = |thread_id: usize| tmp_dir.join(format!("{thread_id:016x}.bitstream"));
325✔
292

293
        let mut comp_pl = progress_logger!(
34✔
NEW
294
            log_target = "webgraph::graphs::bvgraph::comp::impls::parallel_iter::comp",
×
NEW
295
            display_memory = true,
×
NEW
296
            item_name = "node",
×
NEW
297
            local_speed = true,
×
298
            expected_updates = Some(num_nodes),
17✔
299
        );
300
        comp_pl.start("Compressing successors in parallel...");
34✔
301
        let comp_pl = Arc::new(Mutex::new(comp_pl));
68✔
302
        threads.in_place_scope(|s| {
51✔
303
            let cp_flags = &compression_flags;
34✔
304

305
            for (thread_id, mut thread_lender) in iter.enumerate() {
111✔
306
                let tmp_path = thread_path(thread_id);
×
307
                let chunk_graph_path = tmp_path.with_extension(GRAPH_EXTENSION);
×
308
                let chunk_offsets_path = tmp_path.with_extension(OFFSETS_EXTENSION);
×
309
                let tx = tx.clone();
×
NEW
310
                let comp_pl = comp_pl.clone();
×
311
                // Spawn the thread
312
                s.spawn(move |_| {
77✔
313
                    log::debug!("Thread {thread_id} started");
122✔
314
                    let first_node;
×
315
                    let mut bvcomp;
×
316
                    let mut offsets_writer;
×
317
                    let mut written_bits;
×
318
                    let mut offsets_written_bits;
×
319

320
                    match thread_lender.next() {
77✔
321
                        None => return,
×
322
                        Some((node_id, successors)) => {
231✔
323
                            first_node = node_id;
154✔
324

325
                            offsets_writer = <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(
308✔
326
                                BufWriter::new(File::create(&chunk_offsets_path).unwrap()),
385✔
327
                            ));
328

329
                            let writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
308✔
330
                                BufWriter::new(File::create(&chunk_graph_path).unwrap()),
385✔
331
                            ));
332
                            let codes_encoder = <DynCodesEncoder<E, _>>::new(writer, cp_flags).unwrap();
462✔
333

334
                            bvcomp = BvComp::new(
231✔
335
                                codes_encoder,
154✔
336
                                cp_flags.compression_window,
154✔
337
                                cp_flags.max_ref_count,
154✔
338
                                cp_flags.min_interval_length,
154✔
339
                                node_id,
154✔
340
                            );
341
                            written_bits = bvcomp.push(successors).unwrap();
308✔
342
                            offsets_written_bits = offsets_writer.write_gamma(written_bits).unwrap() as u64;
231✔
343

344
                        }
345
                    };
346

347
                    let mut last_node = first_node;
154✔
348
                    let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
5,534,623✔
349
                    for_! ( (_, succ) in iter_nodes {
5,534,469✔
350
                        let node_bits = bvcomp.push(succ.into_iter()).unwrap();
×
351
                        written_bits += node_bits;
×
352
                        offsets_written_bits += offsets_writer.write_gamma(node_bits).unwrap() as u64;
×
353
                    });
354

355
                    let num_arcs = bvcomp.arcs;
154✔
356
                    bvcomp.flush().unwrap();
231✔
357
                    offsets_writer.flush().unwrap();
231✔
358
                    comp_pl.lock().unwrap().update_with_count(last_node - first_node + 1);
231✔
359

360

361

362
                    log::debug!(
77✔
363
                        "Finished Compression thread {thread_id} and wrote {written_bits} bits for the graph and {offsets_written_bits} bits for the offsets",
45✔
364
                    );
365
                    tx.send(Job {
231✔
366
                        job_id: thread_id,
154✔
367
                        first_node,
154✔
368
                        last_node,
154✔
369
                        chunk_graph_path,
154✔
370
                        written_bits,
154✔
371
                        chunk_offsets_path,
154✔
372
                        offsets_written_bits,
77✔
373
                        num_arcs,
77✔
374
                    })
375
                    .unwrap()
77✔
376
                });
377
            }
378

379
            drop(tx);
34✔
380

381
            let mut copy_pl = progress_logger!(
34✔
NEW
382
                log_target = "webgraph::graphs::bvgraph::comp::impls::parallel_iter::copy",
×
NEW
383
                display_memory = true,
×
NEW
384
                item_name = "node",
×
NEW
385
                local_speed = true,
×
386
                expected_updates = Some(num_nodes),
17✔
387
            );
388
            copy_pl.start("Copying compressed successors to final graph");
34✔
389

390
            let file = File::create(&graph_path)
51✔
391
                .with_context(|| format!("Could not create graph {}", graph_path.display()))?;
17✔
392
            let mut graph_writer =
×
393
                <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
×
394

395
            let file = File::create(&offsets_path)
17✔
396
                .with_context(|| format!("Could not create offsets {}", offsets_path.display()))?;
×
397
            let mut offsets_writer =
×
398
                <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
×
399
            offsets_writer.write_gamma(0)?;
×
400

401
            let mut total_written_bits: u64 = 0;
17✔
402
            let mut total_offsets_written_bits: u64 = 0;
×
403
            let mut total_arcs: u64 = 0;
×
404

405
            let mut next_node = 0;
×
406
            // glue together the bitstreams as they finish, this allows us to do
407
            // task pipelining for better performance
408
            for Job {
×
409
                job_id,
77✔
410
                first_node,
77✔
411
                last_node,
77✔
412
                chunk_graph_path,
77✔
413
                written_bits,
77✔
414
                chunk_offsets_path,
77✔
415
                offsets_written_bits,
77✔
416
                num_arcs,
77✔
417
            } in TaskQueue::new(rx.iter())
×
418
            {
419
                ensure!(
77✔
420
                    first_node == next_node,
77✔
421
                    "Non-adjacent lenders: lender {} has first node {} instead of {}",
×
422
                    job_id,
×
423
                    first_node,
×
424
                    next_node
×
425
                );
426

427
                next_node = last_node + 1;
77✔
428
                total_arcs += num_arcs;
×
NEW
429
                log::debug!(
×
430
                    "Copying {} [{}..{}) bits from {} to {}",
45✔
431
                    written_bits,
×
432
                    total_written_bits,
×
433
                    total_written_bits + written_bits,
45✔
434
                    chunk_graph_path.display(),
45✔
435
                    graph_path.display()
45✔
436
                );
437
                total_written_bits += written_bits;
×
438

439
                let mut reader =
77✔
440
                    <BufBitReader<E, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
×
441
                        File::open(&chunk_graph_path)
×
442
                            .with_context(|| format!("Could not open {}", chunk_graph_path.display()))?,
×
443
                    )));
444
                graph_writer
×
445
                    .copy_from(&mut reader, written_bits)
×
446
                    .with_context(|| {
×
447
                        format!(
×
448
                            "Could not copy from {} to {}",
×
449
                            chunk_graph_path.display(),
×
450
                            graph_path.display()
×
451
                        )
452
                    })?;
453

454
                log::debug!(
77✔
455
                    "Copying offsets {} [{}..{}) bits from {} to {}",
45✔
456
                    offsets_written_bits,
×
457
                    total_offsets_written_bits,
×
458
                    total_offsets_written_bits + offsets_written_bits,
45✔
459
                    chunk_offsets_path.display(),
45✔
460
                    offsets_path.display()
45✔
461
                );
462
                total_offsets_written_bits += offsets_written_bits;
×
463

464
                let mut reader =
77✔
465
                    <BufBitReader<BigEndian, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
×
466
                        File::open(&chunk_offsets_path)
×
467
                            .with_context(|| format!("Could not open {}", chunk_offsets_path.display()))?,
×
468
                    )));
469
                offsets_writer
×
470
                    .copy_from(&mut reader, offsets_written_bits)
×
471
                    .with_context(|| {
×
472
                        format!(
×
473
                            "Could not copy from {} to {}",
×
474
                            chunk_offsets_path.display(),
×
475
                            offsets_path.display()
×
476
                        )
477
                    })?;
478

479
                copy_pl.update_with_count(last_node - first_node + 1);
77✔
480
            }
481

482

483
            log::info!("Flushing the merged bitstreams");
34✔
484
            graph_writer.flush()?;
×
485
            offsets_writer.flush()?;
17✔
486

487
            Arc::into_inner(comp_pl)
17✔
NEW
488
                .expect("comp_pl reference leaked or compression thread still running")
×
NEW
489
                .into_inner().unwrap().done();
×
NEW
490
            copy_pl.done();
×
491

492
            log::info!("Writing the .properties file");
17✔
493
            let properties = compression_flags
17✔
494
                .to_properties::<BE>(num_nodes, total_arcs, total_written_bits)
×
495
                .context("Could not serialize properties")?;
×
496
            let properties_path = basename.with_extension(PROPERTIES_EXTENSION);
×
497
            std::fs::write(&properties_path, properties).with_context(|| {
×
498
                format!(
×
499
                    "Could not write properties to {}",
×
500
                    properties_path.display()
×
501
                )
502
            })?;
503

504
            log::info!(
17✔
505
                "Compressed {} arcs into {} bits for {:.4} bits/arc",
17✔
506
                total_arcs,
×
507
                total_written_bits,
×
508
                total_written_bits as f64 / total_arcs as f64
17✔
509
            );
510
            log::info!(
×
511
                "Created offsets file with {} bits for {:.4} bits/node",
17✔
512
                total_offsets_written_bits,
×
513
                total_offsets_written_bits as f64 / num_nodes as f64
17✔
514
            );
515

516
            // cleanup the temp files
517
            std::fs::remove_dir_all(tmp_dir).with_context(|| {
×
518
                format!("Could not clean temporary directory {}", tmp_dir.display())
×
519
            })?;
520
            Ok(total_written_bits)
17✔
521
        })
522
    }
523
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc