vigna / webgraph-rs · build 16005644811 (push via github)
zommiommy: Fix tests
01 Jul 2025 05:04PM UTC · coverage: 50.317% (-0.04%) from 50.358%
3888 of 7727 relevant lines covered (50.32%) · 23067755.99 hits per line

Source file: /webgraph/src/graphs/bvgraph/comp/impls.rs · 45.64% covered

/*
 * SPDX-FileCopyrightText: 2023 Inria
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
 *
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
 */

use crate::prelude::*;
use anyhow::{ensure, Context, Result};
use dsi_bitstream::prelude::*;
use dsi_progress_logger::prelude::*;
use lender::prelude::*;
use rayon::ThreadPool;

use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::path::{Path, PathBuf};

/// A queue that pulls jobs whose ids form a contiguous initial segment of the
/// natural numbers from an iterator out of order, and implements an iterator
/// from which they can be pulled in order.
///
/// Jobs must be ordered by their job id, and must implement [`JobId`] so the
/// queue can retrieve it.
struct TaskQueue<I: Iterator> {
    iter: I,
    jobs: Vec<Option<I::Item>>,
    next_id: usize,
}

trait JobId {
    fn id(&self) -> usize;
}

impl<I: Iterator> TaskQueue<I> {
    fn new(iter: I) -> Self {
        Self {
            iter,
            jobs: vec![],
            next_id: 0,
        }
    }
}

impl<I: Iterator> Iterator for TaskQueue<I>
where
    I::Item: JobId,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(item) = self.jobs.get_mut(self.next_id) {
                if item.is_some() {
                    self.next_id += 1;
                    return item.take();
                }
            }
            if let Some(item) = self.iter.next() {
                let id = item.id();
                if id >= self.jobs.len() {
                    self.jobs.resize_with(id + 1, || None);
                }
                self.jobs[id] = Some(item);
            } else {
                return None;
            }
        }
    }
}
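
// Illustrative test added for exposition (not part of the original file): a
// minimal check that `TaskQueue` yields jobs in increasing id order even when
// they arrive out of order. For the example only, a bare `usize` acts as a
// job whose id is its own value.
#[cfg(test)]
mod task_queue_example {
    use super::*;

    impl JobId for usize {
        fn id(&self) -> usize {
            *self
        }
    }

    #[test]
    fn yields_jobs_in_id_order() {
        // Ids form the contiguous initial segment {0, 1, 2, 3}, shuffled.
        let queue = TaskQueue::new([2usize, 0, 3, 1].into_iter());
        assert_eq!(queue.collect::<Vec<_>>(), vec![0, 1, 2, 3]);
    }
}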

/// A compression job.
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
struct Job {
    job_id: usize,
    first_node: usize,
    last_node: usize,
    chunk_graph_path: PathBuf,
    written_bits: u64,
    chunk_offsets_path: PathBuf,
    offsets_written_bits: u64,
    num_arcs: u64,
}

impl JobId for Job {
    fn id(&self) -> usize {
        self.job_id
    }
}

impl BvComp<()> {
    /// Compresses a [`NodeLabelsLender`] and returns the length in bits of the
    /// graph bitstream.
    pub fn single_thread<E, L>(
        basename: impl AsRef<Path>,
        iter: L,
        compression_flags: CompFlags,
        build_offsets: bool,
        num_nodes: Option<usize>,
    ) -> Result<u64>
    where
        E: Endianness,
        L: IntoLender,
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
    {
        let basename = basename.as_ref();
        let graph_path = basename.with_extension(GRAPH_EXTENSION);

        // Compress the graph
        let bit_write = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(
            File::create(&graph_path)
                .with_context(|| format!("Could not create {}", graph_path.display()))?,
        )));

        let codes_writer = DynCodesEncoder::new(bit_write, &compression_flags)?;

        let mut bvcomp = BvComp::new(
            codes_writer,
            compression_flags.compression_window,
            compression_flags.max_ref_count,
            compression_flags.min_interval_length,
            0,
        );

        let mut pl = ProgressLogger::default();
        pl.display_memory(true)
            .item_name("node")
            .expected_updates(num_nodes);
        pl.start("Compressing successors...");
        let mut bitstream_len = 0;

        let mut real_num_nodes = 0;
        if build_offsets {
            let offsets_path = basename.with_extension(OFFSETS_EXTENSION);
            let file = std::fs::File::create(&offsets_path)
                .with_context(|| format!("Could not create {}", offsets_path.display()))?;
            // create a bit writer on the file
            let mut writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
                BufWriter::with_capacity(1 << 20, file),
            ));
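
            // The offsets stream is gamma coded: an initial 0 (the offset of
            // node 0) followed, for each node, by the number of bits its
            // successor list occupies in the graph bitstream; absolute offsets
            // are recovered by taking prefix sums of these gaps.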
            writer
                .write_gamma(0)
                .context("Could not write initial delta")?;
            for_! ( (_node_id, successors) in iter {
                let delta = bvcomp.push(successors).context("Could not push successors")?;
                bitstream_len += delta;
                writer.write_gamma(delta).context("Could not write delta")?;
                pl.update();
                real_num_nodes += 1;
            });
        } else {
            for_! ( (_node_id, successors) in iter {
                bitstream_len += bvcomp.push(successors).context("Could not push successors")?;
                pl.update();
                real_num_nodes += 1;
            });
        }
        pl.done();

        if let Some(num_nodes) = num_nodes {
            if num_nodes != real_num_nodes {
                log::warn!(
                    "The expected number of nodes is {num_nodes} but the actual number of nodes is {real_num_nodes}"
                );
            }
        }
        let num_arcs = bvcomp.arcs;
        bitstream_len += bvcomp.flush().context("Could not flush bvcomp")? as u64;

        log::info!("Writing the .properties file");
        let properties = compression_flags
            .to_properties::<BE>(real_num_nodes, num_arcs, bitstream_len)
            .context("Could not serialize properties")?;
        let properties_path = basename.with_extension(PROPERTIES_EXTENSION);
        std::fs::write(&properties_path, properties)
            .with_context(|| format!("Could not write {}", properties_path.display()))?;

        Ok(bitstream_len)
    }
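
    // Hypothetical usage sketch, added for exposition; `graph`, its methods,
    // and the paths are assumptions, not part of the original file. Any
    // sequential graph whose lender yields `(node, successors)` pairs works:
    //
    //     let bits = BvComp::single_thread::<BE, _>(
    //         "out/basename",           // writes out/basename.graph, etc.
    //         graph.iter(),
    //         CompFlags::default(),
    //         true,                     // also build the .offsets file
    //         Some(graph.num_nodes()),
    //     )?;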

    /// A wrapper over [`parallel_graph`](Self::parallel_graph) that takes the
    /// endianness as a string.
    ///
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
    ///
    /// A given endianness is enabled only if the corresponding feature is
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
    /// neither feature is enabled.
    pub fn parallel_endianness<P: AsRef<Path>, G: SplitLabeling + SequentialGraph>(
        basename: impl AsRef<Path> + Send + Sync,
        graph: &G,
        num_nodes: usize,
        compression_flags: CompFlags,
        threads: &ThreadPool,
        tmp_dir: P,
        endianness: &str,
    ) -> Result<u64>
    where
        for<'a> <G as SplitLabeling>::SplitLender<'a>: Send + Sync,
    {
        match endianness {
            #[cfg(any(
                feature = "be_bins",
                not(any(feature = "be_bins", feature = "le_bins"))
            ))]
            BE::NAME => {
                // compress the graph
                Self::parallel_iter::<BigEndian, _>(
                    basename,
                    graph.split_iter(threads.current_num_threads()).into_iter(),
                    num_nodes,
                    compression_flags,
                    threads,
                    tmp_dir,
                )
            }
            #[cfg(any(
                feature = "le_bins",
                not(any(feature = "be_bins", feature = "le_bins"))
            ))]
            LE::NAME => {
                // compress the graph
                Self::parallel_iter::<LittleEndian, _>(
                    basename,
                    graph.split_iter(threads.current_num_threads()).into_iter(),
                    num_nodes,
                    compression_flags,
                    threads,
                    tmp_dir,
                )
            }
            x => anyhow::bail!("Unknown endianness {}", x),
        }
    }

    /// Compresses a graph in parallel and returns the length in bits of the graph bitstream.
    pub fn parallel_graph<E: Endianness>(
        basename: impl AsRef<Path> + Send + Sync,
        graph: &(impl SequentialGraph + SplitLabeling),
        compression_flags: CompFlags,
        threads: &ThreadPool,
        tmp_dir: impl AsRef<Path>,
    ) -> Result<u64>
    where
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
    {
        Self::parallel_iter(
            basename,
            graph.split_iter(threads.current_num_threads()).into_iter(),
            graph.num_nodes(),
            compression_flags,
            threads,
            tmp_dir,
        )
    }

    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns the length in bits
    /// of the graph bitstream.
    pub fn parallel_iter<
        E: Endianness,
        L: Lender + for<'next> NodeLabelsLender<'next, Label = usize> + Send,
    >(
        basename: impl AsRef<Path> + Send + Sync,
        iter: impl Iterator<Item = L>,
        num_nodes: usize,
        compression_flags: CompFlags,
        threads: &ThreadPool,
        tmp_dir: impl AsRef<Path>,
    ) -> Result<u64>
    where
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
    {
        let tmp_dir = tmp_dir.as_ref();
        let basename = basename.as_ref();

        let graph_path = basename.with_extension(GRAPH_EXTENSION);
        let offsets_path = basename.with_extension(OFFSETS_EXTENSION);

        let (tx, rx) = std::sync::mpsc::channel();

        let thread_path = |thread_id: usize| tmp_dir.join(format!("{thread_id:016x}.bitstream"));
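        // Each worker thread writes its chunk of the graph and of the offsets
        // to temporary files named after its zero-padded hexadecimal id.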

        threads.in_place_scope(|s| {
            let cp_flags = &compression_flags;

            for (thread_id, mut thread_lender) in iter.enumerate() {
                let tmp_path = thread_path(thread_id);
                let chunk_graph_path = tmp_path.with_extension(GRAPH_EXTENSION);
                let chunk_offsets_path = tmp_path.with_extension(OFFSETS_EXTENSION);
                let tx = tx.clone();
                // Spawn the thread
                s.spawn(move |_| {
                    log::info!("Thread {thread_id} started");
                    let first_node;
                    let mut bvcomp;
                    let mut offsets_writer;
                    let mut written_bits;
                    let mut offsets_written_bits;

                    match thread_lender.next() {
                        None => return,
                        Some((node_id, successors)) => {
                            first_node = node_id;

                            offsets_writer = <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(
                                BufWriter::new(File::create(&chunk_offsets_path).unwrap()),
                            ));

                            let writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
                                BufWriter::new(File::create(&chunk_graph_path).unwrap()),
                            ));
                            let codes_encoder = <DynCodesEncoder<E, _>>::new(writer, cp_flags).unwrap();

                            bvcomp = BvComp::new(
                                codes_encoder,
                                cp_flags.compression_window,
                                cp_flags.max_ref_count,
                                cp_flags.min_interval_length,
                                node_id,
                            );
                            written_bits = bvcomp.push(successors).unwrap();
                            offsets_written_bits = offsets_writer.write_gamma(written_bits).unwrap() as u64;
                        }
                    };

                    let mut last_node = first_node;
                    let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
                    for_! ( (_, succ) in iter_nodes {
                        let node_bits = bvcomp.push(succ.into_iter()).unwrap();
                        written_bits += node_bits;
                        offsets_written_bits += offsets_writer.write_gamma(node_bits).unwrap() as u64;
                    });

                    let num_arcs = bvcomp.arcs;
                    bvcomp.flush().unwrap();
                    offsets_writer.flush().unwrap();

                    log::info!(
                        "Finished compression thread {thread_id} and wrote {written_bits} bits for the graph and {offsets_written_bits} bits for the offsets",
                    );
                    tx.send(Job {
                        job_id: thread_id,
                        first_node,
                        last_node,
                        chunk_graph_path,
                        written_bits,
                        chunk_offsets_path,
                        offsets_written_bits,
                        num_arcs,
                    })
                    .unwrap()
                });
            }

            drop(tx);

            let file = File::create(&graph_path)
                .with_context(|| format!("Could not create graph {}", graph_path.display()))?;
            let mut graph_writer =
                <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
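
            // Note that, unlike the graph bitstream, whose endianness is `E`,
            // the offsets bitstream is always written big endian.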
            let file = File::create(&offsets_path)
                .with_context(|| format!("Could not create offsets {}", offsets_path.display()))?;
            let mut offsets_writer =
                <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
            offsets_writer.write_gamma(0)?;

            let mut total_written_bits: u64 = 0;
            let mut total_offsets_written_bits: u64 = 0;
            let mut total_arcs: u64 = 0;

            let mut next_node = 0;
            // Glue together the bitstreams as they finish; this allows task
            // pipelining for better performance.
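            // `TaskQueue` buffers jobs that arrive out of order on the channel
            // and yields them by increasing job id, so chunks are concatenated
            // in node order even though threads finish at different times.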
            for Job {
                job_id,
                first_node,
                last_node,
                chunk_graph_path,
                written_bits,
                chunk_offsets_path,
                offsets_written_bits,
                num_arcs,
            } in TaskQueue::new(rx.iter())
            {
                ensure!(
                    first_node == next_node,
                    "Non-adjacent lenders: lender {} has first node {} instead of {}",
                    job_id,
                    first_node,
                    next_node
                );

                next_node = last_node + 1;
                total_arcs += num_arcs;
                log::info!(
                    "Copying {} [{}..{}) bits from {} to {}",
                    written_bits,
                    total_written_bits,
                    total_written_bits + written_bits,
                    chunk_graph_path.display(),
                    graph_path.display()
                );
                total_written_bits += written_bits;

                let mut reader =
                    <BufBitReader<E, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
                        File::open(&chunk_graph_path)
                            .with_context(|| format!("Could not open {}", chunk_graph_path.display()))?,
                    )));
                graph_writer
                    .copy_from(&mut reader, written_bits)
                    .with_context(|| {
                        format!(
                            "Could not copy from {} to {}",
                            chunk_graph_path.display(),
                            graph_path.display()
                        )
                    })?;

                log::info!(
                    "Copying offsets {} [{}..{}) bits from {} to {}",
                    offsets_written_bits,
                    total_offsets_written_bits,
                    total_offsets_written_bits + offsets_written_bits,
                    chunk_offsets_path.display(),
                    offsets_path.display()
                );
                total_offsets_written_bits += offsets_written_bits;

                let mut reader =
                    <BufBitReader<BigEndian, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
                        File::open(&chunk_offsets_path)
                            .with_context(|| format!("Could not open {}", chunk_offsets_path.display()))?,
                    )));
                offsets_writer
                    .copy_from(&mut reader, offsets_written_bits)
                    .with_context(|| {
                        format!(
                            "Could not copy from {} to {}",
                            chunk_offsets_path.display(),
                            offsets_path.display()
                        )
                    })?;
            }

            log::info!("Flushing the merged bitstreams");
            graph_writer.flush()?;
            offsets_writer.flush()?;

            log::info!("Writing the .properties file");
            let properties = compression_flags
                .to_properties::<BE>(num_nodes, total_arcs, total_written_bits)
                .context("Could not serialize properties")?;
            let properties_path = basename.with_extension(PROPERTIES_EXTENSION);
            std::fs::write(&properties_path, properties).with_context(|| {
                format!(
                    "Could not write properties to {}",
                    properties_path.display()
                )
            })?;

            log::info!(
                "Compressed {} arcs into {} bits for {:.4} bits/arc",
                total_arcs,
                total_written_bits,
                total_written_bits as f64 / total_arcs as f64
            );
            log::info!(
                "Created offsets file with {} bits for {:.4} bits/node",
                total_offsets_written_bits,
                total_offsets_written_bits as f64 / num_nodes as f64
            );

            // cleanup the temp files
            std::fs::remove_dir_all(tmp_dir).with_context(|| {
                format!("Could not clean temporary directory {}", tmp_dir.display())
            })?;
            Ok(total_written_bits)
        })
    }
}
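
// Hypothetical usage sketch, added for exposition; the graph value `g` and the
// paths are assumptions, not part of the original file. Compressing on four
// threads, with temporary chunks written under "out/tmp":
//
//     let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build()?;
//     let bits = BvComp::parallel_graph::<BE>(
//         "out/basename",
//         &g,                        // impl SequentialGraph + SplitLabeling
//         CompFlags::default(),
//         &pool,
//         "out/tmp",
//     )?;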