• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 25115490570

29 Apr 2026 02:40PM UTC coverage: 69.176% (-0.003%) from 69.179%
25115490570

push

github

vigna
Now we use the last number of scanned arcs as expected updates

3 of 5 new or added lines in 2 files covered. (60.0%)

5 existing lines in 2 files now uncovered.

7507 of 10852 relevant lines covered (69.18%)

49867981.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.97
/webgraph/src/graphs/bvgraph/comp/impls.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 *
5
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6
 */
7

8
use crate::prelude::*;
9
use anyhow::{Context, Result, ensure};
10
use dsi_bitstream::prelude::*;
11
use dsi_progress_logger::prelude::*;
12
use lender::prelude::*;
13
use rayon::{current_num_threads, in_place_scope};
14
use std::fs::File;
15
use std::io::Write;
16
use std::io::{BufReader, BufWriter};
17
use std::path::{Path, PathBuf};
18

19
/// An out-of-order-to-in-order adapter over a stream of jobs.
///
/// Pulls jobs whose ids form a contiguous initial segment of the natural
/// numbers from an iterator, possibly out of order, and implements an
/// iterator that yields them back in increasing id order.
///
/// Jobs must be ordered by their job id, and must implement [`Eq`] with a
/// [`usize`] using their job id.
struct TaskQueue<I: Iterator> {
    /// The underlying, possibly out-of-order source of jobs.
    iter: I,
    /// Parking slots: `jobs[id]` holds the job with that id until its turn.
    jobs: Vec<Option<I::Item>>,
    /// The id of the next job to be yielded in order.
    next_id: usize,
}
30

31
/// Trait for jobs that expose a numeric job identifier.
trait JobId {
    /// Returns the id of this job.
    fn id(&self) -> usize;
}
34

35
impl<I: Iterator> TaskQueue<I> {
36
    fn new(iter: I) -> Self {
622,187✔
37
        Self {
38
            iter,
39
            jobs: vec![],
622,187✔
40
            next_id: 0,
41
        }
42
    }
43
}
44

45
impl<I: Iterator> Iterator for TaskQueue<I>
46
where
47
    I::Item: JobId,
48
{
49
    type Item = I::Item;
50

51
    fn next(&mut self) -> Option<Self::Item> {
3,110,905✔
52
        loop {
53
            if let Some(item) = self.jobs.get_mut(self.next_id) {
14,311,569✔
54
                if item.is_some() {
6,224,646✔
55
                    self.next_id += 1;
2,488,718✔
56
                    return item.take();
4,977,436✔
57
                }
58
            }
59
            if let Some(item) = self.iter.next() {
5,599,623✔
60
                let id = item.id();
7,466,154✔
61
                if id >= self.jobs.len() {
4,977,436✔
62
                    self.jobs.resize_with(id + 1, || None);
5,759,820✔
63
                }
64
                self.jobs[id] = Some(item);
4,977,436✔
65
            } else {
66
                return None;
622,187✔
67
            }
68
        }
69
    }
70
}
71

72
/// Bit mask used to throttle compression-statistics logging.
///
/// Statistics are emitted once every `STATS_THRESHOLD + 1` nodes, that is,
/// every 2²² (≈4M) nodes.
const STATS_THRESHOLD: usize = (1 << 22) - 1;
76

77
/// Logs compression statistics periodically (every ~4M nodes) or
78
/// unconditionally when `force` is true.
79
#[inline]
80
fn log_comp_stats(stats: &super::CompStats, force: bool) {
87,902,729✔
81
    if force || (stats.num_nodes & STATS_THRESHOLD == 0 && stats.num_nodes > 0) {
175,058,846✔
82
        let bits_per_link = stats.written_bits as f64
1,493,224✔
83
            / if stats.num_arcs > 0 {
746,612✔
84
                stats.num_arcs as f64
746,611✔
85
            } else {
86
                1.0
1✔
87
            };
88
        let n = stats.num_nodes as f64;
1,493,224✔
89
        log::info!(
746,612✔
90
            "bits/link: {:.3}; bits/node: {:.3}; avgref: {:.3}; avgdist: {:.3}",
×
91
            bits_per_link,
×
92
            stats.written_bits as f64 / n,
746,523✔
93
            stats.tot_ref as f64 / n,
746,523✔
94
            stats.tot_dist as f64 / n,
746,523✔
95
        );
96
    }
97
}
98

99
/// Writes the `.properties` file for a label bitstream.
100
fn write_label_properties<E: dsi_bitstream::traits::Endianness>(
30✔
101
    labels_basename: &Path,
102
    serializer_name: &str,
103
    num_nodes: usize,
104
    num_arcs: u64,
105
    labels_written_bits: u64,
106
) -> Result<()> {
107
    let mut s = String::new();
60✔
108
    s.push_str("#Label properties\n");
90✔
109
    s.push_str(&format!("endianness={}\n", E::NAME));
90✔
110
    s.push_str(&format!("nodes={num_nodes}\n"));
90✔
111
    s.push_str(&format!("arcs={num_arcs}\n"));
90✔
112
    s.push_str(&format!("serializer={serializer_name}\n"));
90✔
113
    s.push_str(&format!("length={labels_written_bits}\n"));
90✔
114
    if num_arcs > 0 {
60✔
115
        s.push_str(&format!(
120✔
116
            "bitsperlink={:.3}\n",
30✔
117
            labels_written_bits as f64 / num_arcs as f64
30✔
118
        ));
119
    }
120
    if num_nodes > 0 {
60✔
121
        s.push_str(&format!(
120✔
122
            "bitspernode={:.3}\n",
30✔
123
            labels_written_bits as f64 / num_nodes as f64
30✔
124
        ));
125
    }
126
    let props_path = labels_basename.with_extension(PROPERTIES_EXTENSION);
90✔
127
    std::fs::write(&props_path, &s)
90✔
128
        .with_context(|| format!("Could not write {}", props_path.display()))?;
30✔
129
    Ok(())
30✔
130
}
131

132
/// A compression job.
///
/// Describes the output of one worker during parallel compression: the node
/// range it covered, the partial files it produced, and the bit counts
/// needed to merge them into the final bitstreams. The derived `Ord`
/// compares `job_id` first, so jobs sort by their position in the sequence.
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
struct Job {
    /// Position of this job in the overall job sequence.
    job_id: usize,
    /// First node of the range handled by this job.
    first_node: usize,
    /// Last node of the range handled by this job.
    last_node: usize,
    /// Partial graph bitstream produced by this job.
    chunk_graph_path: PathBuf,
    /// Number of bits written to the partial graph bitstream.
    written_bits: u64,
    /// Partial offsets bitstream produced by this job.
    chunk_offsets_path: PathBuf,
    /// Number of bits written to the partial offsets bitstream.
    offsets_written_bits: u64,
    /// Number of arcs compressed by this job.
    num_arcs: u64,
    /// Total reference count (feeds the `avgref` statistic).
    tot_ref: u64,
    /// Total reference distance (feeds the `avgdist` statistic).
    tot_dist: u64,
    /// Partial labels bitstream, if labels are being stored.
    part_labels_path: Option<PathBuf>,
    /// Number of bits written to the partial labels bitstream.
    labels_written_bits: u64,
    /// Partial label-offsets bitstream, if labels are being stored.
    part_label_offsets_path: Option<PathBuf>,
    /// Number of bits written to the partial label-offsets bitstream.
    label_offsets_written_bits: u64,
}
150

151
impl JobId for Job {
    /// Returns this job's position in the job sequence.
    fn id(&self) -> usize {
        self.job_id
    }
}
156

157
/// Writes γ-coded delta offsets to a bitstream.
158
///
159
/// Used internally by [`BvComp`] and [`BvCompConf`] to produce the
160
/// `.offsets` file during compression.
161
#[derive(Debug)]
162
#[repr(transparent)]
163
pub struct OffsetsWriter<W: Write>(BufBitWriter<BigEndian, WordAdapter<usize, BufWriter<W>>>);
164

165
impl OffsetsWriter<File> {
166
    /// Creates a new writer and writes the first offset value (0) if requested.
167
    ///
168
    /// Usually, parallel compressor will write autonomously the first offset
169
    /// when copying the partial offsets files into the final offsets file.
170
    pub fn from_path(path: impl AsRef<Path>, write_zero: bool) -> Result<Self> {
3,111,171✔
171
        let file = std::fs::File::create(&path)
9,333,513✔
172
            .with_context(|| format!("Could not create {}", path.as_ref().display()))?;
3,111,171✔
173
        Self::from_write(file, write_zero)
9,333,513✔
174
    }
175
}
176

177
impl<W: Write> OffsetsWriter<W> {
178
    /// Creates a new writer and writes the first offset value (0) if requested.
179
    pub fn from_write(writer: W, write_zero: bool) -> Result<Self> {
3,119,933✔
180
        let mut buffer = BufBitWriter::new(WordAdapter::new(BufWriter::new(writer)));
15,599,665✔
181
        if write_zero {
3,119,933✔
182
            buffer.write_gamma(0)?;
1,262,240✔
183
        }
184
        Ok(Self(buffer))
3,119,933✔
185
    }
186

187
    /// Pushes a new delta offset.
188
    pub fn push(&mut self, delta: u64) -> Result<usize> {
156,729,104✔
189
        Ok(self.0.write_gamma(delta)?)
470,187,312✔
190
    }
191

192
    /// Flushes the buffer.
193
    pub fn flush(&mut self) -> Result<()> {
3,119,933✔
194
        BitWrite::flush(&mut self.0)?;
6,239,866✔
195
        Ok(())
3,119,933✔
196
    }
197
}
198

199
/// Configures and runs BvGraph compression.
200
///
201
/// A `BvCompConf` is normally obtained via the convenience methods
202
/// [`BvComp::with_basename`] (for the standard compressor) or
203
/// [`BvCompZ::with_basename`] (for the [Zuckerli-based] compressor). It
204
/// can then be customized using the builder methods below and finally used
205
/// to compress a graph.
206
///
207
/// # Configuration
208
///
209
/// - [`comp_flags`]: sets [`CompFlags`] (compression window, maximum
210
///   reference count, minimum interval length, and the instantaneous codes
211
///   used for each component);
212
/// - [`bvgraphz`]: switches to the [Zuckerli-based]
213
///   reference-selection algorithm;
214
/// - [`chunk_size`]: sets the chunk size for [`BvCompZ`] (implies
215
///   `bvgraphz`);
216
/// - [`labels_basename`]: overrides the default label-file basename
217
///   (e.g., to write labels to a different disk);
218
/// - [`tmp_dir`]: sets the temporary directory for parallel
219
///   compression.
220
///
221
/// # Compression Methods
222
///
223
/// ## Unlabeled
224
///
225
/// - [`comp_graph`]: compresses a [`SequentialGraph`] sequentially;
226
/// - [`comp_lender`]: compresses a [`NodeLabelsLender`] sequentially;
227
/// - [`par_comp`]: compresses an [`IntoParLenders`] in parallel.
228
///
229
/// ## Labeled
230
///
231
/// - [`comp_labeled_graph`]: compresses a [`LabeledSequentialGraph`] and
232
///   its labels sequentially;
233
/// - [`comp_labeled_lender`]: compresses a labeled [`NodeLabelsLender`]
234
///   sequentially;
235
/// - [`par_comp_labeled`]: compresses a labeled [`IntoParLenders`] in
236
///   parallel.
237
///
238
/// The labeled variants accept a [`StoreLabelsConf`] parameter (e.g.,
239
/// [`BitStreamStoreLabelsConf`]) that controls how labels are written.
240
/// The unlabeled methods are thin wrappers that delegate to the labeled
241
/// ones with `()` as label configuration.
242
///
243
/// All methods produce the `.graph`, `.offsets`, and `.properties` files
244
/// and return the total number of bits written to the graph bitstream.
245
/// Labeled methods additionally produce label files (by default under the
246
/// basename `<basename>-labels`; see [`default_labels_basename`]).
247
///
248
/// After generating the files, you can use [`store_ef`] or
249
/// [`store_ef_with_data`] (or the command `webgraph build ef`) to generate
250
/// the `.ef` file necessary for random access.
251
///
252
/// # Examples
253
///
254
/// ```ignore
255
/// // Standard compression with default settings
256
/// BvComp::with_basename("output").comp_graph::<BE>(&graph)?;
257
///
258
/// // Standard compression with custom flags
259
/// BvComp::with_basename("output")
260
///     .comp_flags(CompFlags {
261
///         compression_window: 10,
262
///         min_interval_length: 2,
263
///         ..Default::default()
264
///     })
265
///     .comp_graph::<BE>(&graph)?;
266
///
267
/// // Parallel compression
268
/// BvComp::with_basename("output").par_comp::<BE, _>(&graph)?;
269
///
270
/// // Parallel compression with labels
271
/// let label_config =
272
///     BitStreamStoreLabelsConf::<BE, _>::new(FixedWidth::<u32>::new());
273
/// BvComp::with_basename("output")
274
///     .par_comp_labeled::<BE, _, _>(&labeled_graph, label_config)?;
275
///
276
/// // Zuckerli-based compression
277
/// BvCompZ::with_basename("output").comp_graph::<BE>(&graph)?;
278
/// ```
279
///
280
/// [Zuckerli-based]: BvCompZ
281
/// [`comp_flags`]: Self::comp_flags
282
/// [`bvgraphz`]: Self::bvgraphz
283
/// [`chunk_size`]: Self::chunk_size
284
/// [`labels_basename`]: Self::labels_basename
285
/// [`default_labels_basename`]: Self::default_labels_basename
286
/// [`tmp_dir`]: Self::tmp_dir
287
/// [`comp_graph`]: Self::comp_graph
288
/// [`comp_lender`]: Self::comp_lender
289
/// [`comp_labeled_graph`]: Self::comp_labeled_graph
290
/// [`comp_labeled_lender`]: Self::comp_labeled_lender
291
/// [`NodeLabelsLender`]: crate::traits::NodeLabelsLender
292
/// [`par_comp`]: Self::par_comp
293
/// [`par_comp_labeled`]: Self::par_comp_labeled
294
/// [`store_ef`]: crate::graphs::bvgraph::store_ef
295
/// [`store_ef_with_data`]: crate::graphs::bvgraph::store_ef_with_data
296
/// [`BitStreamStoreLabelsConf`]: crate::labels::BitStreamStoreLabelsConf
297
#[derive(Debug)]
298
pub struct BvCompConf<PL = Option<ProgressLogger>> {
299
    /// The basename of the output files.
300
    basename: PathBuf,
301
    /// Compression flags for BvComp/BvCompZ.
302
    comp_flags: CompFlags,
303
    /// Selects the Zuckerli-based BVGraph compressor
304
    bvgraphz: bool,
305
    /// The chunk size for the Zuckerli-based compressor
306
    chunk_size: usize,
307
    /// Custom basename for label files, overriding the default derivation.
308
    labels_basename: Option<PathBuf>,
309
    /// Temporary directory for all operations.
310
    tmp_dir: Option<PathBuf>,
311
    /// Owns the TempDir that [`Self::tmp_dir`] refers to, if it was created by default.
312
    owned_tmp_dir: Option<tempfile::TempDir>,
313
    /// Progress logger for compression methods.
314
    pl: PL,
315
}
316

317
impl BvCompConf {
318
    /// Creates a new compression configuration with the given basename and
319
    /// default options.
320
    ///
321
    /// Note that the convenience methods [`BvComp::with_basename`] and
322
    /// [`BvCompZ::with_basename`] can be used to create a configuration with
323
    /// default options.
324
    ///
325
    /// [`BvComp::with_basename`]: crate::graphs::bvgraph::comp::BvComp::with_basename
326
    /// [`BvCompZ::with_basename`]: crate::graphs::bvgraph::comp::BvCompZ::with_basename
327
    pub fn new(basename: impl AsRef<Path>) -> Self {
124,752✔
328
        Self {
329
            basename: basename.as_ref().into(),
374,256✔
330
            comp_flags: CompFlags::default(),
249,504✔
331
            bvgraphz: false,
332
            chunk_size: 10_000,
333
            labels_basename: None,
334
            tmp_dir: None,
335
            owned_tmp_dir: None,
336
            pl: None,
337
        }
338
    }
339

340
    /// Returns the default basename for label files given a graph basename.
341
    ///
342
    /// By convention, label files use a basename derived from the graph
343
    /// basename by appending [`LABELS_BASENAME_SUFFIX`] (e.g., `graph-labels`
344
    /// for a graph with basename `graph`). The label files then use standard
345
    /// extensions: `.labels`, `.offsets`, `.properties`, `.ef`.
346
    pub fn default_labels_basename(basename: impl AsRef<Path>) -> PathBuf {
1,244,642✔
347
        let mut name = basename.as_ref().as_os_str().to_owned();
3,733,926✔
348
        name.push(LABELS_BASENAME_SUFFIX);
2,489,284✔
349
        PathBuf::from(name)
2,489,284✔
350
    }
351
}
352

353
impl<PL> BvCompConf<PL> {
354
    /// Sets the [`CompFlags`] controlling the compression parameters
355
    /// (compression window, maximum reference count, minimum interval length,
356
    /// and the instantaneous codes used for each component of the successor
357
    /// list).
358
    pub fn comp_flags(mut self, compression_flags: CompFlags) -> Self {
1,088,834✔
359
        self.comp_flags = compression_flags;
1,088,834✔
360
        self
1,088,834✔
361
    }
362

363
    /// Sets the temporary directory used by [`par_comp`] to store
364
    /// partial bitstreams. If not set, a system temporary directory is created
365
    /// automatically.
366
    ///
367
    /// [`par_comp`]: Self::par_comp
368
    pub fn tmp_dir(mut self, tmp_dir: impl AsRef<Path>) -> Self {
12✔
369
        self.tmp_dir = Some(tmp_dir.as_ref().into());
24✔
370
        self
12✔
371
    }
372

373
    /// Switches to the [`BvCompZ`] (Zuckerli-based) reference-selection
374
    /// algorithm.
375
    pub fn bvgraphz(mut self) -> Self {
466,576✔
376
        self.bvgraphz = true;
466,576✔
377
        self
466,576✔
378
    }
379

380
    /// Sets the chunk size for [`BvCompZ`] and enables the Zuckerli-based
381
    /// compressor. The chunk size controls how many consecutive nodes are
382
    /// buffered before running the dynamic-programming reference-selection
383
    /// algorithm; larger chunks can yield better compression at the cost of
384
    /// more memory. Implies [`bvgraphz`].
385
    ///
386
    /// [`bvgraphz`]: Self::bvgraphz
387
    pub fn chunk_size(mut self, chunk_size: usize) -> Self {
466,570✔
388
        self.bvgraphz = true;
466,570✔
389
        self.chunk_size = chunk_size;
466,570✔
390
        self
466,570✔
391
    }
392

393
    /// Sets a custom basename for label files, overriding the
394
    /// [`default_labels_basename`](Self::default_labels_basename) convention.
395
    ///
396
    /// This is useful, for example, to write labels to a different disk.
397
    pub fn labels_basename(mut self, labels_basename: impl AsRef<Path>) -> Self {
×
398
        self.labels_basename = Some(labels_basename.as_ref().into());
×
399
        self
×
400
    }
401

402
    /// Sets the progress logger for the compression methods.
403
    ///
404
    /// Only the [`item_name`](ProgressLog::item_name) and
405
    /// [`expected_updates`](ProgressLog::expected_updates) are set by the
406
    /// compression methods; all other properties (e.g., display options,
407
    /// log interval) should be configured by the caller.
408
    pub fn progress_logger<PL2>(self, pl: PL2) -> BvCompConf<PL2> {
10✔
409
        BvCompConf {
410
            basename: self.basename,
20✔
411
            comp_flags: self.comp_flags,
20✔
412
            bvgraphz: self.bvgraphz,
20✔
413
            chunk_size: self.chunk_size,
20✔
414
            labels_basename: self.labels_basename,
20✔
415
            tmp_dir: self.tmp_dir,
20✔
416
            owned_tmp_dir: self.owned_tmp_dir,
10✔
417
            pl,
418
        }
419
    }
420

421
    fn resolve_labels_basename(&self) -> PathBuf {
1,244,612✔
422
        self.labels_basename
1,244,612✔
423
            .clone()
424
            .unwrap_or_else(|| BvCompConf::default_labels_basename(&self.basename))
3,733,836✔
425
    }
426

427
    fn resolve_tmp_dir(&mut self) -> Result<PathBuf> {
622,187✔
428
        if self.tmp_dir.is_none() {
1,244,374✔
429
            let tmp_dir = tempfile::tempdir()?;
1,244,354✔
430
            self.tmp_dir = Some(tmp_dir.path().to_owned());
1,244,354✔
431
            self.owned_tmp_dir = Some(tmp_dir);
1,244,354✔
432
        }
433

434
        let tmp_dir = self.tmp_dir.clone().unwrap();
2,488,748✔
435
        if !std::fs::exists(&tmp_dir)
1,244,374✔
436
            .with_context(|| format!("Could not check whether {} exists", tmp_dir.display()))?
622,187✔
437
        {
438
            std::fs::create_dir_all(&tmp_dir)
×
439
                .with_context(|| format!("Could not create {}", tmp_dir.display()))?;
×
440
        }
441
        Ok(tmp_dir)
622,187✔
442
    }
443
}
444

445
impl<PL: ProgressLog> BvCompConf<PL> {
446
    /// Compresses sequentially a [`SequentialGraph`] and returns
447
    /// the number of bits written to the graph bitstream.
448
    ///
449
    /// This is a convenience wrapper around [`comp_labeled_graph`] with no
450
    /// label storage.
451
    ///
452
    /// [`comp_labeled_graph`]: Self::comp_labeled_graph
453
    pub fn comp_graph<E: Endianness>(&mut self, graph: impl SequentialGraph) -> Result<u64>
373,330✔
454
    where
455
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
456
    {
457
        self.comp_labeled_graph::<E, (), ()>(UnitLabelGraph(graph), ())
1,493,320✔
458
    }
459

460
    /// Compresses sequentially a [`NodeLabelsLender`] and returns
461
    /// the number of bits written to the graph bitstream.
462
    ///
463
    /// This is a convenience wrapper around [`comp_labeled_lender`] with no
464
    /// label storage. The optional `expected_num_nodes` parameter will be
465
    /// used to provide forecasts on the progress logger.
466
    ///
467
    /// [`comp_labeled_lender`]: Self::comp_labeled_lender
468
    pub fn comp_lender<E, L>(&mut self, iter: L, expected_num_nodes: Option<usize>) -> Result<u64>
1✔
469
    where
470
        E: Endianness,
471
        L: IntoLender,
472
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
473
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
474
    {
475
        self.comp_labeled_lender::<E, _, _>(
2✔
476
            UnitLabelLender(iter.into_lender()),
1✔
477
            (),
478
            expected_num_nodes,
1✔
479
        )
480
    }
481

482
    /// Compresses sequentially a [`LabeledSequentialGraph`] and returns
483
    /// the number of bits written to the graph bitstream.
484
    ///
485
    /// The `store_labels_config` parameter provides the factory for creating
486
    /// label storage instances alongside graph compression (e.g.,
487
    /// [`BitStreamStoreLabelsConf`]).
488
    ///
489
    /// [`BitStreamStoreLabelsConf`]: crate::labels::BitStreamStoreLabelsConf
490
    pub fn comp_labeled_graph<E: Endianness, L, SLC: StoreLabelsConf>(
373,332✔
491
        &mut self,
492
        graph: impl LabeledSequentialGraph<L>,
493
        store_labels_config: SLC,
494
    ) -> Result<u64>
495
    where
496
        SLC::StoreLabels: StoreLabels<Label = L>,
497
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
498
    {
499
        let num_nodes = graph.num_nodes();
1,119,996✔
500
        self.comp_labeled_lender::<E, _, _>(graph.iter(), store_labels_config, Some(num_nodes))
2,239,992✔
501
    }
502

503
    /// Compresses sequentially a labeled [`NodeLabelsLender`] and returns
504
    /// the number of bits written to the graph bitstream.
505
    ///
506
    /// The `store_labels_config` parameter provides the factory for creating
507
    /// label storage instances alongside graph compression. Use `()` for
508
    /// unlabeled graphs.
509
    ///
510
    /// The optional `expected_num_nodes` parameter will be used to provide
511
    /// forecasts on the progress logger.
512
    pub fn comp_labeled_lender<E, L, SLC>(
373,333✔
513
        &mut self,
514
        iter: L,
515
        store_labels_config: SLC,
516
        expected_num_nodes: Option<usize>,
517
    ) -> Result<u64>
518
    where
519
        E: Endianness,
520
        L: IntoLender,
521
        SLC: StoreLabelsConf,
522
        SLC::StoreLabels: StoreLabels,
523
        L::Lender: for<'next> NodeLabelsLender<
524
                'next,
525
                Label = (usize, <SLC::StoreLabels as StoreLabels>::Label),
526
            >,
527
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
528
    {
529
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
746,666✔
530
        let labels_basename = self.resolve_labels_basename();
1,119,999✔
531
        let labels_path = labels_basename.with_extension(LABELS_EXTENSION);
746,666✔
532
        let label_offsets_path = labels_basename.with_extension(OFFSETS_EXTENSION);
746,666✔
533

534
        // Compress the graph
535
        let bit_write = buf_bit_writer::from_path::<E, usize>(&graph_path)
1,119,999✔
536
            .with_context(|| format!("Could not create {}", graph_path.display()))?;
373,333✔
537

538
        let codes_writer = DynCodesEncoder::new(bit_write, &self.comp_flags)?;
1,493,332✔
539

540
        // create a file for offsets
541
        let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
746,666✔
542
        let offset_writer = OffsetsWriter::from_path(offsets_path, true)?;
1,119,999✔
543

544
        let mut store_labels =
373,333✔
545
            store_labels_config.new_storage(&labels_path, &label_offsets_path)?;
1,493,332✔
546
        store_labels.init()?;
746,666✔
547

548
        self.pl
373,333✔
549
            .item_name("node")
550
            .expected_updates(expected_num_nodes);
746,666✔
551
        self.pl.start("Compressing successors...");
746,666✔
552
        let comp_stats = if self.bvgraphz {
746,666✔
553
            let mut bvcompz = BvCompZ::new(
554
                codes_writer,
279,948✔
555
                offset_writer,
279,948✔
556
                self.comp_flags.compression_window,
279,948✔
557
                self.chunk_size,
279,948✔
558
                self.comp_flags.max_ref_count,
279,948✔
559
                self.comp_flags.min_interval_length,
279,948✔
560
                0,
561
                store_labels,
279,948✔
562
            );
563

564
            for_! ( (_node_id, successors) in iter {
62,778,302✔
565
                bvcompz.push(successors).context("Could not push successors")?;
124,996,708✔
566
                log_comp_stats(&bvcompz.stats(), false);
62,498,354✔
567
                self.pl.update();
62,498,354✔
568
            });
569
            log_comp_stats(&bvcompz.stats(), true);
559,896✔
570
            self.pl.done();
559,896✔
571

572
            bvcompz.flush()?
559,896✔
573
        } else {
574
            let mut bvcomp = BvComp::new(
575
                codes_writer,
93,385✔
576
                offset_writer,
93,385✔
577
                self.comp_flags.compression_window,
93,385✔
578
                self.comp_flags.max_ref_count,
93,385✔
579
                self.comp_flags.min_interval_length,
93,385✔
580
                0,
581
                store_labels,
93,385✔
582
            );
583

584
            for_! ( (_node_id, successors) in iter {
25,919,299✔
585
                bvcomp.push(successors).context("Could not push successors")?;
51,651,828✔
586
                log_comp_stats(&bvcomp.stats(), false);
25,825,914✔
587
                self.pl.update();
25,825,914✔
588
            });
589
            log_comp_stats(&bvcomp.stats(), true);
186,770✔
590
            self.pl.done();
186,770✔
591

592
            bvcomp.flush()?
186,770✔
593
        };
594

595
        if let Some(num_nodes) = expected_num_nodes {
746,666✔
596
            if num_nodes != comp_stats.num_nodes {
373,333✔
597
                log::warn!(
×
598
                    "The expected number of nodes is {num_nodes} but the actual number of nodes is {}",
×
599
                    comp_stats.num_nodes,
×
600
                );
601
            }
602
        }
603

604
        log::info!("Writing the .properties file");
373,333✔
605
        let properties = self
746,666✔
606
            .comp_flags
373,333✔
607
            .to_properties::<E>(&comp_stats)
746,666✔
608
            .context("Could not serialize properties")?;
609
        let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
746,666✔
610
        std::fs::write(&properties_path, properties)
1,119,999✔
611
            .with_context(|| format!("Could not write {}", properties_path.display()))?;
373,333✔
612

613
        let label_ser_name = store_labels_config.label_serializer_name();
1,119,999✔
614
        if label_ser_name != "()" {
373,333✔
615
            write_label_properties::<E>(
616
                &labels_basename,
2✔
617
                &label_ser_name,
2✔
618
                comp_stats.num_nodes,
2✔
619
                comp_stats.num_arcs,
2✔
620
                comp_stats.labels_written_bits,
2✔
621
            )?;
622
        }
623

624
        Ok(comp_stats.written_bits)
373,333✔
625
    }
626

627
    /// Compresses an [`IntoParLenders`] in parallel and returns the length
628
    /// in bits of the graph bitstream.
629
    ///
630
    /// This is a convenience wrapper around [`par_comp_labeled`] with no
631
    /// label storage. See [`par_comp_labeled`] for details on the parallel
632
    /// compression strategy.
633
    ///
634
    /// [`par_comp_labeled`]: Self::par_comp_labeled
635
    pub fn par_comp<E: Endianness, G>(&mut self, graph: G) -> Result<u64>
373,275✔
636
    where
637
        G: for<'a> IntoParLenders<ParLender: NodeLabelsLender<'a, Label = usize>>,
638
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
639
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
640
    {
641
        self.par_comp_labeled::<E, _, _>(UnitLabelParLenders(graph), ())
1,493,100✔
642
    }
643

644
    /// Compresses a labeled [`IntoParLenders`] in parallel and returns the
645
    /// length in bits of the graph bitstream.
646
    ///
647
    /// The method calls [`into_par_lenders`] to obtain lenders and
648
    /// boundaries, then compresses each lender in a separate thread. Each
649
    /// worker thread also writes per-chunk label and label-offset files via
650
    /// a [`StoreLabels`] instance obtained from `store_labels_config` (use
651
    /// `()` for unlabeled graphs). After all threads finish, the main thread
652
    /// concatenates the chunk files using
653
    /// [`StoreLabelsConf::concat_part`].
654
    ///
655
    /// The number of parallel compression threads will be
656
    /// [`current_num_threads`]. It is your responsibility to ensure that the
657
    /// number of threads is appropriate for the number of lenders returned
658
    /// by [`into_par_lenders`], possibly using [`install`].
659
    ///
660
    /// A concrete implementation of [`StoreLabelsConf`] for
661
    /// bitstream-based labels is [`BitStreamStoreLabelsConf`].
662
    ///
663
    /// # Examples
664
    ///
665
    /// Compresses a labeled graph in parallel, then loads and verifies the
666
    /// result using [`BitStreamLabelingSeq::load`]:
667
    ///
668
    /// [`into_par_lenders`]: IntoParLenders::into_par_lenders
669
    /// [`install`]: rayon::ThreadPool::install
670
    /// [`BitStreamStoreLabelsConf`]: crate::labels::BitStreamStoreLabelsConf
671
    ///
672
    /// ```
673
    /// # use anyhow::Result;
674
    /// # use dsi_bitstream::prelude::*;
675
    /// # use webgraph::prelude::*;
676
    /// # use webgraph::graphs::bvgraph::*;
677
    /// # use webgraph::labels::BitStreamLabelingSeq;
678
    /// # use webgraph::graphs::vec_graph::LabeledVecGraph;
679
    /// # use webgraph::labels::BitStreamStoreLabelsConf;
680
    /// # use webgraph::traits::FixedWidth;
681
    /// # fn main() -> Result<()> {
682
    /// # let tmp = tempfile::TempDir::new()?;
683
    /// # let basename = tmp.path().join("example");
684
    /// let graph = LabeledVecGraph::from_arcs([
685
    ///     ((0, 1), 10u32),
686
    ///     ((0, 2), 20),
687
    ///     ((1, 3), 30),
688
    ///     ((2, 3), 40),
689
    ///     ((3, 0), 50),
690
    /// ]);
691
    ///
692
    /// let label_config =
693
    ///     BitStreamStoreLabelsConf::<BE, _>::new(FixedWidth::<u32>::new());
694
    ///
695
    /// BvComp::with_basename(&basename)
696
    ///     .par_comp_labeled::<BE, _, _>(&graph, label_config)?;
697
    ///
698
    /// let labels_basename = BvCompConf::default_labels_basename(&basename);
699
    ///
700
    /// // Load the compressed graph and labeling, then verify
701
    /// let seq = BvGraphSeq::with_basename(&basename)
702
    ///     .endianness::<BE>()
703
    ///     .load()?;
704
    /// let labeling = BitStreamLabelingSeq::<BE, _, _>::load(
705
    ///     &labels_basename,
706
    ///     FixedWidth::<u32>::new(),
707
    /// )?;
708
    ///
709
    /// graph::eq_labeled(&graph, &Zip(seq, labeling))?;
710
    /// # Ok(())
711
    /// # }
712
    /// ```
713
    ///
714
    /// [`par_comp`]: Self::par_comp
715
    /// [`BitStreamLabelingSeq::load`]: crate::labels::BitStreamLabelingSeq::load
716
    pub fn par_comp_labeled<E: Endianness, G, SLC>(
        &mut self,
        graph: G,
        mut store_labels_config: SLC,
    ) -> Result<u64>
    where
        G: for<'a> IntoParLenders<
            ParLender: NodeLabelsLender<
                'a,
                Label = (usize, <SLC::StoreLabels as StoreLabels>::Label),
            >,
        >,
        SLC: StoreLabelsConf + Send + Sync,
        SLC::StoreLabels: Send,
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
    {
        // Overall plan: split the graph into per-thread lenders, compress each
        // slice to its own temporary bitstream in a scoped thread, then glue
        // the per-thread graph/offset/label bitstreams together in node order
        // into the final files. Returns the number of bits written for the
        // graph bitstream.
        let (lenders, boundaries) = graph.into_par_lenders();
        // The last boundary appears to be the total node count (boundaries
        // look like per-lender node prefix boundaries) — TODO confirm against
        // IntoParLenders' contract. Empty graph => 0 nodes.
        let num_nodes = *boundaries.last().unwrap_or(&0);
        let tmp_dir = self.resolve_tmp_dir()?;

        // Final output paths, derived from the basename.
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
        let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
        let label_base = self.resolve_labels_basename();
        let labels_path = label_base.with_extension(LABELS_EXTENSION);
        let label_offsets_path = label_base.with_extension(OFFSETS_EXTENSION);

        // Channel over which compression threads report finished chunks
        // (as `Job`s) to the merging loop below.
        let (tx, rx) = crossbeam_channel::unbounded();

        // Per-thread temporary file stem; fixed-width hex keeps names sortable.
        let thread_path = |thread_id: usize| tmp_dir.join(format!("{thread_id:016x}.bitstream"));

        // Concurrent progress logger shared (cloned) across worker threads.
        let mut comp_pl = self.pl.concurrent();
        comp_pl
            .item_name("node")
            .expected_updates(num_nodes)
            .local_speed(true);
        comp_pl.start(format!(
            "Compressing successors in parallel using {} threads...",
            current_num_threads()
        ));
        // Copies/borrows moved into the spawned closures below.
        let cp_flags = &self.comp_flags;
        let bvgraphz = self.bvgraphz;
        let chunk_size = self.chunk_size;

        in_place_scope(|s| {
            // Phase 1: spawn one compression task per lender.
            for (thread_id, mut thread_lender) in Vec::from(lenders).into_iter().enumerate() {
                let tmp_path = thread_path(thread_id);
                let chunk_graph_path = tmp_path.with_extension(GRAPH_EXTENSION);
                let chunk_offsets_path = tmp_path.with_extension(OFFSETS_EXTENSION);
                let label_tmp_stem = tmp_dir.join(format!("{thread_id:016x}-labels"));
                let part_labels_path = label_tmp_stem.with_extension(LABELS_EXTENSION);
                let part_label_offsets_path = label_tmp_stem.with_extension(OFFSETS_EXTENSION);
                // Per-thread label storage is created on the main thread so
                // creation errors propagate via `?` instead of panicking.
                let store_labels =
                    store_labels_config.new_storage(&part_labels_path, &part_label_offsets_path)?;
                let tx = tx.clone();
                let mut comp_pl = comp_pl.clone();
                s.spawn(move |_| {
                    log::debug!("Thread {thread_id} started");

                    // An empty lender produces no chunk and sends no Job.
                    let Some((node_id, successors)) = thread_lender.next() else {
                        return;
                    };

                    let first_node = node_id;
                    let writer = buf_bit_writer::from_path::<E, usize>(&chunk_graph_path).unwrap();
                    let codes_encoder = <DynCodesEncoder<E, _>>::new(writer, cp_flags).unwrap();

                    let stats;
                    let mut last_node;
                    // Two compressor variants with the same push/flush
                    // protocol; BvCompZ additionally takes `chunk_size`.
                    // NOTE(review): unwrap() is used inside the spawned
                    // closure since errors cannot be propagated with `?` here.
                    if bvgraphz {
                        let mut bvcomp = BvCompZ::new(
                            codes_encoder,
                            OffsetsWriter::from_path(&chunk_offsets_path, false).unwrap(),
                            cp_flags.compression_window,
                            chunk_size,
                            cp_flags.max_ref_count,
                            cp_flags.min_interval_length,
                            node_id,
                            store_labels,
                        );
                        bvcomp.push(successors).unwrap();
                        last_node = first_node;
                        // Track the highest node id seen so the merger can
                        // verify chunk adjacency.
                        let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
                        for_! ( (_, succ) in iter_nodes {
                            bvcomp.push(succ.into_iter()).unwrap();
                            log_comp_stats(&bvcomp.stats(), false);
                            comp_pl.update();
                        });
                        stats = bvcomp.flush().unwrap();
                    } else {
                        let mut bvcomp = BvComp::new(
                            codes_encoder,
                            OffsetsWriter::from_path(&chunk_offsets_path, false).unwrap(),
                            cp_flags.compression_window,
                            cp_flags.max_ref_count,
                            cp_flags.min_interval_length,
                            node_id,
                            store_labels,
                        );
                        bvcomp.push(successors).unwrap();
                        last_node = first_node;
                        // Same adjacency tracking as the BvCompZ branch.
                        let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
                        for_! ( (_, succ) in iter_nodes {
                            bvcomp.push(succ.into_iter()).unwrap();
                            log_comp_stats(&bvcomp.stats(), false);
                            comp_pl.update();
                        });
                        stats = bvcomp.flush().unwrap();
                    }

                    log::debug!(
                        "Finished Compression thread {thread_id} and wrote {} bits for the graph and {} bits for the offsets",
                        stats.written_bits, stats.offsets_written_bits,
                    );
                    // Report the finished chunk; job_id doubles as the
                    // ordering key for TaskQueue.
                    tx.send(Job {
                        job_id: thread_id,
                        first_node,
                        last_node,
                        chunk_graph_path,
                        written_bits: stats.written_bits,
                        chunk_offsets_path,
                        offsets_written_bits: stats.offsets_written_bits,
                        num_arcs: stats.num_arcs,
                        tot_ref: stats.tot_ref,
                        tot_dist: stats.tot_dist,
                        part_labels_path: Some(part_labels_path),
                        labels_written_bits: stats.labels_written_bits,
                        part_label_offsets_path: Some(part_label_offsets_path),
                        label_offsets_written_bits: stats.label_offsets_written_bits,
                    })
                    .ok(); // If channel is closed, main thread already has an error
                });
            }

            // Drop the original sender so rx terminates once every worker
            // (each holding a clone) has finished.
            drop(tx);

            // Phase 2: merge the chunks in node order on this thread.
            self.pl.item_name("node").expected_updates(num_nodes);
            self.pl
                .start("Copying compressed successors to final graph");

            let mut graph_writer = buf_bit_writer::from_path::<E, usize>(&graph_path)
                .with_context(|| format!("Could not create graph {}", graph_path.display()))?;

            // Offsets are always written big-endian, regardless of E.
            let mut offsets_writer = buf_bit_writer::from_path::<BE, usize>(&offsets_path)
                .with_context(|| format!("Could not create offsets {}", offsets_path.display()))?;
            // The offsets stream starts with the offset of node 0, which is 0.
            offsets_writer.write_gamma(0)?;

            store_labels_config.init_concat(&labels_path, &label_offsets_path)?;

            let mut total_stats = super::CompStats::default();

            let mut next_node = 0;
            // glue together the bitstreams as they finish, this allows us to do
            // task pipelining for better performance; TaskQueue re-orders the
            // out-of-order completions by job_id.
            for Job {
                job_id,
                first_node,
                last_node,
                chunk_graph_path,
                written_bits,
                chunk_offsets_path,
                offsets_written_bits,
                num_arcs,
                tot_ref,
                tot_dist,
                part_labels_path,
                labels_written_bits,
                part_label_offsets_path,
                label_offsets_written_bits,
            } in TaskQueue::new(rx.into_rayon_iter())
            {
                // Chunks must cover a contiguous node range with no gaps.
                ensure!(
                    first_node == next_node,
                    "Non-adjacent lenders: lender {} has first node {} instead of {}",
                    job_id,
                    first_node,
                    next_node
                );

                next_node = last_node + 1;
                log::debug!(
                    "Copying {} [{}..{}) bits from {} to {}",
                    written_bits,
                    total_stats.written_bits,
                    total_stats.written_bits + written_bits,
                    chunk_graph_path.display(),
                    graph_path.display()
                );
                // Append exactly `written_bits` bits of the chunk's graph
                // bitstream, then delete the temporary file.
                let mut reader = buf_bit_reader::from_path::<E, u32>(&chunk_graph_path)?;
                graph_writer
                    .copy_from(&mut reader, written_bits)
                    .with_context(|| {
                        format!(
                            "Could not copy from {} to {}",
                            chunk_graph_path.display(),
                            graph_path.display()
                        )
                    })?;
                std::fs::remove_file(chunk_graph_path)?;

                log::debug!(
                    "Copying offsets {} [{}..{}) bits from {} to {}",
                    offsets_written_bits,
                    total_stats.offsets_written_bits,
                    total_stats.offsets_written_bits + offsets_written_bits,
                    chunk_offsets_path.display(),
                    offsets_path.display()
                );

                // Chunk offsets are read big-endian, matching how they were
                // written (see the BE writer above).
                let mut reader = <BufBitReader<BigEndian, _>>::new(<WordAdapter<u32, _>>::new(
                    BufReader::new(File::open(&chunk_offsets_path).with_context(|| {
                        format!("Could not open {}", chunk_offsets_path.display())
                    })?),
                ));
                offsets_writer
                    .copy_from(&mut reader, offsets_written_bits)
                    .with_context(|| {
                        format!(
                            "Could not copy from {} to {}",
                            chunk_offsets_path.display(),
                            offsets_path.display()
                        )
                    })?;
                std::fs::remove_file(chunk_offsets_path)?;

                // Concatenate this chunk's label streams, if present.
                if let (Some(lp), Some(lop)) = (part_labels_path, part_label_offsets_path) {
                    store_labels_config.concat_part(
                        &lp,
                        labels_written_bits,
                        &lop,
                        label_offsets_written_bits,
                    )?;
                }

                // Accumulate per-chunk statistics into the file-level totals.
                total_stats += super::CompStats {
                    num_nodes: last_node - first_node + 1,
                    num_arcs,
                    written_bits,
                    offsets_written_bits,
                    tot_ref,
                    tot_dist,
                    labels_written_bits,
                    label_offsets_written_bits,
                };
                self.pl.update_with_count(last_node - first_node + 1);
            }

            store_labels_config.flush_concat()?;

            log::info!("Flushing the merged bitstreams");
            graph_writer.flush()?;
            // Fully-qualified call disambiguates which flush is meant.
            BitWrite::flush(&mut offsets_writer)?;

            // Use the authoritative num_nodes from the boundaries
            total_stats.num_nodes = num_nodes;
            log_comp_stats(&total_stats, true);
            comp_pl.done();
            self.pl.done();

            log::info!("Writing the .properties file");
            let properties = self
                .comp_flags
                .to_properties::<E>(&total_stats)
                .context("Could not serialize properties")?;
            let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
            std::fs::write(&properties_path, properties).with_context(|| {
                format!(
                    "Could not write properties to {}",
                    properties_path.display()
                )
            })?;

            // "()" appears to be the serializer name used when there are no
            // labels, in which case no label properties file is written —
            // TODO confirm against StoreLabelsConf implementations.
            let label_ser_name = store_labels_config.label_serializer_name();
            if label_ser_name != "()" {
                write_label_properties::<E>(
                    &label_base,
                    &label_ser_name,
                    total_stats.num_nodes,
                    total_stats.num_arcs,
                    total_stats.labels_written_bits,
                )?;
            }

            log::info!(
                "Compressed {} arcs into {} bits at {:.3} bits/arc",
                total_stats.num_arcs,
                total_stats.written_bits,
                total_stats.written_bits as f64 / total_stats.num_arcs as f64
            );
            log::info!(
                "Created offsets file with {} bits at {:.3} bits/node",
                total_stats.offsets_written_bits,
                total_stats.offsets_written_bits as f64 / num_nodes as f64
            );

            // cleanup the temp files
            std::fs::remove_dir_all(&tmp_dir).with_context(|| {
                format!("Could not clean temporary directory {}", tmp_dir.display())
            })?;
            Ok(total_stats.written_bits)
        })
    }
1018
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc