• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 24951701368

26 Apr 2026 07:57AM UTC coverage: 68.409%. Remained the same
24951701368

push

github

vigna
Fixed safety issues in IntoPairs; programmable labels basename

24 of 35 new or added lines in 3 files covered. (68.57%)

1 existing line in 1 file now uncovered.

7328 of 10712 relevant lines covered (68.41%)

46396849.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.5
/webgraph/src/graphs/bvgraph/comp/impls.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 *
5
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6
 */
7

8
use crate::prelude::*;
9
use anyhow::{Context, Result, ensure};
10
use dsi_bitstream::prelude::*;
11
use dsi_progress_logger::prelude::*;
12
use lender::prelude::*;
13
use rayon::{current_num_threads, in_place_scope};
14
use std::fs::File;
15
use std::io::Write;
16
use std::io::{BufReader, BufWriter};
17
use std::path::{Path, PathBuf};
18

19
/// A queue that pulls jobs out of order from an iterator and yields them in
/// order of job id.
///
/// Job ids are obtained through the [`JobId`] trait and must form a
/// contiguous initial segment of the natural numbers (0, 1, 2, …). Jobs that
/// arrive before their turn are buffered until all jobs with a smaller id
/// have been returned.
///
/// If the underlying iterator is exhausted before the job with the next
/// expected id has arrived, iteration ends, even if jobs with larger ids are
/// still buffered.
struct TaskQueue<I: Iterator> {
    /// The source of (possibly out-of-order) jobs.
    iter: I,
    /// Buffered jobs, indexed by job id; a slot is `None` if the job has not
    /// arrived yet or has already been returned.
    jobs: Vec<Option<I::Item>>,
    /// The id of the next job to return.
    next_id: usize,
}

/// Provides the id used by [`TaskQueue`] to order jobs.
trait JobId {
    /// Returns the id of this job.
    fn id(&self) -> usize;
}

impl<I: Iterator> TaskQueue<I> {
    /// Creates a new queue pulling jobs from `iter`; the first job returned
    /// will be the one with id 0.
    fn new(iter: I) -> Self {
        Self {
            iter,
            jobs: Vec::new(),
            next_id: 0,
        }
    }
}

impl<I: Iterator> Iterator for TaskQueue<I>
where
    I::Item: JobId,
{
    type Item = I::Item;

    /// Returns the job with the next expected id, pulling and buffering jobs
    /// from the underlying iterator until it shows up.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // If the expected job has already arrived, hand it out; `take`
            // leaves `None` in its slot.
            if let Some(job) = self.jobs.get_mut(self.next_id).and_then(Option::take) {
                self.next_id += 1;
                return Some(job);
            }

            // Otherwise buffer one more job; if the source is exhausted the
            // expected job will never arrive, so we stop.
            let job = self.iter.next()?;
            let id = job.id();
            if id >= self.jobs.len() {
                self.jobs.resize_with(id + 1, || None);
            }
            self.jobs[id] = Some(job);
        }
    }
}
71

72
/// Threshold mask for periodic compression statistics logging.
73
///
74
/// Statistics are logged every `STATS_THRESHOLD + 1` nodes (approximately 4M).
75
const STATS_THRESHOLD: usize = (1 << 22) - 1;
76

77
/// Logs compression statistics periodically (every ~4M nodes) or
78
/// unconditionally when `force` is true.
79
#[inline]
80
fn log_comp_stats(stats: &super::CompStats, force: bool) {
87,902,729✔
81
    if force || (stats.num_nodes & STATS_THRESHOLD == 0 && stats.num_nodes > 0) {
175,058,846✔
82
        let bits_per_link = stats.written_bits as f64
1,493,224✔
83
            / if stats.num_arcs > 0 {
746,612✔
84
                stats.num_arcs as f64
746,611✔
85
            } else {
86
                1.0
1✔
87
            };
88
        let n = stats.num_nodes as f64;
1,493,224✔
89
        log::info!(
746,612✔
90
            "bits/link: {:.3}; bits/node: {:.3}; avgref: {:.3}; avgdist: {:.3}",
×
91
            bits_per_link,
×
92
            stats.written_bits as f64 / n,
746,523✔
93
            stats.tot_ref as f64 / n,
746,523✔
94
            stats.tot_dist as f64 / n,
746,523✔
95
        );
96
    }
97
}
98

99
/// Writes the `.properties` file for a label bitstream.
100
fn write_label_properties<E: dsi_bitstream::traits::Endianness>(
30✔
101
    labels_basename: &Path,
102
    serializer_name: &str,
103
    num_nodes: usize,
104
    num_arcs: u64,
105
    labels_written_bits: u64,
106
) -> Result<()> {
107
    let mut s = String::new();
60✔
108
    s.push_str("#Label properties\n");
90✔
109
    s.push_str(&format!("endianness={}\n", E::NAME));
90✔
110
    s.push_str(&format!("nodes={num_nodes}\n"));
90✔
111
    s.push_str(&format!("arcs={num_arcs}\n"));
90✔
112
    s.push_str(&format!("serializer={serializer_name}\n"));
90✔
113
    s.push_str(&format!("length={labels_written_bits}\n"));
90✔
114
    if num_arcs > 0 {
60✔
115
        s.push_str(&format!(
120✔
116
            "bitsperlink={:.3}\n",
30✔
117
            labels_written_bits as f64 / num_arcs as f64
30✔
118
        ));
119
    }
120
    if num_nodes > 0 {
60✔
121
        s.push_str(&format!(
120✔
122
            "bitspernode={:.3}\n",
30✔
123
            labels_written_bits as f64 / num_nodes as f64
30✔
124
        ));
125
    }
126
    let props_path = labels_basename.with_extension(PROPERTIES_EXTENSION);
90✔
127
    std::fs::write(&props_path, &s)
90✔
128
        .with_context(|| format!("Could not write {}", props_path.display()))?;
30✔
129
    Ok(())
30✔
130
}
131

132
/// A compression job.
///
/// Jobs are identified by `job_id`, which is the value returned by
/// [`JobId::id`].
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
struct Job {
    /// The id of the job. Being the first field, it is the first one
    /// compared by the derived ordering.
    job_id: usize,
    // NOTE(review): presumably the range of nodes handled by this job —
    // confirm whether `last_node` is inclusive.
    first_node: usize,
    last_node: usize,
    // NOTE(review): presumably the path of the partial graph bitstream
    // written by this job and its length in bits — confirm against the
    // code creating jobs.
    chunk_graph_path: PathBuf,
    written_bits: u64,
    // NOTE(review): presumably the path of the partial offsets bitstream
    // written by this job and its length in bits — confirm.
    chunk_offsets_path: PathBuf,
    offsets_written_bits: u64,
    // NOTE(review): presumably per-job compression statistics (number of
    // arcs, sum of reference counts, sum of reference distances) — confirm.
    num_arcs: u64,
    tot_ref: u64,
    tot_dist: u64,
    // NOTE(review): presumably the partial label bitstream and its length in
    // bits; `None` presumably means that no labels are stored — confirm.
    part_labels_path: Option<PathBuf>,
    labels_written_bits: u64,
    // NOTE(review): presumably the partial label-offsets bitstream and its
    // length in bits — confirm.
    part_label_offsets_path: Option<PathBuf>,
    label_offsets_written_bits: u64,
}

impl JobId for Job {
    /// Returns the `job_id` field.
    fn id(&self) -> usize {
        self.job_id
    }
}
156

157
/// Writes γ-coded delta offsets to a bitstream.
158
///
159
/// Used internally by [`BvComp`] and [`BvCompConfig`] to produce the
160
/// `.offsets` file during compression.
161
#[derive(Debug)]
162
#[repr(transparent)]
163
pub struct OffsetsWriter<W: Write>(BufBitWriter<BigEndian, WordAdapter<usize, BufWriter<W>>>);
164

165
impl OffsetsWriter<File> {
166
    /// Creates a new writer and writes the first offset value (0) if requested.
167
    ///
168
    /// Usually, parallel compressor will write autonomously the first offset
169
    /// when copying the partial offsets files into the final offsets file.
170
    pub fn from_path(path: impl AsRef<Path>, write_zero: bool) -> Result<Self> {
3,111,171✔
171
        let file = std::fs::File::create(&path)
9,333,513✔
172
            .with_context(|| format!("Could not create {}", path.as_ref().display()))?;
3,111,171✔
173
        Self::from_write(file, write_zero)
9,333,513✔
174
    }
175
}
176

177
impl<W: Write> OffsetsWriter<W> {
178
    /// Creates a new writer and writes the first offset value (0) if requested.
179
    pub fn from_write(writer: W, write_zero: bool) -> Result<Self> {
3,119,933✔
180
        let mut buffer = BufBitWriter::new(WordAdapter::new(BufWriter::new(writer)));
15,599,665✔
181
        if write_zero {
3,119,933✔
182
            buffer.write_gamma(0)?;
1,262,240✔
183
        }
184
        Ok(Self(buffer))
3,119,933✔
185
    }
186

187
    /// Pushes a new delta offset.
188
    pub fn push(&mut self, delta: u64) -> Result<usize> {
156,729,104✔
189
        Ok(self.0.write_gamma(delta)?)
470,187,312✔
190
    }
191

192
    /// Flushes the buffer.
193
    pub fn flush(&mut self) -> Result<()> {
3,119,933✔
194
        BitWrite::flush(&mut self.0)?;
6,239,866✔
195
        Ok(())
3,119,933✔
196
    }
197
}
198

199
/// Configures and runs BvGraph compression.
200
///
201
/// A `BvCompConfig` is normally obtained via the convenience methods
202
/// [`BvComp::with_basename`] (for the standard compressor) or
203
/// [`BvCompZ::with_basename`] (for the [Zuckerli-based] compressor). It
204
/// can then be customized using the builder methods below and finally used
205
/// to compress a graph.
206
///
207
/// # Configuration
208
///
209
/// - [`with_comp_flags`]: sets [`CompFlags`] (compression window, maximum
210
///   reference count, minimum interval length, and the instantaneous codes
211
///   used for each component);
212
/// - [`with_bvgraphz`]: switches to the [Zuckerli-based]
213
///   reference-selection algorithm;
214
/// - [`with_chunk_size`]: sets the chunk size for [`BvCompZ`] (implies
215
///   `with_bvgraphz`);
216
/// - [`with_labels_basename`]: overrides the default label-file basename
217
///   (e.g., to write labels to a different disk);
218
/// - [`with_tmp_dir`]: sets the temporary directory for parallel
219
///   compression.
220
///
221
/// # Compression Methods
222
///
223
/// ## Unlabeled
224
///
225
/// - [`comp_graph`]: compresses a [`SequentialGraph`] sequentially;
226
/// - [`comp_lender`]: compresses a [`NodeLabelsLender`] sequentially;
227
/// - [`par_comp`]: compresses an [`IntoParLenders`] in parallel.
228
///
229
/// ## Labeled
230
///
231
/// - [`comp_labeled_graph`]: compresses a [`LabeledSequentialGraph`] and
232
///   its labels sequentially;
233
/// - [`comp_labeled_lender`]: compresses a labeled [`NodeLabelsLender`]
234
///   sequentially;
235
/// - [`par_comp_labeled`]: compresses a labeled [`IntoParLenders`] in
236
///   parallel.
237
///
238
/// The labeled variants accept a [`StoreLabelsConfig`] parameter (e.g.,
239
/// [`BitStreamStoreLabelsConfig`]) that controls how labels are written.
240
/// The unlabeled methods are thin wrappers that delegate to the labeled
241
/// ones with `()` as label configuration.
242
///
243
/// All methods produce the `.graph`, `.offsets`, and `.properties` files
244
/// and return the total number of bits written to the graph bitstream.
245
/// Labeled methods additionally produce label files (by default under the
246
/// basename `<basename>-labels`; see [`default_labels_basename`]).
247
///
248
/// After generating the files, you can use [`store_ef`] or
249
/// [`store_ef_with_data`] (or the command `webgraph build ef`) to generate
250
/// the `.ef` file necessary for random access.
251
///
252
/// # Examples
253
///
254
/// ```ignore
255
/// // Standard compression with default settings
256
/// BvComp::with_basename("output").comp_graph::<BE>(&graph)?;
257
///
258
/// // Standard compression with custom flags
259
/// BvComp::with_basename("output")
260
///     .with_comp_flags(CompFlags {
261
///         compression_window: 10,
262
///         min_interval_length: 2,
263
///         ..Default::default()
264
///     })
265
///     .comp_graph::<BE>(&graph)?;
266
///
267
/// // Parallel compression
268
/// BvComp::with_basename("output").par_comp::<BE, _>(&graph)?;
269
///
270
/// // Parallel compression with labels
271
/// let label_config =
272
///     BitStreamStoreLabelsConfig::<BE, _>::new(FixedWidth::<u32>::new());
273
/// BvComp::with_basename("output")
274
///     .par_comp_labeled::<BE, _, _>(&labeled_graph, label_config)?;
275
///
276
/// // Zuckerli-based compression
277
/// BvCompZ::with_basename("output").comp_graph::<BE>(&graph)?;
278
/// ```
279
///
280
/// [Zuckerli-based]: BvCompZ
281
/// [`with_comp_flags`]: Self::with_comp_flags
282
/// [`with_bvgraphz`]: Self::with_bvgraphz
283
/// [`with_chunk_size`]: Self::with_chunk_size
284
/// [`with_labels_basename`]: Self::with_labels_basename
285
/// [`default_labels_basename`]: Self::default_labels_basename
286
/// [`with_tmp_dir`]: Self::with_tmp_dir
287
/// [`comp_graph`]: Self::comp_graph
288
/// [`comp_lender`]: Self::comp_lender
289
/// [`comp_labeled_graph`]: Self::comp_labeled_graph
290
/// [`comp_labeled_lender`]: Self::comp_labeled_lender
291
/// [`NodeLabelsLender`]: crate::traits::NodeLabelsLender
292
/// [`par_comp`]: Self::par_comp
293
/// [`par_comp_labeled`]: Self::par_comp_labeled
294
/// [`store_ef`]: crate::graphs::bvgraph::store_ef
295
/// [`store_ef_with_data`]: crate::graphs::bvgraph::store_ef_with_data
296
/// [`BitStreamStoreLabelsConfig`]: crate::labels::BitStreamStoreLabelsConfig
297
#[derive(Debug)]
pub struct BvCompConfig {
    /// The basename of the output files; the graph, offsets, and properties
    /// paths are obtained from it by setting the extension.
    basename: PathBuf,
    /// Compression flags for BvComp/BvCompZ.
    comp_flags: CompFlags,
    /// Selects the Zuckerli-based BVGraph compressor ([`BvCompZ`]) instead
    /// of [`BvComp`].
    bvgraphz: bool,
    /// The chunk size for the Zuckerli-based compressor (10 000 by default).
    chunk_size: usize,
    /// Custom basename for label files, overriding the default derivation;
    /// if `None`, [`Self::default_labels_basename`] is applied to the graph
    /// basename.
    labels_basename: Option<PathBuf>,
    /// Temporary directory for all operations; if `None`, a temporary
    /// directory is created the first time one is needed.
    tmp_dir: Option<PathBuf>,
    /// Owns the TempDir that [`Self::tmp_dir`] refers to, if it was created by default.
    owned_tmp_dir: Option<tempfile::TempDir>,
}
314

315
impl BvCompConfig {
316
    /// Creates a new compression configuration with the given basename and
317
    /// default options.
318
    ///
319
    /// Note that the convenience methods [`BvComp::with_basename`] and
320
    /// [`BvCompZ::with_basename`] can be used to create a configuration with
321
    /// default options.
322
    ///
323
    /// [`BvComp::with_basename`]: crate::graphs::bvgraph::comp::BvComp::with_basename
324
    /// [`BvCompZ::with_basename`]: crate::graphs::bvgraph::comp::BvCompZ::with_basename
325
    pub fn new(basename: impl AsRef<Path>) -> Self {
124,752✔
326
        Self {
327
            basename: basename.as_ref().into(),
374,256✔
328
            comp_flags: CompFlags::default(),
249,504✔
329
            bvgraphz: false,
330
            chunk_size: 10_000,
331
            labels_basename: None,
332
            tmp_dir: None,
333
            owned_tmp_dir: None,
334
        }
335
    }
336
}
337

338
impl BvCompConfig {
339
    /// Sets the [`CompFlags`] controlling the compression parameters
340
    /// (compression window, maximum reference count, minimum interval length,
341
    /// and the instantaneous codes used for each component of the successor
342
    /// list).
343
    pub fn with_comp_flags(self, compression_flags: CompFlags) -> Self {
        // Builder step: replace the flags and hand the configuration back.
        let mut config = self;
        config.comp_flags = compression_flags;
        config
    }
347

348
    /// Sets the temporary directory used by [`par_comp`] to store
349
    /// partial bitstreams. If not set, a system temporary directory is created
350
    /// automatically.
351
    ///
352
    /// [`par_comp`]: Self::par_comp
353
    pub fn with_tmp_dir(self, tmp_dir: impl AsRef<Path>) -> Self {
        // Builder step: store an owned copy of the given path.
        let mut config = self;
        config.tmp_dir = Some(tmp_dir.as_ref().to_path_buf());
        config
    }
357

358
    /// Switches to the [`BvCompZ`] (Zuckerli-based) reference-selection
359
    /// algorithm.
360
    pub fn with_bvgraphz(self) -> Self {
        // Builder step: raise the flag selecting the Zuckerli-based compressor.
        let mut config = self;
        config.bvgraphz = true;
        config
    }
364

365
    /// Sets the chunk size for [`BvCompZ`] and enables the Zuckerli-based
366
    /// compressor. The chunk size controls how many consecutive nodes are
367
    /// buffered before running the dynamic-programming reference-selection
368
    /// algorithm; larger chunks can yield better compression at the cost of
369
    /// more memory. Implies [`with_bvgraphz`].
370
    ///
371
    /// [`with_bvgraphz`]: Self::with_bvgraphz
372
    pub fn with_chunk_size(self, chunk_size: usize) -> Self {
        // Setting a chunk size only makes sense for the Zuckerli-based
        // compressor, so it is enabled as well.
        let mut config = self.with_bvgraphz();
        config.chunk_size = chunk_size;
        config
    }
377

378
    /// Returns the default basename for label files given a graph basename.
379
    ///
380
    /// By convention, label files use a basename derived from the graph
381
    /// basename by appending [`LABELS_BASENAME_SUFFIX`] (e.g., `graph-labels`
382
    /// for a graph with basename `graph`). The label files then use standard
383
    /// extensions: `.labels`, `.offsets`, `.properties`, `.ef`.
384
    pub fn default_labels_basename(basename: impl AsRef<Path>) -> PathBuf {
1,244,642✔
385
        let mut name = basename.as_ref().as_os_str().to_owned();
3,733,926✔
386
        name.push(LABELS_BASENAME_SUFFIX);
2,489,284✔
387
        PathBuf::from(name)
2,489,284✔
388
    }
389

390
    /// Sets a custom basename for label files, overriding the
391
    /// [`default_labels_basename`](Self::default_labels_basename) convention.
392
    ///
393
    /// This is useful, for example, to write labels to a different disk.
NEW
394
    pub fn with_labels_basename(self, labels_basename: impl AsRef<Path>) -> Self {
        // Builder step: store an owned copy of the custom label basename.
        let mut config = self;
        config.labels_basename = Some(labels_basename.as_ref().to_path_buf());
        config
    }
398

399
    fn labels_basename(&self) -> PathBuf {
1,244,612✔
400
        self.labels_basename
1,244,612✔
401
            .clone()
402
            .unwrap_or_else(|| Self::default_labels_basename(&self.basename))
3,733,836✔
403
    }
404

405
    fn tmp_dir(&mut self) -> Result<PathBuf> {
622,187✔
406
        if self.tmp_dir.is_none() {
1,244,374✔
407
            let tmp_dir = tempfile::tempdir()?;
1,244,354✔
408
            self.tmp_dir = Some(tmp_dir.path().to_owned());
1,244,354✔
409
            self.owned_tmp_dir = Some(tmp_dir);
1,244,354✔
410
        }
411

412
        let tmp_dir = self.tmp_dir.clone().unwrap();
2,488,748✔
413
        if !std::fs::exists(&tmp_dir)
1,244,374✔
414
            .with_context(|| format!("Could not check whether {} exists", tmp_dir.display()))?
622,187✔
415
        {
416
            std::fs::create_dir_all(&tmp_dir)
×
417
                .with_context(|| format!("Could not create {}", tmp_dir.display()))?;
×
418
        }
419
        Ok(tmp_dir)
622,187✔
420
    }
421

422
    /// Compresses sequentially a [`SequentialGraph`] and returns
423
    /// the number of bits written to the graph bitstream.
424
    ///
425
    /// This is a convenience wrapper around [`comp_labeled_graph`] with no
426
    /// label storage.
427
    ///
428
    /// [`comp_labeled_graph`]: Self::comp_labeled_graph
429
    pub fn comp_graph<E: Endianness>(&mut self, graph: impl SequentialGraph) -> Result<u64>
373,330✔
430
    where
431
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
432
    {
433
        self.comp_labeled_graph::<E, (), ()>(UnitLabelGraph(graph), ())
1,493,320✔
434
    }
435

436
    /// Compresses sequentially a [`NodeLabelsLender`] and returns
437
    /// the number of bits written to the graph bitstream.
438
    ///
439
    /// This is a convenience wrapper around [`comp_labeled_lender`] with no
440
    /// label storage. The optional `expected_num_nodes` parameter will be
441
    /// used to provide forecasts on the progress logger.
442
    ///
443
    /// [`comp_labeled_lender`]: Self::comp_labeled_lender
444
    pub fn comp_lender<E, L>(&mut self, iter: L, expected_num_nodes: Option<usize>) -> Result<u64>
1✔
445
    where
446
        E: Endianness,
447
        L: IntoLender,
448
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
449
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
450
    {
451
        self.comp_labeled_lender::<E, _, _>(
2✔
452
            UnitLabelLender(iter.into_lender()),
1✔
453
            (),
454
            expected_num_nodes,
1✔
455
        )
456
    }
457

458
    /// Compresses sequentially a [`LabeledSequentialGraph`] and returns
459
    /// the number of bits written to the graph bitstream.
460
    ///
461
    /// The `store_labels_config` parameter provides the factory for creating
462
    /// label storage instances alongside graph compression (e.g.,
463
    /// [`BitStreamStoreLabelsConfig`]).
464
    ///
465
    /// [`BitStreamStoreLabelsConfig`]: crate::labels::BitStreamStoreLabelsConfig
466
    pub fn comp_labeled_graph<E: Endianness, L, SLC: StoreLabelsConfig>(
373,332✔
467
        &mut self,
468
        graph: impl LabeledSequentialGraph<L>,
469
        store_labels_config: SLC,
470
    ) -> Result<u64>
471
    where
472
        SLC::StoreLabels: StoreLabels<Label = L>,
473
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
474
    {
475
        let num_nodes = graph.num_nodes();
1,119,996✔
476
        self.comp_labeled_lender::<E, _, _>(graph.iter(), store_labels_config, Some(num_nodes))
2,239,992✔
477
    }
478

479
    /// Compresses sequentially a labeled [`NodeLabelsLender`] and returns
480
    /// the number of bits written to the graph bitstream.
481
    ///
482
    /// The `store_labels_config` parameter provides the factory for creating
483
    /// label storage instances alongside graph compression. Use `()` for
484
    /// unlabeled graphs.
485
    ///
486
    /// The optional `expected_num_nodes` parameter will be used to provide
487
    /// forecasts on the progress logger.
488
    pub fn comp_labeled_lender<E, L, SLC>(
        &mut self,
        iter: L,
        store_labels_config: SLC,
        expected_num_nodes: Option<usize>,
    ) -> Result<u64>
    where
        E: Endianness,
        L: IntoLender,
        SLC: StoreLabelsConfig,
        SLC::StoreLabels: StoreLabels,
        L::Lender: for<'next> NodeLabelsLender<
                'next,
                Label = (usize, <SLC::StoreLabels as StoreLabels>::Label),
            >,
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
    {
        // All output paths are derived from the graph basename and from the
        // label basename by setting the extension.
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
        let labels_basename = self.labels_basename();
        let labels_path = labels_basename.with_extension(LABELS_EXTENSION);
        let label_offsets_path = labels_basename.with_extension(OFFSETS_EXTENSION);

        // Create the graph bitstream and wrap it in an encoder using the
        // codes specified by the compression flags
        let bit_write = buf_bit_writer::from_path::<E, usize>(&graph_path)
            .with_context(|| format!("Could not create {}", graph_path.display()))?;

        let codes_writer = DynCodesEncoder::new(bit_write, &self.comp_flags)?;

        // Create the offsets file; `true` makes the writer emit the initial
        // zero offset
        let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
        let offset_writer = OffsetsWriter::from_path(offsets_path, true)?;

        // Create and initialize the label storage
        let mut store_labels =
            store_labels_config.new_storage(&labels_path, &label_offsets_path)?;
        store_labels.init()?;

        let mut pl = progress_logger![
            display_memory = true,
            item_name = "node",
            expected_updates = expected_num_nodes,
        ];
        pl.start("Compressing successors...");
        // The two branches are identical except for the compressor used;
        // BvCompZ additionally takes the chunk size.
        let comp_stats = if self.bvgraphz {
            let mut bvcompz = BvCompZ::new(
                codes_writer,
                offset_writer,
                self.comp_flags.compression_window,
                self.chunk_size,
                self.comp_flags.max_ref_count,
                self.comp_flags.min_interval_length,
                // NOTE(review): presumably the id of the first node to be
                // compressed — confirm against BvCompZ::new
                0,
                store_labels,
            );

            for_! ( (_node_id, successors) in iter {
                bvcompz.push(successors).context("Could not push successors")?;
                // Periodic statistics
                log_comp_stats(&bvcompz.stats(), false);
                pl.update();
            });
            // Final statistics, logged unconditionally
            log_comp_stats(&bvcompz.stats(), true);
            pl.done();

            bvcompz.flush()?
        } else {
            let mut bvcomp = BvComp::new(
                codes_writer,
                offset_writer,
                self.comp_flags.compression_window,
                self.comp_flags.max_ref_count,
                self.comp_flags.min_interval_length,
                // NOTE(review): presumably the id of the first node to be
                // compressed — confirm against BvComp::new
                0,
                store_labels,
            );

            for_! ( (_node_id, successors) in iter {
                bvcomp.push(successors).context("Could not push successors")?;
                // Periodic statistics
                log_comp_stats(&bvcomp.stats(), false);
                pl.update();
            });
            // Final statistics, logged unconditionally
            log_comp_stats(&bvcomp.stats(), true);
            pl.done();

            bvcomp.flush()?
        };

        // A mismatch with the expected number of nodes is only reported as a
        // warning, not as an error
        if let Some(num_nodes) = expected_num_nodes {
            if num_nodes != comp_stats.num_nodes {
                log::warn!(
                    "The expected number of nodes is {num_nodes} but the actual number of nodes is {}",
                    comp_stats.num_nodes,
                );
            }
        }

        log::info!("Writing the .properties file");
        let properties = self
            .comp_flags
            .to_properties::<E>(&comp_stats)
            .context("Could not serialize properties")?;
        let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
        std::fs::write(&properties_path, properties)
            .with_context(|| format!("Could not write {}", properties_path.display()))?;

        // The label properties file is written unless the serializer name is
        // "()" (presumably the name reported by the `()` label configuration
        // used for unlabeled graphs — confirm)
        let label_ser_name = store_labels_config.label_serializer_name();
        if label_ser_name != "()" {
            write_label_properties::<E>(
                &labels_basename,
                &label_ser_name,
                comp_stats.num_nodes,
                comp_stats.num_arcs,
                comp_stats.labels_written_bits,
            )?;
        }

        Ok(comp_stats.written_bits)
    }
604

605
    /// Compresses an [`IntoParLenders`] in parallel and returns the length
606
    /// in bits of the graph bitstream.
607
    ///
608
    /// This is a convenience wrapper around [`par_comp_labeled`] with no
609
    /// label storage. See [`par_comp_labeled`] for details on the parallel
610
    /// compression strategy.
611
    ///
612
    /// [`par_comp_labeled`]: Self::par_comp_labeled
613
    pub fn par_comp<E: Endianness, G>(&mut self, graph: G) -> Result<u64>
373,275✔
614
    where
615
        G: for<'a> IntoParLenders<ParLender: NodeLabelsLender<'a, Label = usize>>,
616
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
617
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
618
    {
619
        self.par_comp_labeled::<E, _, _>(UnitLabelParLenders(graph), ())
1,493,100✔
620
    }
621

622
    /// Compresses a labeled [`IntoParLenders`] in parallel and returns the
623
    /// length in bits of the graph bitstream.
624
    ///
625
    /// The method calls [`into_par_lenders`] to obtain lenders and
626
    /// boundaries, then compresses each lender in a separate thread. Each
627
    /// worker thread also writes per-chunk label and label-offset files via
628
    /// a [`StoreLabels`] instance obtained from `store_labels_config` (use
629
    /// `()` for unlabeled graphs). After all threads finish, the main thread
630
    /// concatenates the chunk files using
631
    /// [`StoreLabelsConfig::concat_part`].
632
    ///
633
    /// The number of parallel compression threads will be
634
    /// [`current_num_threads`]. It is your responsibility to ensure that the
635
    /// number of threads is appropriate for the number of lenders returned
636
    /// by [`into_par_lenders`], possibly using [`install`].
637
    ///
638
    /// A concrete implementation of [`StoreLabelsConfig`] for
639
    /// bitstream-based labels is [`BitStreamStoreLabelsConfig`].
640
    ///
641
    /// # Examples
642
    ///
643
    /// Compresses a labeled graph in parallel, then loads and verifies the
644
    /// result using [`BitStreamLabelingSeq::load`]:
645
    ///
646
    /// [`into_par_lenders`]: IntoParLenders::into_par_lenders
647
    /// [`install`]: rayon::ThreadPool::install
648
    /// [`BitStreamStoreLabelsConfig`]: crate::labels::BitStreamStoreLabelsConfig
649
    ///
650
    /// ```
651
    /// # use anyhow::Result;
652
    /// # use dsi_bitstream::prelude::*;
653
    /// # use webgraph::prelude::*;
654
    /// # use webgraph::graphs::bvgraph::*;
655
    /// # use webgraph::labels::BitStreamLabelingSeq;
656
    /// # use webgraph::graphs::vec_graph::LabeledVecGraph;
657
    /// # use webgraph::labels::BitStreamStoreLabelsConfig;
658
    /// # use webgraph::traits::FixedWidth;
659
    /// # fn main() -> Result<()> {
660
    /// # let tmp = tempfile::TempDir::new()?;
661
    /// # let basename = tmp.path().join("example");
662
    /// let graph = LabeledVecGraph::from_arcs([
663
    ///     ((0, 1), 10u32),
664
    ///     ((0, 2), 20),
665
    ///     ((1, 3), 30),
666
    ///     ((2, 3), 40),
667
    ///     ((3, 0), 50),
668
    /// ]);
669
    ///
670
    /// let label_config =
671
    ///     BitStreamStoreLabelsConfig::<BE, _>::new(FixedWidth::<u32>::new());
672
    ///
673
    /// BvComp::with_basename(&basename)
674
    ///     .par_comp_labeled::<BE, _, _>(&graph, label_config)?;
675
    ///
676
    /// let labels_basename = BvCompConfig::default_labels_basename(&basename);
677
    ///
678
    /// // Load the compressed graph and labeling, then verify
679
    /// let seq = BvGraphSeq::with_basename(&basename)
680
    ///     .endianness::<BE>()
681
    ///     .load()?;
682
    /// let labeling = BitStreamLabelingSeq::<BE, _, _>::load(
683
    ///     &labels_basename,
684
    ///     FixedWidth::<u32>::new(),
685
    /// )?;
686
    ///
687
    /// graph::eq_labeled(&graph, &Zip(seq, labeling))?;
688
    /// # Ok(())
689
    /// # }
690
    /// ```
691
    ///
692
    /// [`par_comp`]: Self::par_comp
693
    /// [`BitStreamLabelingSeq::load`]: crate::labels::BitStreamLabelingSeq::load
694
    pub fn par_comp_labeled<E: Endianness, G, SLC>(
373,279✔
695
        &mut self,
696
        graph: G,
697
        mut store_labels_config: SLC,
698
    ) -> Result<u64>
699
    where
700
        G: for<'a> IntoParLenders<
701
            ParLender: NodeLabelsLender<
702
                'a,
703
                Label = (usize, <SLC::StoreLabels as StoreLabels>::Label),
704
            >,
705
        >,
706
        SLC: StoreLabelsConfig + Send + Sync,
707
        SLC::StoreLabels: Send,
708
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
709
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
710
    {
711
        let (lenders, boundaries) = graph.into_par_lenders();
1,119,837✔
712
        let num_nodes = *boundaries.last().unwrap_or(&0);
1,493,116✔
713
        let tmp_dir = self.tmp_dir()?;
1,119,837✔
714

715
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
746,558✔
716
        let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
746,558✔
717
        let label_base = self.labels_basename();
1,119,837✔
718
        let labels_path = label_base.with_extension(LABELS_EXTENSION);
746,558✔
719
        let label_offsets_path = label_base.with_extension(OFFSETS_EXTENSION);
746,558✔
720

721
        let (tx, rx) = crossbeam_channel::unbounded();
1,119,837✔
722

723
        let thread_path = |thread_id: usize| tmp_dir.join(format!("{thread_id:016x}.bitstream"));
4,852,615✔
724

725
        let mut comp_pl = concurrent_progress_logger![
746,558✔
726
            log_target = "webgraph::graphs::bvgraph::comp::impls::par_comp::comp",
×
727
            display_memory = true,
×
728
            item_name = "node",
×
729
            local_speed = true,
×
730
            expected_updates = Some(num_nodes),
373,279✔
731
        ];
732
        comp_pl.start(format!(
1,119,837✔
733
            "Compressing successors in parallel using {} threads...",
×
734
            current_num_threads()
373,279✔
735
        ));
736
        let cp_flags = &self.comp_flags;
746,558✔
737
        let bvgraphz = self.bvgraphz;
746,558✔
738
        let chunk_size = self.chunk_size;
746,558✔
739

740
        in_place_scope(|s| {
746,558✔
741
            for (thread_id, mut thread_lender) in Vec::from(lenders).into_iter().enumerate() {
4,479,340✔
742
                let tmp_path = thread_path(thread_id);
2,986,224✔
743
                let chunk_graph_path = tmp_path.with_extension(GRAPH_EXTENSION);
2,986,224✔
744
                let chunk_offsets_path = tmp_path.with_extension(OFFSETS_EXTENSION);
2,986,224✔
745
                let label_tmp_stem = tmp_dir.join(format!("{thread_id:016x}-labels"));
4,479,336✔
746
                let part_labels_path = label_tmp_stem.with_extension(LABELS_EXTENSION);
2,986,224✔
747
                let part_label_offsets_path = label_tmp_stem.with_extension(OFFSETS_EXTENSION);
2,986,224✔
748
                let store_labels =
1,493,112✔
749
                    store_labels_config.new_storage(&part_labels_path, &part_label_offsets_path)?;
5,972,448✔
750
                let tx = tx.clone();
4,479,336✔
751
                let mut comp_pl = comp_pl.clone();
4,479,336✔
752
                s.spawn(move |_| {
4,479,336✔
753
                    log::debug!("Thread {thread_id} started");
1,493,112✔
754

755
                    let Some((node_id, successors)) = thread_lender.next() else {
4,479,332✔
756
                        return;
2✔
757
                    };
758

759
                    let first_node = node_id;
2,986,220✔
760
                    let writer = buf_bit_writer::from_path::<E, usize>(&chunk_graph_path).unwrap();
5,972,440✔
761
                    let codes_encoder = <DynCodesEncoder<E, _>>::new(writer, cp_flags).unwrap();
7,465,550✔
762

763
                    let stats;
×
764
                    let mut last_node;
×
765
                    if bvgraphz {
1,493,110✔
766
                        let mut bvcomp = BvCompZ::new(
2,239,496✔
767
                            codes_encoder,
1,119,748✔
768
                            OffsetsWriter::from_path(&chunk_offsets_path, false).unwrap(),
3,359,244✔
769
                            cp_flags.compression_window,
1,119,748✔
770
                            chunk_size,
1,119,748✔
771
                            cp_flags.max_ref_count,
1,119,748✔
772
                            cp_flags.min_interval_length,
1,119,748✔
773
                            node_id,
1,119,748✔
774
                            store_labels,
1,119,748✔
775
                        );
776
                        bvcomp.push(successors).unwrap();
4,478,992✔
777
                        last_node = first_node;
1,119,748✔
778
                        let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
30,558,653✔
779
                        for_! ( (_, succ) in iter_nodes {
55,518,566✔
780
                            bvcomp.push(succ.into_iter()).unwrap();
163,196,454✔
781
                            log_comp_stats(&bvcomp.stats(), false);
81,598,227✔
782
                            comp_pl.update();
27,199,409✔
783
                        });
784
                        stats = bvcomp.flush().unwrap();
2,239,496✔
785
                    } else {
786
                        let mut bvcomp = BvComp::new(
746,724✔
787
                            codes_encoder,
373,362✔
788
                            OffsetsWriter::from_path(&chunk_offsets_path, false).unwrap(),
1,120,086✔
789
                            cp_flags.compression_window,
373,362✔
790
                            cp_flags.max_ref_count,
373,362✔
791
                            cp_flags.min_interval_length,
373,362✔
792
                            node_id,
373,362✔
793
                            store_labels,
373,362✔
794
                        );
795
                        bvcomp.push(successors).unwrap();
1,493,448✔
796
                        last_node = first_node;
373,362✔
797
                        let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
16,914,660✔
798
                        for_! ( (_, succ) in iter_nodes {
31,962,510✔
799
                            bvcomp.push(succ.into_iter()).unwrap();
94,767,444✔
800
                            log_comp_stats(&bvcomp.stats(), false);
47,383,722✔
801
                            comp_pl.update();
15,794,574✔
802
                        });
803
                        stats = bvcomp.flush().unwrap();
746,724✔
804
                    }
805

806
                    log::debug!(
1,493,110✔
807
                        "Finished Compression thread {thread_id} and wrote {} bits for the graph and {} bits for the offsets",
×
808
                        stats.written_bits, stats.offsets_written_bits,
×
809
                    );
810
                    tx.send(Job {
4,479,330✔
811
                        job_id: thread_id,
2,986,220✔
812
                        first_node,
2,986,220✔
813
                        last_node,
2,986,220✔
814
                        chunk_graph_path,
2,986,220✔
815
                        written_bits: stats.written_bits,
2,986,220✔
816
                        chunk_offsets_path,
2,986,220✔
817
                        offsets_written_bits: stats.offsets_written_bits,
2,986,220✔
818
                        num_arcs: stats.num_arcs,
2,986,220✔
819
                        tot_ref: stats.tot_ref,
2,986,220✔
820
                        tot_dist: stats.tot_dist,
2,986,220✔
821
                        part_labels_path: Some(part_labels_path),
2,986,220✔
822
                        labels_written_bits: stats.labels_written_bits,
2,986,220✔
823
                        part_label_offsets_path: Some(part_label_offsets_path),
1,493,110✔
824
                        label_offsets_written_bits: stats.label_offsets_written_bits,
1,493,110✔
825
                    })
826
                    .ok(); // If channel is closed, main thread already has an error
1,493,110✔
827
                });
828
            }
829

830
            drop(tx);
746,558✔
831

832
            let mut copy_pl = progress_logger![
746,558✔
833
                log_target = "webgraph::graphs::bvgraph::comp::impls::par_comp::copy",
×
834
                display_memory = true,
×
835
                item_name = "node",
×
836
                local_speed = true,
×
837
                expected_updates = Some(num_nodes),
373,279✔
838
            ];
839
            copy_pl.start("Copying compressed successors to final graph");
746,558✔
840

841
            let mut graph_writer = buf_bit_writer::from_path::<E, usize>(&graph_path)
1,119,837✔
842
                .with_context(|| format!("Could not create graph {}", graph_path.display()))?;
373,279✔
843

844
            let mut offsets_writer = buf_bit_writer::from_path::<BE, usize>(&offsets_path)
1,119,837✔
845
                .with_context(|| format!("Could not create offsets {}", offsets_path.display()))?;
373,279✔
846
            offsets_writer.write_gamma(0)?;
746,558✔
847

848
            store_labels_config.init_concat(&labels_path, &label_offsets_path)?;
1,493,116✔
849

850
            let mut total_stats = super::CompStats::default();
746,558✔
851

852
            let mut next_node = 0;
746,558✔
853
            // glue together the bitstreams as they finish, this allows us to do
854
            // task pipelining for better performance
855
            for Job {
×
856
                job_id,
1,493,110✔
857
                first_node,
1,493,110✔
858
                last_node,
1,493,110✔
859
                chunk_graph_path,
1,493,110✔
860
                written_bits,
1,493,110✔
861
                chunk_offsets_path,
1,493,110✔
862
                offsets_written_bits,
1,493,110✔
863
                num_arcs,
1,493,110✔
864
                tot_ref,
1,493,110✔
865
                tot_dist,
1,493,110✔
866
                part_labels_path,
1,493,110✔
867
                labels_written_bits,
1,493,110✔
868
                part_label_offsets_path,
1,493,110✔
869
                label_offsets_written_bits,
1,493,110✔
870
            } in TaskQueue::new(rx.into_rayon_iter())
1,119,837✔
871
            {
872
                ensure!(
1,493,110✔
873
                    first_node == next_node,
1,493,110✔
874
                    "Non-adjacent lenders: lender {} has first node {} instead of {}",
×
875
                    job_id,
×
876
                    first_node,
×
877
                    next_node
×
878
                );
879

880
                next_node = last_node + 1;
1,493,110✔
881
                log::debug!(
1,493,110✔
882
                    "Copying {} [{}..{}) bits from {} to {}",
×
883
                    written_bits,
×
884
                    total_stats.written_bits,
×
885
                    total_stats.written_bits + written_bits,
36✔
886
                    chunk_graph_path.display(),
36✔
887
                    graph_path.display()
36✔
888
                );
889
                let mut reader = buf_bit_reader::from_path::<E, u32>(&chunk_graph_path)?;
4,479,330✔
890
                graph_writer
1,493,110✔
891
                    .copy_from(&mut reader, written_bits)
4,479,330✔
892
                    .with_context(|| {
1,493,110✔
893
                        format!(
×
894
                            "Could not copy from {} to {}",
×
895
                            chunk_graph_path.display(),
×
896
                            graph_path.display()
×
897
                        )
898
                    })?;
899
                std::fs::remove_file(chunk_graph_path)?;
2,986,220✔
900

901
                log::debug!(
1,493,110✔
902
                    "Copying offsets {} [{}..{}) bits from {} to {}",
×
903
                    offsets_written_bits,
×
904
                    total_stats.offsets_written_bits,
×
905
                    total_stats.offsets_written_bits + offsets_written_bits,
36✔
906
                    chunk_offsets_path.display(),
36✔
907
                    offsets_path.display()
36✔
908
                );
909

910
                let mut reader = <BufBitReader<BigEndian, _>>::new(<WordAdapter<u32, _>>::new(
4,479,330✔
911
                    BufReader::new(File::open(&chunk_offsets_path).with_context(|| {
5,972,440✔
912
                        format!("Could not open {}", chunk_offsets_path.display())
×
913
                    })?),
914
                ));
915
                offsets_writer
1,493,110✔
916
                    .copy_from(&mut reader, offsets_written_bits)
4,479,330✔
917
                    .with_context(|| {
1,493,110✔
918
                        format!(
×
919
                            "Could not copy from {} to {}",
×
920
                            chunk_offsets_path.display(),
×
921
                            offsets_path.display()
×
922
                        )
923
                    })?;
924
                std::fs::remove_file(chunk_offsets_path)?;
2,986,220✔
925

926
                if let (Some(lp), Some(lop)) = (part_labels_path, part_label_offsets_path) {
5,972,440✔
927
                    store_labels_config.concat_part(
2,986,220✔
928
                        &lp,
1,493,110✔
929
                        labels_written_bits,
1,493,110✔
930
                        &lop,
1,493,110✔
931
                        label_offsets_written_bits,
1,493,110✔
932
                    )?;
933
                }
934

935
                total_stats += super::CompStats {
2,986,220✔
936
                    num_nodes: last_node - first_node + 1,
2,986,220✔
937
                    num_arcs,
2,986,220✔
938
                    written_bits,
2,986,220✔
939
                    offsets_written_bits,
2,986,220✔
940
                    tot_ref,
2,986,220✔
941
                    tot_dist,
2,986,220✔
942
                    labels_written_bits,
1,493,110✔
943
                    label_offsets_written_bits,
1,493,110✔
944
                };
945
                copy_pl.update_with_count(last_node - first_node + 1);
4,479,330✔
946
            }
947

948
            store_labels_config.flush_concat()?;
746,558✔
949

950
            log::info!("Flushing the merged bitstreams");
373,279✔
951
            graph_writer.flush()?;
746,558✔
952
            BitWrite::flush(&mut offsets_writer)?;
746,558✔
953

954
            // Use the authoritative num_nodes from the boundaries
955
            total_stats.num_nodes = num_nodes;
373,279✔
956
            log_comp_stats(&total_stats, true);
746,558✔
957
            comp_pl.done();
746,558✔
958
            copy_pl.done();
746,558✔
959

960
            log::info!("Writing the .properties file");
373,279✔
961
            let properties = self
746,558✔
962
                .comp_flags
373,279✔
963
                .to_properties::<E>(&total_stats)
746,558✔
964
                .context("Could not serialize properties")?;
373,279✔
965
            let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
746,558✔
966
            std::fs::write(&properties_path, properties).with_context(|| {
1,493,116✔
967
                format!(
×
968
                    "Could not write properties to {}",
×
969
                    properties_path.display()
×
970
                )
971
            })?;
972

973
            let label_ser_name = store_labels_config.label_serializer_name();
1,119,837✔
974
            if label_ser_name != "()" {
373,279✔
975
                write_label_properties::<E>(
4✔
976
                    &label_base,
4✔
977
                    &label_ser_name,
4✔
978
                    total_stats.num_nodes,
4✔
979
                    total_stats.num_arcs,
4✔
980
                    total_stats.labels_written_bits,
4✔
981
                )?;
982
            }
983

984
            log::info!(
373,279✔
985
                "Compressed {} arcs into {} bits at {:.4} bits/arc",
×
986
                total_stats.num_arcs,
×
987
                total_stats.written_bits,
×
988
                total_stats.written_bits as f64 / total_stats.num_arcs as f64
373,273✔
989
            );
990
            log::info!(
373,279✔
991
                "Created offsets file with {} bits at {:.4} bits/node",
×
992
                total_stats.offsets_written_bits,
×
993
                total_stats.offsets_written_bits as f64 / num_nodes as f64
373,273✔
994
            );
995

996
            // cleanup the temp files
997
            std::fs::remove_dir_all(&tmp_dir).with_context(|| {
1,119,837✔
998
                format!("Could not clean temporary directory {}", tmp_dir.display())
×
999
            })?;
1000
            Ok(total_stats.written_bits)
373,279✔
1001
        })
1002
    }
1003
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc