• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 22730878689

05 Mar 2026 06:30PM UTC coverage: 71.263% (-0.2%) from 71.467%
22730878689

push

github

vigna
Support for arbitrary splittings and DCF-based splittings in the CLI

65 of 128 new or added lines in 13 files covered. (50.78%)

1 existing line in 1 file now uncovered.

6378 of 8950 relevant lines covered (71.26%)

50385114.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.0
/webgraph/src/graphs/bvgraph/comp/impls.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 *
5
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6
 */
7

8
use crate::prelude::*;
9
use anyhow::{Context, Result, ensure};
10
use dsi_bitstream::prelude::*;
11
use dsi_progress_logger::prelude::*;
12
use lender::prelude::*;
13
use rayon::{current_num_threads, in_place_scope};
14
use std::fs::File;
15
use std::io::Write;
16
use std::io::{BufReader, BufWriter};
17
use std::path::{Path, PathBuf};
18

19
/// A queue that pulls jobs with ids in a contiguous initial segment of the
/// natural numbers from an iterator out of order and implements an iterator in
/// which they can be pulled in order.
///
/// Jobs must report their id via the [`JobId`] trait, and the ids must form a
/// contiguous initial segment of the natural numbers.
struct TaskQueue<I: Iterator> {
    /// The out-of-order source of jobs.
    iter: I,
    /// Jobs received ahead of their turn, buffered at the index of their id.
    jobs: Vec<Option<I::Item>>,
    /// The id of the next job to return in order.
    next_id: usize,
}
30

31
/// Trait for jobs that expose a numeric identifier.
///
/// Used by [`TaskQueue`] to buffer and reorder out-of-order jobs.
trait JobId {
    /// Returns this job's id.
    fn id(&self) -> usize;
}
34

35
impl<I: Iterator> TaskQueue<I> {
36
    fn new(iter: I) -> Self {
622,157✔
37
        Self {
38
            iter,
39
            jobs: vec![],
622,157✔
40
            next_id: 0,
41
        }
42
    }
43
}
44

45
impl<I: Iterator> Iterator for TaskQueue<I>
46
where
47
    I::Item: JobId,
48
{
49
    type Item = I::Item;
50

51
    fn next(&mut self) -> Option<Self::Item> {
3,110,770✔
52
        loop {
×
53
            if let Some(item) = self.jobs.get_mut(self.next_id) {
14,390,426✔
54
                if item.is_some() {
6,383,320✔
55
                    self.next_id += 1;
2,488,613✔
56
                    return item.take();
4,977,226✔
57
                }
58
            }
59
            if let Some(item) = self.iter.next() {
5,599,383✔
60
                let id = item.id();
7,465,839✔
61
                if id >= self.jobs.len() {
4,977,226✔
62
                    self.jobs.resize_with(id + 1, || None);
5,669,481✔
63
                }
64
                self.jobs[id] = Some(item);
4,977,226✔
65
            } else {
66
                return None;
622,157✔
67
            }
68
        }
69
    }
70
}
71

72
/// A compression job.
///
/// NOTE: the derived `PartialOrd`/`Ord` compare fields in declaration order,
/// so `job_id` must stay first for jobs to be ordered by their id.
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
struct Job {
    /// Position of this job in the sequence of lenders; also its ordering key.
    job_id: usize,
    /// First node compressed by this job.
    first_node: usize,
    /// Last node compressed by this job.
    last_node: usize,
    /// Path of the partial graph bitstream produced by this job.
    chunk_graph_path: PathBuf,
    /// Number of bits written to the partial graph bitstream.
    written_bits: u64,
    /// Path of the partial offsets bitstream produced by this job.
    chunk_offsets_path: PathBuf,
    /// Number of bits written to the partial offsets bitstream.
    offsets_written_bits: u64,
    /// Number of arcs compressed by this job.
    num_arcs: u64,
}
84

85
impl JobId for Job {
    /// Returns the job id used by [`TaskQueue`] to reorder finished jobs.
    fn id(&self) -> usize {
        self.job_id
    }
}
90

91
/// A writer for offsets.
92
///
93
/// TODO: This currently uses Write which requires std. To support no_std we will want to make W a WordWriter
94
#[derive(Debug)]
95
#[repr(transparent)]
96
pub struct OffsetsWriter<W: Write> {
97
    buffer: BufBitWriter<BigEndian, WordAdapter<usize, BufWriter<W>>>,
98
}
99

100
impl OffsetsWriter<File> {
101
    /// Creates a new writer and writes the first offset value (0) if requested.
102
    ///
103
    /// Usually, parallel compressor will write autonomously the first offset
104
    /// when copying the partial offsets files into the final offsets file.
105
    pub fn from_path(path: impl AsRef<Path>, write_zero: bool) -> Result<Self> {
3,111,018✔
106
        let file = std::fs::File::create(&path)
9,333,054✔
107
            .with_context(|| format!("Could not create {}", path.as_ref().display()))?;
3,111,018✔
108
        Self::from_write(file, write_zero)
9,333,054✔
109
    }
110
}
111

112
impl<W: Write> OffsetsWriter<W> {
113
    /// Creates a new writer and writes the first offset value (0) if requested.
114
    pub fn from_write(writer: W, write_zero: bool) -> Result<Self> {
3,119,708✔
115
        let mut buffer = BufBitWriter::new(WordAdapter::new(BufWriter::new(writer)));
15,598,540✔
116
        if write_zero {
3,119,708✔
117
            // the first offset (of the first parallel offsets file) is always zero
118
            buffer.write_gamma(0)?;
1,262,180✔
119
        }
120
        Ok(Self { buffer })
3,119,708✔
121
    }
122

123
    /// Pushes a new delta offset.
124
    pub fn push(&mut self, delta: u64) -> Result<usize> {
150,217,684✔
125
        Ok(self.buffer.write_gamma(delta)?)
450,653,052✔
126
    }
127

128
    /// Flushes the buffer.
129
    pub fn flush(&mut self) -> Result<()> {
3,119,708✔
130
        BitWrite::flush(&mut self.buffer)?;
6,239,416✔
131
        Ok(())
3,119,708✔
132
    }
133
}
134

135
/// Configures and runs BvGraph compression.
///
/// A `BvCompConfig` is normally obtained via the convenience methods
/// [`BvComp::with_basename`] (for the standard compressor) or
/// [`BvCompZ::with_basename`] (for the [Zuckerli-based](BvCompZ) compressor).
/// It can then be customized using the builder methods below and finally
/// used to compress a graph.
///
/// # Configuration
///
/// - [`with_comp_flags`](Self::with_comp_flags): sets [`CompFlags`]
///   (compression window, maximum reference count, minimum interval length,
///   and the instantaneous codes used for each component);
/// - [`with_bvgraphz`](Self::with_bvgraphz): switches to the
///   [Zuckerli-based](BvCompZ) reference-selection algorithm;
/// - [`with_chunk_size`](Self::with_chunk_size): sets the chunk size for
///   [`BvCompZ`] (implies `with_bvgraphz`);
/// - [`with_tmp_dir`](Self::with_tmp_dir): sets the temporary directory for
///   parallel compression.
///
/// # Compression Methods
///
/// - [`comp_graph`](Self::comp_graph): compresses a [`SequentialGraph`]
///   sequentially;
/// - [`comp_lender`](Self::comp_lender): compresses a
///   [`NodeLabelsLender`](crate::traits::NodeLabelsLender) sequentially;
/// - [`par_comp_graph`](Self::par_comp_graph): compresses a
///   [splittable](SplitLabeling) graph in parallel;
/// - [`par_comp_lenders`](Self::par_comp_lenders): compresses multiple
///   lenders in parallel.
///
/// All methods produce the `.graph`, `.offsets`, and `.properties` files
/// and return the total number of bits written to the graph bitstream.
///
/// # Examples
///
/// ```ignore
/// // Standard compression with default settings
/// BvComp::with_basename("output").comp_graph::<BE>(&graph)?;
///
/// // Standard compression with custom flags
/// BvComp::with_basename("output")
///     .with_comp_flags(CompFlags {
///         compression_window: 10,
///         min_interval_length: 2,
///         ..Default::default()
///     })
///     .comp_graph::<BE>(&graph)?;
///
/// // Parallel compression
/// BvComp::with_basename("output").par_comp_graph::<BE>(&graph)?;
///
/// // Zuckerli-based compression
/// BvCompZ::with_basename("output").comp_graph::<BE>(&graph)?;
/// ```
#[derive(Debug)]
pub struct BvCompConfig {
    /// The basename of the output files.
    basename: PathBuf,
    /// Compression flags for BvComp/BvCompZ.
    comp_flags: CompFlags,
    /// Selects the Zuckerli-based BVGraph compressor.
    bvgraphz: bool,
    /// The chunk size for the Zuckerli-based compressor.
    chunk_size: usize,
    /// Temporary directory for all operations.
    tmp_dir: Option<PathBuf>,
    /// Owns the TempDir that [`Self::tmp_dir`] refers to, if it was created by default.
    /// Keeping the guard alive prevents early deletion of the directory.
    owned_tmp_dir: Option<tempfile::TempDir>,
}
205

206
impl BvCompConfig {
207
    /// Creates a new compression configuration with the given basename and
208
    /// default options.
209
    ///
210
    /// Note that the convenience methods
211
    /// [`BvComp::with_basename`](crate::graphs::bvgraph::comp::BvComp::with_basename)
212
    /// and
213
    /// [`BvCompZ::with_basename`](crate::graphs::bvgraph::comp::BvCompZ::with_basename)
214
    /// can be used to create a configuration with default options.
215
    pub fn new(basename: impl AsRef<Path>) -> Self {
124,708✔
216
        Self {
217
            basename: basename.as_ref().into(),
374,124✔
218
            comp_flags: CompFlags::default(),
249,416✔
219
            bvgraphz: false,
220
            chunk_size: 10_000,
221
            tmp_dir: None,
222
            owned_tmp_dir: None,
223
        }
224
    }
225
}
226

227
impl BvCompConfig {
228
    /// Sets the [`CompFlags`] controlling the compression parameters
229
    /// (compression window, maximum reference count, minimum interval length,
230
    /// and the instantaneous codes used for each component of the successor
231
    /// list).
232
    pub fn with_comp_flags(mut self, compression_flags: CompFlags) -> Self {
1,088,834✔
233
        self.comp_flags = compression_flags;
1,088,834✔
234
        self
1,088,834✔
235
    }
236

237
    /// Sets the temporary directory used by
238
    /// [`par_comp_lenders`](Self::par_comp_lenders) to store partial
239
    /// bitstreams. If not set, a system temporary directory is created
240
    /// automatically.
241
    pub fn with_tmp_dir(mut self, tmp_dir: impl AsRef<Path>) -> Self {
12✔
242
        self.tmp_dir = Some(tmp_dir.as_ref().into());
24✔
243
        self
12✔
244
    }
245

246
    /// Switches to the [`BvCompZ`] (Zuckerli-based) reference-selection
247
    /// algorithm.
248
    pub fn with_bvgraphz(mut self) -> Self {
466,576✔
249
        self.bvgraphz = true;
466,576✔
250
        self
466,576✔
251
    }
252

253
    /// Sets the chunk size for [`BvCompZ`] and enables the Zuckerli-based
254
    /// compressor. The chunk size controls how many consecutive nodes are
255
    /// buffered before running the dynamic-programming reference-selection
256
    /// algorithm; larger chunks can yield better compression at the cost of
257
    /// more memory. Implies [`with_bvgraphz`](Self::with_bvgraphz).
258
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
466,570✔
259
        self.bvgraphz = true;
466,570✔
260
        self.chunk_size = chunk_size;
466,570✔
261
        self
466,570✔
262
    }
263

264
    fn tmp_dir(&mut self) -> Result<PathBuf> {
622,157✔
265
        if self.tmp_dir.is_none() {
1,244,314✔
266
            let tmp_dir = tempfile::tempdir()?;
1,244,294✔
267
            self.tmp_dir = Some(tmp_dir.path().to_owned());
1,244,294✔
268
            self.owned_tmp_dir = Some(tmp_dir);
1,244,294✔
269
        }
270

271
        let tmp_dir = self.tmp_dir.clone().unwrap();
2,488,628✔
272
        if !std::fs::exists(&tmp_dir)
1,244,314✔
273
            .with_context(|| format!("Could not check whether {} exists", tmp_dir.display()))?
622,157✔
274
        {
275
            std::fs::create_dir_all(&tmp_dir)
×
276
                .with_context(|| format!("Could not create {}", tmp_dir.display()))?;
×
277
        }
278
        Ok(tmp_dir)
622,157✔
279
    }
280

281
    /// Compresses sequentially a [`SequentialGraph`] and returns
282
    /// the number of bits written to the graph bitstream.
283
    pub fn comp_graph<E: Endianness>(&mut self, graph: impl SequentialGraph) -> Result<u64>
373,326✔
284
    where
285
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
286
    {
287
        self.comp_lender::<E, _>(graph.iter(), Some(graph.num_nodes()))
1,866,630✔
288
    }
289

290
    /// Compresses sequentially a [`NodeLabelsLender`] and returns
    /// the number of bits written to the graph bitstream.
    ///
    /// The optional `expected_num_nodes` parameter will be used to provide
    /// forecasts on the progress logger.
    ///
    /// Besides the `.graph` bitstream, this also writes the `.offsets` and
    /// `.properties` files next to the configured basename.
    pub fn comp_lender<E, L>(&mut self, iter: L, expected_num_nodes: Option<usize>) -> Result<u64>
    where
        E: Endianness,
        L: IntoLender,
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
    {
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);

        // Compress the graph
        let bit_write = buf_bit_writer::from_path::<E, usize>(&graph_path)
            .with_context(|| format!("Could not create {}", graph_path.display()))?;

        let codes_writer = DynCodesEncoder::new(bit_write, &self.comp_flags)?;

        // create a file for offsets; `true` makes the writer emit the leading
        // zero offset itself, since there are no partial files to merge here
        let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
        let offset_writer = OffsetsWriter::from_path(offsets_path, true)?;

        let mut pl = progress_logger![
            display_memory = true,
            item_name = "node",
            expected_updates = expected_num_nodes,
        ];
        pl.start("Compressing successors...");
        // The two branches differ only in the compressor type (Zuckerli-based
        // BvCompZ vs. standard BvComp); both push every successor list and
        // flush at the end, returning the compression statistics.
        let comp_stats = if self.bvgraphz {
            let mut bvcompz = BvCompZ::new(
                codes_writer,
                offset_writer,
                self.comp_flags.compression_window,
                self.chunk_size,
                self.comp_flags.max_ref_count,
                self.comp_flags.min_interval_length,
                0,
            );

            for_! ( (_node_id, successors) in iter {
                bvcompz.push(successors).context("Could not push successors")?;
                pl.update();
            });
            pl.done();

            bvcompz.flush()?
        } else {
            let mut bvcomp = BvComp::new(
                codes_writer,
                offset_writer,
                self.comp_flags.compression_window,
                self.comp_flags.max_ref_count,
                self.comp_flags.min_interval_length,
                0,
            );

            for_! ( (_node_id, successors) in iter {
                bvcomp.push(successors).context("Could not push successors")?;
                pl.update();
            });
            pl.done();

            bvcomp.flush()?
        };

        // A mismatch is only a warning: the caller's estimate may be wrong,
        // but the compressed output is still valid.
        if let Some(num_nodes) = expected_num_nodes {
            if num_nodes != comp_stats.num_nodes {
                log::warn!(
                    "The expected number of nodes is {num_nodes} but the actual number of nodes is {}",
                    comp_stats.num_nodes,
                );
            }
        }

        log::info!("Writing the .properties file");
        let properties = self
            .comp_flags
            .to_properties::<E>(
                comp_stats.num_nodes,
                comp_stats.num_arcs,
                comp_stats.written_bits,
            )
            .context("Could not serialize properties")?;
        let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
        std::fs::write(&properties_path, properties)
            .with_context(|| format!("Could not write {}", properties_path.display()))?;

        Ok(comp_stats.written_bits)
    }
381

382
    /// Splits a graph into as many parts as there are threads in the current
383
    /// Rayon thread pool and compresses it in parallel, returning the length
384
    /// in bits of the graph bitstream.
385
    ///
386
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
387
    ///
388
    /// A given endianness is enabled only if the corresponding feature is
389
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
390
    /// neither features are enabled.
391
    pub fn par_comp_lenders_endianness<G: SplitLabeling + SequentialGraph>(
5✔
392
        &mut self,
393
        graph: &G,
394
        endianness: &str,
395
    ) -> Result<u64>
396
    where
397
        for<'a> <G as SplitLabeling>::SplitLender<'a>: ExactSizeLender + Send + Sync,
398
    {
399
        let num_nodes = graph.num_nodes();
15✔
400
        let n = rayon::current_num_threads();
10✔
401

402
        match endianness {
5✔
403
            #[cfg(any(
404
                feature = "be_bins",
405
                not(any(feature = "be_bins", feature = "le_bins"))
406
            ))]
407
            BE::NAME => self.par_comp_lenders::<BigEndian, _>(graph.split_iter(n), num_nodes),
35✔
408
            #[cfg(any(
409
                feature = "le_bins",
410
                not(any(feature = "be_bins", feature = "le_bins"))
411
            ))]
NEW
412
            LE::NAME => self.par_comp_lenders::<LittleEndian, _>(graph.split_iter(n), num_nodes),
×
NEW
413
            x => anyhow::bail!("Unknown endianness {}", x),
×
414
        }
415
    }
416

417
    /// Splits a graph at the given cutpoints and compresses it in parallel,
418
    /// returning the length in bits of the graph bitstream.
419
    ///
420
    /// The cutpoints are passed to
421
    /// [`split_iter_at`](SplitLabeling::split_iter_at).
422
    ///
423
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
424
    ///
425
    /// A given endianness is enabled only if the corresponding feature is
426
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
427
    /// neither features are enabled.
428
    pub fn par_comp_lenders_endianness_at<G: SplitLabeling + SequentialGraph>(
5✔
429
        &mut self,
430
        graph: &G,
431
        endianness: &str,
432
        cutpoints: impl IntoIterator<Item = usize>,
433
    ) -> Result<u64>
434
    where
435
        for<'a> <G as SplitLabeling>::SplitLender<'a>: ExactSizeLender + Send + Sync,
436
    {
437
        let num_nodes = graph.num_nodes();
15✔
438

439
        match endianness {
5✔
440
            #[cfg(any(
441
                feature = "be_bins",
442
                not(any(feature = "be_bins", feature = "le_bins"))
443
            ))]
444
            BE::NAME => {
5✔
445
                self.par_comp_lenders::<BigEndian, _>(graph.split_iter_at(cutpoints), num_nodes)
30✔
446
            }
447
            #[cfg(any(
448
                feature = "le_bins",
449
                not(any(feature = "be_bins", feature = "le_bins"))
450
            ))]
451
            LE::NAME => {
×
NEW
452
                self.par_comp_lenders::<LittleEndian, _>(graph.split_iter_at(cutpoints), num_nodes)
×
453
            }
454
            x => anyhow::bail!("Unknown endianness {}", x),
×
455
        }
456
    }
457

458
    /// Compresses a splittable sequential graph in parallel and returns the
459
    /// length in bits of the graph bitstream.
460
    ///
461
    /// Note that the number of parallel compression threads will be
462
    /// [`current_num_threads`]. The graph will be split into as many
463
    /// lenders as there are threads.
464
    pub fn par_comp_graph<E: Endianness>(
373,262✔
465
        &mut self,
466
        graph: &(impl SequentialGraph + for<'a> SplitLabeling<SplitLender<'a>: ExactSizeLender>),
467
    ) -> Result<u64>
468
    where
469
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
470
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
471
    {
472
        let num_threads = current_num_threads();
746,524✔
473
        self.par_comp_lenders(graph.split_iter(num_threads), graph.num_nodes())
2,612,834✔
474
    }
475

476
    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns
    /// the length in bits of the graph bitstream.
    ///
    /// The first lender must start from node 0, and each lender must
    /// continue from where the previous lender stopped. All in all,
    /// they must return a total of `num_nodes` nodes.
    ///
    /// Note that the number of parallel compression threads will be
    /// [`current_num_threads`]. It is your responsibility to ensure that the
    /// number of threads is appropriate for the number of lenders you pass,
    /// possibly using [`install`](rayon::ThreadPool::install).
    ///
    /// This method is useful to compress graphs that can be iterated upon using
    /// multiple lenders, but such lenders do not derive from splitting. For
    /// example, this happens when using
    /// [`ParSortIters`].
    pub fn par_comp_lenders<
        E: Endianness,
        L: Lender + for<'next> NodeLabelsLender<'next, Label = usize> + ExactSizeLender + Send,
    >(
        &mut self,
        iter: impl IntoIterator<Item = L>,
        num_nodes: usize,
    ) -> Result<u64>
    where
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
    {
        let tmp_dir = self.tmp_dir()?;

        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
        let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);

        // Channel over which worker threads send finished `Job`s back to the
        // merging loop below.
        let (tx, rx) = crossbeam_channel::unbounded();

        // Base path for the partial bitstreams of a given worker.
        let thread_path = |thread_id: usize| tmp_dir.join(format!("{thread_id:016x}.bitstream"));

        let mut comp_pl = concurrent_progress_logger![
            log_target = "webgraph::graphs::bvgraph::comp::impls::par_comp_lenders::comp",
            display_memory = true,
            item_name = "node",
            local_speed = true,
            expected_updates = Some(num_nodes),
        ];
        comp_pl.start(format!(
            "Compressing successors in parallel using {} threads...",
            current_num_threads()
        ));
        let mut expected_first_node = 0;
        let cp_flags = &self.comp_flags;
        let bvgraphz = self.bvgraphz;
        let chunk_size = self.chunk_size;

        in_place_scope(|s| {
            for (thread_id, mut thread_lender) in iter.into_iter().enumerate() {
                let tmp_path = thread_path(thread_id);
                let chunk_graph_path = tmp_path.with_extension(GRAPH_EXTENSION);
                let chunk_offsets_path = tmp_path.with_extension(OFFSETS_EXTENSION);
                let tx = tx.clone();
                let mut comp_pl = comp_pl.clone();
                let lender_len = thread_lender.len();
                // Spawn the thread
                s.spawn(move |_| {
                    log::debug!("Thread {thread_id} started");

                    // An empty lender produces no job at all.
                    let Some((node_id, successors)) = thread_lender.next() else {
                        return;
                    };

                    let first_node = node_id;
                    // `expected_first_node` was copied into this closure with
                    // the cumulative length of all previous lenders.
                    if first_node != expected_first_node {
                        panic!(
                            "Lender {} expected to start from node {} but started from {}",
                            thread_id,
                            expected_first_node,
                            first_node
                        );
                    }

                    let writer = buf_bit_writer::from_path::<E, usize>(&chunk_graph_path).unwrap();
                    let codes_encoder = <DynCodesEncoder<E, _>>::new(writer, cp_flags).unwrap();

                    let stats;
                    let mut last_node;
                    // `false`: partial offsets files do not get the leading
                    // zero; it is written once when merging into the final
                    // offsets file.
                    if bvgraphz {
                        let mut bvcomp = BvCompZ::new(
                            codes_encoder,
                            OffsetsWriter::from_path(&chunk_offsets_path, false).unwrap(),
                            cp_flags.compression_window,
                            chunk_size,
                            cp_flags.max_ref_count,
                            cp_flags.min_interval_length,
                            node_id,
                        );
                        bvcomp.push(successors).unwrap();
                        last_node = first_node;
                        // Track the last node seen so the merge loop can check
                        // adjacency and size the progress update.
                        let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
                        for_! ( (_, succ) in iter_nodes {
                            bvcomp.push(succ.into_iter()).unwrap();
                            comp_pl.update();
                        });
                        stats = bvcomp.flush().unwrap();
                    } else {
                        let mut bvcomp = BvComp::new(
                            codes_encoder,
                            OffsetsWriter::from_path(&chunk_offsets_path, false).unwrap(),
                            cp_flags.compression_window,
                            cp_flags.max_ref_count,
                            cp_flags.min_interval_length,
                            node_id,
                        );
                        bvcomp.push(successors).unwrap();
                        last_node = first_node;
                        let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
                        for_! ( (_, succ) in iter_nodes {
                            bvcomp.push(succ.into_iter()).unwrap();
                            comp_pl.update();
                        });
                        stats = bvcomp.flush().unwrap();
                    }

                    log::debug!(
                        "Finished Compression thread {thread_id} and wrote {} bits for the graph and {} bits for the offsets",
                        stats.written_bits, stats.offsets_written_bits,
                    );
                    tx.send(Job {
                        job_id: thread_id,
                        first_node,
                        last_node,
                        chunk_graph_path,
                        written_bits: stats.written_bits,
                        chunk_offsets_path,
                        offsets_written_bits: stats.offsets_written_bits,
                        num_arcs: stats.num_arcs,
                    })
                    .ok(); // If channel is closed, main thread already has an error
                });

                expected_first_node += lender_len;
            }

            if num_nodes != expected_first_node {
                panic!(
                    "The lenders were supposed to return {} nodes but returned {} instead",
                    num_nodes, expected_first_node
                );
            }

            // Drop our sender so the receiving loop below terminates once all
            // workers are done.
            drop(tx);

            let mut copy_pl = progress_logger![
                log_target = "webgraph::graphs::bvgraph::comp::impls::par_comp_lenders::copy",
                display_memory = true,
                item_name = "node",
                local_speed = true,
                expected_updates = Some(num_nodes),
            ];
            copy_pl.start("Copying compressed successors to final graph");

            let mut graph_writer = buf_bit_writer::from_path::<E, usize>(&graph_path)
                .with_context(|| format!("Could not create graph {}", graph_path.display()))?;

            // Offsets are always big-endian, regardless of the graph's
            // endianness `E`; the leading zero offset is written here.
            let mut offsets_writer = buf_bit_writer::from_path::<BE, usize>(&offsets_path)
                .with_context(|| format!("Could not create offsets {}", offsets_path.display()))?;
            offsets_writer.write_gamma(0)?;

            let mut total_written_bits: u64 = 0;
            let mut total_offsets_written_bits: u64 = 0;
            let mut total_arcs: u64 = 0;

            let mut next_node = 0;
            // glue together the bitstreams as they finish, this allows us to do
            // task pipelining for better performance
            for Job {
                job_id,
                first_node,
                last_node,
                chunk_graph_path,
                written_bits,
                chunk_offsets_path,
                offsets_written_bits,
                num_arcs,
            } in TaskQueue::new(rx.into_rayon_iter())
            {
                ensure!(
                    first_node == next_node,
                    "Non-adjacent lenders: lender {} has first node {} instead of {}",
                    job_id,
                    first_node,
                    next_node
                );

                next_node = last_node + 1;
                total_arcs += num_arcs;
                log::debug!(
                    "Copying {} [{}..{}) bits from {} to {}",
                    written_bits,
                    total_written_bits,
                    total_written_bits + written_bits,
                    chunk_graph_path.display(),
                    graph_path.display()
                );
                total_written_bits += written_bits;
                let mut reader = buf_bit_reader::from_path::<E, u32>(&chunk_graph_path)?;
                graph_writer
                    .copy_from(&mut reader, written_bits)
                    .with_context(|| {
                        format!(
                            "Could not copy from {} to {}",
                            chunk_graph_path.display(),
                            graph_path.display()
                        )
                    })?;
                std::fs::remove_file(chunk_graph_path)?;

                log::debug!(
                    "Copying offsets {} [{}..{}) bits from {} to {}",
                    offsets_written_bits,
                    total_offsets_written_bits,
                    total_offsets_written_bits + offsets_written_bits,
                    chunk_offsets_path.display(),
                    offsets_path.display()
                );
                total_offsets_written_bits += offsets_written_bits;

                // Partial offsets files are big-endian (see OffsetsWriter).
                let mut reader = <BufBitReader<BigEndian, _>>::new(<WordAdapter<u32, _>>::new(
                    BufReader::new(File::open(&chunk_offsets_path).with_context(|| {
                        format!("Could not open {}", chunk_offsets_path.display())
                    })?),
                ));
                offsets_writer
                    .copy_from(&mut reader, offsets_written_bits)
                    .with_context(|| {
                        format!(
                            "Could not copy from {} to {}",
                            chunk_offsets_path.display(),
                            offsets_path.display()
                        )
                    })?;
                std::fs::remove_file(chunk_offsets_path)?;

                copy_pl.update_with_count(last_node - first_node + 1);
            }

            log::info!("Flushing the merged bitstreams");
            graph_writer.flush()?;
            BitWrite::flush(&mut offsets_writer)?;

            comp_pl.done();
            copy_pl.done();

            log::info!("Writing the .properties file");
            let properties = self
                .comp_flags
                .to_properties::<E>(num_nodes, total_arcs, total_written_bits)
                .context("Could not serialize properties")?;
            let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
            std::fs::write(&properties_path, properties).with_context(|| {
                format!(
                    "Could not write properties to {}",
                    properties_path.display()
                )
            })?;

            log::info!(
                "Compressed {} arcs into {} bits for {:.4} bits/arc",
                total_arcs,
                total_written_bits,
                total_written_bits as f64 / total_arcs as f64
            );
            log::info!(
                "Created offsets file with {} bits for {:.4} bits/node",
                total_offsets_written_bits,
                total_offsets_written_bits as f64 / num_nodes as f64
            );

            // cleanup the temp files
            std::fs::remove_dir_all(&tmp_dir).with_context(|| {
                format!("Could not clean temporary directory {}", tmp_dir.display())
            })?;
            Ok(total_written_bits)
        })
    }
759
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc