• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 11837625908

12 Nov 2024 07:33PM UTC coverage: 53.594% (-0.04%) from 53.631%
11837625908

push

github

vigna
No more Borrow

5 of 13 new or added lines in 6 files covered. (38.46%)

2 existing lines in 2 files now uncovered.

2371 of 4424 relevant lines covered (53.59%)

23193450.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.1
/src/graphs/bvgraph/comp/impls.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 *
5
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6
 */
7

8
use crate::prelude::*;
9
use anyhow::{ensure, Context, Result};
10
use dsi_bitstream::prelude::*;
11
use dsi_progress_logger::prelude::*;
12
use lender::prelude::*;
13
use rayon::ThreadPool;
14

15
use std::fs::File;
16
use std::io::{BufReader, BufWriter};
17
use std::path::{Path, PathBuf};
18

19
/// A queue that pulls jobs from an iterator in arbitrary order and hands them
/// out sorted by job id.
///
/// Jobs are expected to carry ids forming a contiguous initial segment of the
/// natural numbers; each item exposes its id through the [`JobId`] trait.
/// A job that arrives ahead of its turn is parked until all jobs with a
/// smaller id have been returned.
///
/// If the underlying iterator ends while some ids are still missing, the
/// parked jobs are returned in increasing id order, skipping the missing ids,
/// instead of being silently dropped.
struct TaskQueue<I: Iterator> {
    /// The source of jobs, possibly out of order.
    iter: I,
    /// Parked jobs indexed by job id; `None` marks a job that has not arrived
    /// yet or that has already been returned.
    jobs: Vec<Option<I::Item>>,
    /// The id of the next job to return.
    next_id: usize,
}

/// Exposes the id used by [`TaskQueue`] to order jobs.
trait JobId {
    /// Returns the id of this job.
    fn id(&self) -> usize;
}

impl<I: Iterator> TaskQueue<I> {
    /// Creates a queue reordering the jobs returned by `iter`.
    fn new(iter: I) -> Self {
        Self {
            iter,
            jobs: vec![],
            next_id: 0,
        }
    }
}

impl<I: Iterator> Iterator for TaskQueue<I>
where
    I::Item: JobId,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Fast path: the job we are waiting for has already arrived.
            if let Some(slot) = self.jobs.get_mut(self.next_id) {
                if let Some(job) = slot.take() {
                    self.next_id += 1;
                    return Some(job);
                }
            }

            match self.iter.next() {
                // Park the incoming job in the slot of its id, growing the
                // buffer if the id is beyond its current end.
                Some(job) => {
                    let id = job.id();
                    if id >= self.jobs.len() {
                        self.jobs.resize_with(id + 1, || None);
                    }
                    self.jobs[id] = Some(job);
                }
                // The source is exhausted: the job with id `next_id` will
                // never arrive, so skip missing ids and return whatever is
                // still parked, in id order.
                None => {
                    while let Some(slot) = self.jobs.get_mut(self.next_id) {
                        self.next_id += 1;
                        if let Some(job) = slot.take() {
                            return Some(job);
                        }
                    }
                    return None;
                }
            }
        }
    }
}
71

72
/// A compression job.
73
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
74
struct Job {
75
    job_id: usize,
76
    first_node: usize,
77
    last_node: usize,
78
    chunk_graph_path: PathBuf,
79
    written_bits: u64,
80
    chunk_offsets_path: PathBuf,
81
    offsets_written_bits: u64,
82
    num_arcs: u64,
83
}
84

85
impl JobId for Job {
86
    fn id(&self) -> usize {
196✔
87
        self.job_id
196✔
88
    }
89
}
90

91
impl BvComp<()> {
92
    /// Compresses s [`NodeLabelsLender`] and returns the length in bits of the
93
    /// graph bitstream.
94
    pub fn single_thread<E, L>(
×
95
        basename: impl AsRef<Path>,
96
        iter: L,
97
        compression_flags: CompFlags,
98
        build_offsets: bool,
99
        num_nodes: Option<usize>,
100
    ) -> Result<u64>
101
    where
102
        E: Endianness,
103
        L: IntoLender,
104
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
105
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodeWrite<E>,
106
    {
107
        let basename = basename.as_ref();
×
108
        let graph_path = basename.with_extension(GRAPH_EXTENSION);
×
109

110
        // Compress the graph
111
        let bit_write = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(
×
112
            File::create(&graph_path)
×
113
                .with_context(|| format!("Could not create {}", graph_path.display()))?,
×
114
        )));
115

116
        let comp_flags = CompFlags {
117
            ..Default::default()
118
        };
119

120
        let codes_writer = DynCodesEncoder::new(bit_write, &comp_flags);
×
121

122
        let mut bvcomp = BvComp::new(
123
            codes_writer,
×
124
            compression_flags.compression_window,
×
125
            compression_flags.max_ref_count,
×
126
            compression_flags.min_interval_length,
×
127
            0,
128
        );
129

130
        let mut pl = ProgressLogger::default();
×
131
        pl.display_memory(true)
×
132
            .item_name("node")
133
            .expected_updates(num_nodes);
×
134
        pl.start("Compressing successors...");
×
135
        let mut bitstream_len = 0;
×
136

137
        let mut real_num_nodes = 0;
×
138
        if build_offsets {
×
139
            let offsets_path = basename.with_extension(OFFSETS_EXTENSION);
×
140
            let file = std::fs::File::create(&offsets_path)
×
141
                .with_context(|| format!("Could not create {}", offsets_path.display()))?;
×
142
            // create a bit writer on the file
143
            let mut writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
×
144
                BufWriter::with_capacity(1 << 20, file),
×
145
            ));
146

147
            writer
×
148
                .write_gamma(0)
149
                .context("Could not write initial delta")?;
150
            for_! ( (_node_id, successors) in iter {
×
151
                let delta = bvcomp.push(successors).context("Could not push successors")?;
×
152
                bitstream_len += delta;
×
153
                writer.write_gamma(delta).context("Could not write delta")?;
×
154
                pl.update();
×
155
                real_num_nodes += 1;
×
156
            });
157
        } else {
158
            for_! ( (_node_id, successors) in iter {
×
159
                bitstream_len += bvcomp.push(successors).context("Could not push successors")?;
×
160
                pl.update();
×
161
                real_num_nodes += 1;
×
162
            });
163
        }
164
        pl.done();
×
165

166
        if let Some(num_nodes) = num_nodes {
×
167
            if num_nodes != real_num_nodes {
×
168
                log::warn!(
×
169
                    "The expected number of nodes is {} but the actual number of nodes is {}",
×
170
                    num_nodes,
×
171
                    real_num_nodes
×
172
                );
173
            }
174
        }
175
        let num_arcs = bvcomp.arcs;
×
176
        bitstream_len += bvcomp.flush().context("Could not flush bvcomp")? as u64;
×
177

178
        log::info!("Writing the .properties file");
×
179
        let properties = compression_flags
×
180
            .to_properties::<BE>(real_num_nodes, num_arcs, bitstream_len)
×
181
            .context("Could not serialize properties")?;
182
        let properties_path = basename.with_extension(PROPERTIES_EXTENSION);
×
183
        std::fs::write(&properties_path, properties)
×
184
            .with_context(|| format!("Could not write {}", properties_path.display()))?;
×
185

186
        Ok(bitstream_len)
×
187
    }
188

189
    /// A wrapper over [`parallel_graph`](Self::parallel_graph) that takes the
190
    /// endianness as a string.
191
    ///
192
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
193
    ///
194
    ///  A given endianness is enabled only if the corresponding feature is
195
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
196
    /// neither features are enabled.
197
    pub fn parallel_endianness<P: AsRef<Path>, G: SplitLabeling + SequentialGraph>(
4✔
198
        basename: impl AsRef<Path> + Send + Sync,
199
        graph: &G,
200
        num_nodes: usize,
201
        compression_flags: CompFlags,
202
        threads: &ThreadPool,
203
        tmp_dir: P,
204
        endianness: &str,
205
    ) -> Result<u64>
206
    where
207
        for<'a> <G as SplitLabeling>::SplitLender<'a>: Send + Sync,
208
    {
209
        match endianness {
4✔
210
            #[cfg(any(
211
                feature = "be_bins",
212
                not(any(feature = "be_bins", feature = "le_bins"))
213
            ))]
214
            BE::NAME => {
4✔
215
                // compress the transposed graph
216
                Self::parallel_iter::<BigEndian, _>(
217
                    basename,
4✔
218
                    graph.split_iter(threads.current_num_threads()).into_iter(),
4✔
219
                    num_nodes,
4✔
220
                    compression_flags,
4✔
221
                    threads,
4✔
222
                    tmp_dir,
4✔
223
                )
224
            }
225
            #[cfg(any(
226
                feature = "le_bins",
227
                not(any(feature = "be_bins", feature = "le_bins"))
228
            ))]
229
            LE::NAME => {
×
230
                // compress the transposed graph
231
                Self::parallel_iter::<LittleEndian, _>(
232
                    basename,
×
NEW
233
                    graph.split_iter(threads.current_num_threads()).into_iter(),
×
234
                    num_nodes,
×
235
                    compression_flags,
×
236
                    threads,
×
237
                    tmp_dir,
×
238
                )
239
            }
240
            x => anyhow::bail!("Unknown endianness {}", x),
×
241
        }
242
    }
243

244
    /// Compresses a graph in parallel and returns the length in bits of the graph bitstream.
245
    pub fn parallel_graph<E: Endianness>(
18✔
246
        basename: impl AsRef<Path> + Send + Sync,
247
        graph: &(impl SequentialGraph + SplitLabeling),
248
        compression_flags: CompFlags,
249
        threads: &ThreadPool,
250
        tmp_dir: impl AsRef<Path>,
251
    ) -> Result<u64>
252
    where
253
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodeWrite<E>,
254
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
255
    {
256
        Self::parallel_iter(
257
            basename,
18✔
258
            graph.split_iter(threads.current_num_threads()).into_iter(),
18✔
259
            graph.num_nodes(),
18✔
260
            compression_flags,
18✔
261
            threads,
18✔
262
            tmp_dir,
18✔
263
        )
264
    }
265

266
    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns the length in bits
267
    /// of the graph bitstream.
268
    pub fn parallel_iter<
22✔
269
        E: Endianness,
270
        L: Lender + for<'next> NodeLabelsLender<'next, Label = usize> + Send,
271
    >(
272
        basename: impl AsRef<Path> + Send + Sync,
273
        iter: impl Iterator<Item = L>,
274
        num_nodes: usize,
275
        compression_flags: CompFlags,
276
        threads: &ThreadPool,
277
        tmp_dir: impl AsRef<Path>,
278
    ) -> Result<u64>
279
    where
280
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodeWrite<E>,
281
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
282
    {
283
        let tmp_dir = tmp_dir.as_ref();
22✔
284
        let basename = basename.as_ref();
22✔
285

286
        let graph_path = basename.with_extension(GRAPH_EXTENSION);
22✔
287
        let offsets_path = basename.with_extension(OFFSETS_EXTENSION);
22✔
288

289
        let (tx, rx) = std::sync::mpsc::channel();
22✔
290

291
        let thread_path = |thread_id: usize| tmp_dir.join(format!("{:016x}.bitstream", thread_id));
128✔
292

293
        threads.in_place_scope(|s| {
44✔
294
            let cp_flags = &compression_flags;
22✔
295

296
            for (thread_id, mut thread_lender) in iter.enumerate() {
234✔
297
                let tmp_path = thread_path(thread_id);
106✔
298
                let chunk_graph_path = tmp_path.with_extension(GRAPH_EXTENSION);
106✔
299
                let chunk_offsets_path = tmp_path.with_extension(OFFSETS_EXTENSION);
106✔
300
                let tx = tx.clone();
106✔
301
                // Spawn the thread
302
                s.spawn(move |_| {
212✔
303
                    log::info!("Thread {} started", thread_id);
212✔
304
                    let first_node;
×
305
                    let mut bvcomp;
×
306
                    let mut offsets_writer;
×
307
                    let mut written_bits;
×
308
                    let mut offsets_written_bits;
×
309

310
                    match thread_lender.next() {
106✔
311
                        None => return,
×
312
                        Some((node_id, successors)) => {
106✔
313
                            first_node = node_id;
106✔
314

315
                            offsets_writer = <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(
106✔
316
                                BufWriter::new(File::create(&chunk_offsets_path).unwrap()),
106✔
317
                            ));
318

319
                            let writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
106✔
320
                                BufWriter::new(File::create(&chunk_graph_path).unwrap()),
106✔
321
                            ));
322
                            let codes_encoder = <DynCodesEncoder<E, _>>::new(writer, cp_flags);
106✔
323

324
                            bvcomp = BvComp::new(
106✔
325
                                codes_encoder,
106✔
326
                                cp_flags.compression_window,
106✔
327
                                cp_flags.max_ref_count,
106✔
328
                                cp_flags.min_interval_length,
106✔
329
                                node_id,
106✔
330
                            );
331
                            written_bits = bvcomp.push(successors).unwrap();
106✔
332
                            offsets_written_bits = offsets_writer.write_gamma(written_bits).unwrap() as u64;
106✔
333
                        }
334
                    };
335

336
                    let mut last_node = first_node;
106✔
337
                    let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
7,162,360✔
338
                    for_! ( (_, succ) in iter_nodes {
7,162,254✔
339
                        let node_bits = bvcomp.push(succ.into_iter()).unwrap();
7,162,148✔
340
                        written_bits += node_bits;
7,162,148✔
341
                        offsets_written_bits += offsets_writer.write_gamma(node_bits).unwrap() as u64;
7,162,148✔
342
                    });
343

344
                    let num_arcs = bvcomp.arcs;
106✔
345
                    bvcomp.flush().unwrap();
106✔
346
                    offsets_writer.flush().unwrap();
106✔
347

348
                    log::info!(
106✔
349
                        "Finished Compression thread {} and wrote {} bits for the graph and {} bits for the offsets",
106✔
350
                        thread_id,
×
351
                        written_bits,
×
352
                        offsets_written_bits,
×
353
                    );
354
                    tx.send(Job {
106✔
355
                        job_id: thread_id,
106✔
356
                        first_node,
106✔
357
                        last_node,
106✔
358
                        chunk_graph_path,
106✔
359
                        written_bits,
106✔
360
                        chunk_offsets_path,
106✔
361
                        offsets_written_bits,
106✔
362
                        num_arcs,
106✔
363
                    })
364
                    .unwrap()
106✔
365
                });
366
            }
367

368
            drop(tx);
22✔
369

370
            let file = File::create(&graph_path)
44✔
371
                .with_context(|| format!("Could not create graph {}", graph_path.display()))?;
44✔
372
            let mut graph_writer =
×
373
                <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
×
374

375
            let file = File::create(&offsets_path)
22✔
376
                .with_context(|| format!("Could not create offsets {}", offsets_path.display()))?;
×
377
            let mut offsets_writer =
×
378
                <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
×
379
            offsets_writer.write_gamma(0)?;
×
380

381
            let mut total_written_bits: u64 = 0;
22✔
382
            let mut total_offsets_written_bits: u64 = 0;
22✔
383
            let mut total_arcs: u64 = 0;
22✔
384

385
            let mut next_node = 0;
22✔
386
            // glue together the bitstreams as they finish, this allows us to do
387
            // task pipelining for better performance
388
            for Job {
×
389
                job_id,
106✔
390
                first_node,
106✔
391
                last_node,
106✔
392
                chunk_graph_path,
106✔
393
                written_bits,
106✔
394
                chunk_offsets_path,
106✔
395
                offsets_written_bits,
106✔
396
                num_arcs,
106✔
397
            } in TaskQueue::new(rx.iter())
×
398
            {
399
                ensure!(
106✔
400
                    first_node == next_node,
106✔
401
                    "Non-adjacent lenders: lender {} has first node {} instead of {}",
×
402
                    job_id,
×
403
                    first_node,
×
404
                    next_node
×
405
                );
406

407
                next_node = last_node + 1;
106✔
408
                total_arcs += num_arcs;
106✔
409
                log::info!(
106✔
410
                    "Copying {} [{}..{}) bits from {} to {}",
106✔
411
                    written_bits,
106✔
412
                    total_written_bits,
106✔
413
                    total_written_bits + written_bits,
106✔
414
                    chunk_graph_path.display(),
106✔
415
                    graph_path.display()
106✔
416
                );
417
                total_written_bits += written_bits;
106✔
418

419
                let mut reader =
106✔
420
                    <BufBitReader<E, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
×
421
                        File::open(&chunk_graph_path)
×
422
                            .with_context(|| format!("Could not open {}", chunk_graph_path.display()))?,
×
423
                    )));
424
                graph_writer
×
425
                    .copy_from(&mut reader, written_bits)
×
426
                    .with_context(|| {
×
427
                        format!(
×
428
                            "Could not copy from {} to {}",
×
429
                            chunk_graph_path.display(),
×
430
                            graph_path.display()
×
431
                        )
432
                    })?;
433

434
                log::info!(
106✔
435
                    "Copying offsets {} [{}..{}) bits from {} to {}",
106✔
436
                    offsets_written_bits,
106✔
437
                    total_offsets_written_bits,
106✔
438
                    total_offsets_written_bits + offsets_written_bits,
106✔
439
                    chunk_offsets_path.display(),
106✔
440
                    offsets_path.display()
106✔
441
                );
442
                total_offsets_written_bits += offsets_written_bits;
106✔
443

444
                let mut reader =
106✔
445
                    <BufBitReader<BigEndian, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
×
446
                        File::open(&chunk_offsets_path)
×
447
                            .with_context(|| format!("Could not open {}", chunk_offsets_path.display()))?,
×
448
                    )));
449
                offsets_writer
×
450
                    .copy_from(&mut reader, offsets_written_bits)
×
451
                    .with_context(|| {
×
452
                        format!(
×
453
                            "Could not copy from {} to {}",
×
454
                            chunk_offsets_path.display(),
×
455
                            offsets_path.display()
×
456
                        )
457
                    })?;
458
            }
459

460
            log::info!("Flushing the merged bitstreams");
44✔
461
            graph_writer.flush()?;
22✔
462
            offsets_writer.flush()?;
22✔
463

464
            log::info!("Writing the .properties file");
44✔
465
            let properties = compression_flags
44✔
466
                .to_properties::<BE>(num_nodes, total_arcs, total_written_bits)
22✔
467
                .context("Could not serialize properties")?;
22✔
468
            let properties_path = basename.with_extension(PROPERTIES_EXTENSION);
×
469
            std::fs::write(&properties_path, properties).with_context(|| {
×
470
                format!(
×
471
                    "Could not write properties to {}",
×
472
                    properties_path.display()
×
473
                )
474
            })?;
475

476
            log::info!(
22✔
477
                "Compressed {} arcs into {} bits for {:.4} bits/arc",
22✔
478
                total_arcs,
22✔
479
                total_written_bits,
22✔
480
                total_written_bits as f64 / total_arcs as f64
22✔
481
            );
482
            log::info!(
22✔
483
                "Created offsets file with {} bits for {:.4} bits/node",
22✔
484
                total_offsets_written_bits,
22✔
485
                total_offsets_written_bits as f64 / num_nodes as f64
22✔
486
            );
487

488
            // cleanup the temp files
489
            std::fs::remove_dir_all(tmp_dir).with_context(|| {
22✔
490
                format!("Could not clean temporary directory {}", tmp_dir.display())
×
491
            })?;
492
            Ok(total_written_bits)
22✔
493
        })
494
    }
495
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc