• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 20724877148

05 Jan 2026 06:16PM UTC coverage: 62.048% (-0.02%) from 62.07%
20724877148

push

github

web-flow
Merge pull request #160 from progval/patch-2

Fix typo in docstring

5441 of 8769 relevant lines covered (62.05%)

43310641.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.93
/webgraph/src/graphs/bvgraph/comp/impls.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 *
5
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6
 */
7

8
use crate::prelude::*;
9
use anyhow::{Context, Result, ensure};
10
use dsi_bitstream::prelude::*;
11
use dsi_progress_logger::prelude::*;
12
use lender::prelude::*;
13
use rayon::{current_num_threads, in_place_scope};
14
use std::fs::File;
15
use std::io::Write;
16
use std::io::{BufReader, BufWriter};
17
use std::path::{Path, PathBuf};
18

19
/// A reordering queue for jobs that complete out of order.
///
/// The queue wraps an iterator returning jobs whose ids (see [`JobId`]) form
/// a contiguous initial segment of the natural numbers, in any order, and
/// implements an [`Iterator`] returning the same jobs sorted by increasing id.
///
/// Jobs arriving before their turn are parked in `jobs` (indexed by id) until
/// all preceding jobs have been returned.
struct TaskQueue<I: Iterator> {
    /// The underlying source of out-of-order jobs.
    iter: I,
    /// Parked jobs, indexed by job id; a slot is `None` if the job has not
    /// arrived yet or has already been returned.
    jobs: Vec<Option<I::Item>>,
    /// The id of the next job to be returned.
    next_id: usize,
}

/// Items that can be reordered by a [`TaskQueue`].
trait JobId {
    /// Returns the id of this job, that is, its position in the output order.
    fn id(&self) -> usize;
}

impl<I: Iterator> TaskQueue<I> {
    /// Creates a new queue pulling jobs from `iter`; the first job returned
    /// will be the one with id 0.
    fn new(iter: I) -> Self {
        Self {
            iter,
            jobs: Vec::new(),
            next_id: 0,
        }
    }
}

impl<I: Iterator> Iterator for TaskQueue<I>
where
    I::Item: JobId,
{
    type Item = I::Item;

    /// Returns the job with id `next_id`, pulling (and parking) jobs from the
    /// underlying iterator until it shows up.
    ///
    /// Returns `None` as soon as the underlying iterator is exhausted and the
    /// job with id `next_id` has not arrived, even if jobs with larger ids
    /// are still parked.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // If the job we are waiting for is parked, hand it over.
            let parked = self
                .jobs
                .get_mut(self.next_id)
                .and_then(|slot| slot.take());
            if let Some(job) = parked {
                self.next_id += 1;
                return Some(job);
            }

            // Otherwise park whatever arrives next and try again.
            let job = self.iter.next()?;
            let id = job.id();
            if id >= self.jobs.len() {
                self.jobs.resize_with(id + 1, || None);
            }
            self.jobs[id] = Some(job);
        }
    }
}
71

72
/// The outcome of the compression of one chunk of nodes, as sent by a
/// compression thread to the thread merging the chunks.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Job {
    /// The position of this chunk in the sequence of chunks.
    job_id: usize,
    /// The first node of the chunk.
    first_node: usize,
    /// The last node of the chunk (inclusive).
    last_node: usize,
    /// The path of the temporary file containing the graph bitstream of the
    /// chunk.
    chunk_graph_path: PathBuf,
    /// The number of bits written to the graph bitstream of the chunk.
    written_bits: u64,
    /// The path of the temporary file containing the offsets of the chunk.
    chunk_offsets_path: PathBuf,
    /// The number of bits written to the offsets bitstream of the chunk.
    offsets_written_bits: u64,
    /// The number of arcs in the chunk.
    num_arcs: u64,
}
84

85
impl JobId for Job {
    /// Returns the position of this job in the sequence of compression jobs.
    fn id(&self) -> usize {
        let Self { job_id, .. } = self;
        *job_id
    }
}
90

91
/// A writer for offsets.
92
///
93
/// TODO: This currently uses Write which requires std. To support no_std we will want to make W a WordWriter
94
#[derive(Debug)]
95
#[repr(transparent)]
96
pub struct OffsetsWriter<W: Write> {
97
    buffer: BufBitWriter<BigEndian, WordAdapter<usize, BufWriter<W>>>,
98
}
99

100
impl OffsetsWriter<File> {
101
    /// Creates a new writer and writes the first offset value (0) if requested.
102
    ///
103
    /// Usually, parallel compressor will write autonomously the first offset
104
    /// when copying the partial offsets files into the final offsets file.
105
    pub fn from_path(path: impl AsRef<Path>, write_zero: bool) -> Result<Self> {
2,488,524✔
106
        let file = std::fs::File::create(&path)
7,465,572✔
107
            .with_context(|| format!("Could not create {}", path.as_ref().display()))?;
2,488,524✔
108
        Self::from_write(file, write_zero)
7,465,572✔
109
    }
110
}
111

112
impl<W: Write> OffsetsWriter<W> {
113
    /// Creates a new writer and writes the first offset value (0).
114
    pub fn from_write(writer: W, write_zero: bool) -> Result<Self> {
2,495,476✔
115
        let mut buffer = BufBitWriter::new(WordAdapter::new(BufWriter::new(writer)));
12,477,380✔
116
        if write_zero {
2,495,476✔
117
            // the first offset (of the first parallel offsets file) is always zero
118
            buffer.write_gamma(0)?;
1,009,272✔
119
        }
120
        Ok(Self { buffer })
2,495,476✔
121
    }
122

123
    /// Pushes a new delta offset.
124
    pub fn push(&mut self, delta: u64) -> Result<usize> {
121,604,026✔
125
        Ok(self.buffer.write_gamma(delta)?)
364,812,078✔
126
    }
127

128
    /// Flushes the buffer.
129
    pub fn flush(&mut self) -> Result<()> {
2,495,476✔
130
        BitWrite::flush(&mut self.buffer)? as u64;
4,990,952✔
131
        Ok(())
2,495,476✔
132
    }
133
}
134

135
/// A configuration for BvGraph compression.
136
///
137
/// Once set up, the methods [`comp_graph`](Self::comp_graph) (sequential graph
138
/// compression), [`comp_lender`](Self::comp_lender) (sequential lender
139
/// compression), [`par_comp_graph`](Self::par_comp_graph) (parallel compression
140
/// of a splittable graph), and [`par_comp_lenders`](Self::par_comp_lenders)
141
/// (parallel compression of multiple lenders) can be used to compress a graph.
142
#[derive(Debug)]
143
pub struct BvCompConfig {
144
    /// The basename of the output files.
145
    basename: PathBuf,
146
    /// Compression flags for BvComp/BvCompZ.
147
    comp_flags: CompFlags,
148
    /// Selects the Zuckerli-based BVGraph compressor
149
    bvgraphz: bool,
150
    /// The chunk size for the Zuckerli-based compressor
151
    chunk_size: usize,
152
    /// Temporary directory for all operations.
153
    tmp_dir: Option<PathBuf>,
154
    /// Owns the TempDir that [`Self::tmp_dir`] refers to, if it was created by default.
155
    owned_tmp_dir: Option<tempfile::TempDir>,
156
}
157

158
impl BvCompConfig {
159
    /// Creates a new compression configuration with the given basename and
160
    /// default options.
161
    ///
162
    /// Note that the convenience methods
163
    /// [`BvComp::with_basename`](crate::graphs::bvgraph::comp::BvComp::with_basename)
164
    /// and
165
    /// [`BvCompZ::with_basename`](crate::graphs::bvgraph::comp::BvCompZ::with_basename)
166
    /// can be used to create a configuration with default options.
167
    pub fn new(basename: impl AsRef<Path>) -> Self {
124,482✔
168
        Self {
169
            basename: basename.as_ref().into(),
373,446✔
170
            comp_flags: CompFlags::default(),
248,964✔
171
            bvgraphz: false,
172
            chunk_size: 10_000,
173
            tmp_dir: None,
174
            owned_tmp_dir: None,
175
        }
176
    }
177
}
178

179
impl BvCompConfig {
180
    /// Sets the compression flags, replacing the current ones.
    pub fn with_comp_flags(self, compression_flags: CompFlags) -> Self {
        let mut config = self;
        config.comp_flags = compression_flags;
        config
    }
184

185
    /// Sets the temporary directory used by parallel compression.
    ///
    /// The directory is created if it does not exist. Note that
    /// [`par_comp_lenders`](Self::par_comp_lenders) removes the directory and
    /// all of its contents once compression has completed successfully.
    pub fn with_tmp_dir(self, tmp_dir: impl AsRef<Path>) -> Self {
        let mut config = self;
        config.tmp_dir = Some(tmp_dir.as_ref().to_path_buf());
        config
    }
189

190
    /// Selects the Zuckerli-based compressor instead of the standard one.
    pub fn with_bvgraphz(self) -> Self {
        let mut config = self;
        config.bvgraphz = true;
        config
    }
194

195
    /// Sets the chunk size of the Zuckerli-based compressor.
    ///
    /// As the chunk size is meaningful only for that compressor, this method
    /// also selects it, as [`with_bvgraphz`](Self::with_bvgraphz) does.
    pub fn with_chunk_size(self, chunk_size: usize) -> Self {
        let mut config = self;
        config.chunk_size = chunk_size;
        config.bvgraphz = true;
        config
    }
200

201
    fn tmp_dir(&mut self) -> Result<PathBuf> {
497,710✔
202
        if self.tmp_dir.is_none() {
995,420✔
203
            let tmp_dir = tempfile::tempdir()?;
995,404✔
204
            self.tmp_dir = Some(tmp_dir.path().to_owned());
995,404✔
205
            self.owned_tmp_dir = Some(tmp_dir);
995,404✔
206
        }
207

208
        let tmp_dir = self.tmp_dir.clone().unwrap();
1,990,840✔
209
        if !std::fs::exists(&tmp_dir)
995,420✔
210
            .with_context(|| format!("Could not check whether {} exists", tmp_dir.display()))?
497,710✔
211
        {
212
            std::fs::create_dir_all(&tmp_dir)
×
213
                .with_context(|| format!("Could not create {}", tmp_dir.display()))?;
×
214
        }
215
        Ok(tmp_dir)
497,710✔
216
    }
217

218
    /// Compresses sequentially a [`SequentialGraph`] and returns
219
    /// the number of bits written to the graph bitstream.
220
    pub fn comp_graph<E: Endianness>(&mut self, graph: impl SequentialGraph) -> Result<u64>
311,060✔
221
    where
222
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
223
    {
224
        self.comp_lender::<E, _>(graph.iter(), Some(graph.num_nodes()))
1,555,300✔
225
    }
226

227
    /// Compresses sequentially a [`NodeLabelsLender`] and returns
228
    /// the number of bits written to the graph bitstream.
229
    ///
230
    /// The optional `expected_num_nodes` parameter will be used to provide
231
    /// forecasts on the progress logger.
232
    pub fn comp_lender<E, L>(&mut self, iter: L, expected_num_nodes: Option<usize>) -> Result<u64>
311,060✔
233
    where
234
        E: Endianness,
235
        L: IntoLender,
236
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
237
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
238
    {
239
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
622,120✔
240

241
        // Compress the graph
242
        let bit_write = buf_bit_writer::from_path::<E, usize>(&graph_path)
933,180✔
243
            .with_context(|| format!("Could not create {}", graph_path.display()))?;
311,060✔
244

245
        let codes_writer = DynCodesEncoder::new(bit_write, &self.comp_flags)?;
1,244,240✔
246

247
        // create a file for offsets
248
        let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
622,120✔
249
        let offset_writer = OffsetsWriter::from_path(offsets_path, true)?;
933,180✔
250

251
        let mut pl = progress_logger![
622,120✔
252
            display_memory = true,
×
253
            item_name = "node",
×
254
            expected_updates = expected_num_nodes,
311,060✔
255
        ];
256
        pl.start("Compressing successors...");
622,120✔
257
        let comp_stats = if self.bvgraphz {
622,120✔
258
            let mut bvcompz = BvCompZ::new(
259
                codes_writer,
233,290✔
260
                offset_writer,
233,290✔
261
                self.comp_flags.compression_window,
233,290✔
262
                self.chunk_size,
233,290✔
263
                self.comp_flags.max_ref_count,
233,290✔
264
                self.comp_flags.min_interval_length,
233,290✔
265
                0,
266
            );
267

268
            for_! ( (_node_id, successors) in iter {
53,400,430✔
269
                bvcompz.push(successors).context("Could not push successors")?;
106,334,280✔
270
                pl.update();
53,167,140✔
271
            });
272
            pl.done();
466,580✔
273

274
            bvcompz.flush()?
466,580✔
275
        } else {
276
            let mut bvcomp = BvComp::new(
277
                codes_writer,
77,770✔
278
                offset_writer,
77,770✔
279
                self.comp_flags.compression_window,
77,770✔
280
                self.comp_flags.max_ref_count,
77,770✔
281
                self.comp_flags.min_interval_length,
77,770✔
282
                0,
283
            );
284

285
            for_! ( (_node_id, successors) in iter {
22,140,910✔
286
                bvcomp.push(successors).context("Could not push successors")?;
44,126,280✔
287
                pl.update();
22,063,140✔
288
            });
289
            pl.done();
155,540✔
290

291
            bvcomp.flush()?
155,540✔
292
        };
293

294
        if let Some(num_nodes) = expected_num_nodes {
622,120✔
295
            if num_nodes != comp_stats.num_nodes {
311,060✔
296
                log::warn!(
×
297
                    "The expected number of nodes is {num_nodes} but the actual number of nodes is {}",
×
298
                    comp_stats.num_nodes,
×
299
                );
300
            }
301
        }
302

303
        log::info!("Writing the .properties file");
311,060✔
304
        let properties = self
622,120✔
305
            .comp_flags
311,060✔
306
            .to_properties::<E>(
307
                comp_stats.num_nodes,
311,060✔
308
                comp_stats.num_arcs,
311,060✔
309
                comp_stats.written_bits,
311,060✔
310
            )
311
            .context("Could not serialize properties")?;
312
        let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
622,120✔
313
        std::fs::write(&properties_path, properties)
933,180✔
314
            .with_context(|| format!("Could not write {}", properties_path.display()))?;
311,060✔
315

316
        Ok(comp_stats.written_bits)
311,060✔
317
    }
318

319
    /// A wrapper over [`par_comp_lenders`](Self::par_comp_lenders) that takes the
320
    /// endianness as a string.
321
    ///
322
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
323
    ///
324
    ///  A given endianness is enabled only if the corresponding feature is
325
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
326
    /// neither features are enabled.
327
    pub fn par_comp_lenders_endianness<G: SplitLabeling + SequentialGraph>(
8✔
328
        &mut self,
329
        graph: &G,
330
        num_nodes: usize,
331
        endianness: &str,
332
    ) -> Result<u64>
333
    where
334
        for<'a> <G as SplitLabeling>::SplitLender<'a>: ExactSizeLender + Send + Sync,
335
    {
336
        let num_threads = current_num_threads();
16✔
337

338
        match endianness {
8✔
339
            #[cfg(any(
340
                feature = "be_bins",
341
                not(any(feature = "be_bins", feature = "le_bins"))
342
            ))]
343
            BE::NAME => {
8✔
344
                // compress the transposed graph
345
                self.par_comp_lenders::<BigEndian, _>(graph.split_iter(num_threads), num_nodes)
48✔
346
            }
347
            #[cfg(any(
348
                feature = "le_bins",
349
                not(any(feature = "be_bins", feature = "le_bins"))
350
            ))]
351
            LE::NAME => {
×
352
                // compress the transposed graph
353
                self.par_comp_lenders::<LittleEndian, _>(graph.split_iter(num_threads), num_nodes)
×
354
            }
355
            x => anyhow::bail!("Unknown endianness {}", x),
×
356
        }
357
    }
358

359
    /// Compresses a splittable sequential graph in parallel and returns the
360
    /// length in bits of the graph bitstream.
361
    ///
362
    /// Note that the number of parallel compression threads will be
363
    /// [`current_num_threads`]. The graph will be split into as many
364
    /// lenders as there are threads.
365
    pub fn par_comp_graph<E: Endianness>(
311,051✔
366
        &mut self,
367
        graph: &(impl SequentialGraph + for<'a> SplitLabeling<SplitLender<'a>: ExactSizeLender>),
368
    ) -> Result<u64>
369
    where
370
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
371
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
372
    {
373
        let num_threads = current_num_threads();
622,102✔
374
        self.par_comp_lenders(graph.split_iter(num_threads), graph.num_nodes())
2,177,357✔
375
    }
376

377
    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns
378
    /// a [`CompStats`] instance.
379
    ///
380
    /// The first lender must start from node 0, and each lender must
381
    /// continue from where the previous lender stopped. All in all,
382
    /// they must return a total of `num_nodes` nodes.
383
    ///
384
    /// Note that the number of parallel compression threads will be
385
    /// [`current_num_threads`]. It is your responsibility to ensure that the
386
    /// number of threads is appropriate for the number of lenders you pass,
387
    /// possibly using [`install`](rayon::ThreadPool::install).
388
    ///
389
    /// This method is useful to compress graphs that can be iterated upon using
390
    /// multiple lenders, but such lenders do not derive from splitting. For
391
    /// example, this happens when using
392
    /// [`ParSortIters`].
393
    pub fn par_comp_lenders<
311,059✔
394
        E: Endianness,
395
        L: Lender + for<'next> NodeLabelsLender<'next, Label = usize> + ExactSizeLender + Send,
396
    >(
397
        &mut self,
398
        iter: impl IntoIterator<Item = L>,
399
        num_nodes: usize,
400
    ) -> Result<u64>
401
    where
402
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
403
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
404
    {
405
        let tmp_dir = self.tmp_dir()?;
933,177✔
406

407
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
622,118✔
408
        let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
622,118✔
409

410
        let (tx, rx) = crossbeam_channel::unbounded();
933,177✔
411

412
        let thread_path = |thread_id: usize| tmp_dir.join(format!("{thread_id:016x}.bitstream"));
4,043,767✔
413

414
        let mut comp_pl = concurrent_progress_logger![
622,118✔
415
            log_target = "webgraph::graphs::bvgraph::comp::impls::par_comp_lenders::comp",
×
416
            display_memory = true,
×
417
            item_name = "node",
×
418
            local_speed = true,
×
419
            expected_updates = Some(num_nodes),
311,059✔
420
        ];
421
        comp_pl.start("Compressing successors in parallel...");
622,118✔
422
        let mut expected_first_node = 0;
622,118✔
423
        let cp_flags = &self.comp_flags;
622,118✔
424
        let bvgraphz = self.bvgraphz;
622,118✔
425
        let chunk_size = self.chunk_size;
622,118✔
426

427
        in_place_scope(|s| {
622,118✔
428
            for (thread_id, mut thread_lender) in iter.into_iter().enumerate() {
3,421,649✔
429
                let tmp_path = thread_path(thread_id);
2,488,472✔
430
                let chunk_graph_path = tmp_path.with_extension(GRAPH_EXTENSION);
2,488,472✔
431
                let chunk_offsets_path = tmp_path.with_extension(OFFSETS_EXTENSION);
2,488,472✔
432
                let tx = tx.clone();
3,732,708✔
433
                let mut comp_pl = comp_pl.clone();
3,732,708✔
434
                let lender_len = thread_lender.len();
3,732,708✔
435
                // Spawn the thread
436
                s.spawn(move |_| {
3,732,708✔
437
                    log::debug!("Thread {thread_id} started");
1,244,236✔
438

439
                    let Some((node_id, successors)) = thread_lender.next() else {
3,732,708✔
440
                        return;
×
441
                    };
442

443
                    let first_node = node_id;
2,488,472✔
444
                    if first_node != expected_first_node {
1,244,236✔
445
                        panic!(
×
446
                            "Lender {} expected to start from node {} but started from {}",
×
447
                            thread_id,
×
448
                            expected_first_node,
×
449
                            first_node
×
450
                        );
451
                    }
452

453
                    let writer = buf_bit_writer::from_path::<E, usize>(&chunk_graph_path).unwrap();
4,976,944✔
454
                    let codes_encoder = <DynCodesEncoder<E, _>>::new(writer, cp_flags).unwrap();
6,221,180✔
455

456
                    let stats;
×
457
                    let mut last_node;
×
458
                    if bvgraphz {
1,244,236✔
459
                        let mut bvcomp = BvCompZ::new(
1,866,248✔
460
                            codes_encoder,
933,124✔
461
                            OffsetsWriter::from_path(&chunk_offsets_path, false).unwrap(),
2,799,372✔
462
                            cp_flags.compression_window,
933,124✔
463
                            chunk_size,
933,124✔
464
                            cp_flags.max_ref_count,
933,124✔
465
                            cp_flags.min_interval_length,
933,124✔
466
                            node_id,
933,124✔
467
                        );
468
                        bvcomp.push(successors).unwrap();
3,732,496✔
469
                        last_node = first_node;
933,124✔
470
                        let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
25,519,805✔
471
                        for_! ( (_, succ) in iter_nodes {
46,373,990✔
472
                            bvcomp.push(succ.into_iter()).unwrap();
136,322,598✔
473
                            comp_pl.update();
22,720,433✔
474
                        });
475
                        stats = bvcomp.flush().unwrap();
1,866,248✔
476
                    } else {
477
                        let mut bvcomp = BvComp::new(
622,224✔
478
                            codes_encoder,
311,112✔
479
                            OffsetsWriter::from_path(&chunk_offsets_path, false).unwrap(),
933,336✔
480
                            cp_flags.compression_window,
311,112✔
481
                            cp_flags.max_ref_count,
311,112✔
482
                            cp_flags.min_interval_length,
311,112✔
483
                            node_id,
311,112✔
484
                        );
485
                        bvcomp.push(successors).unwrap();
1,244,448✔
486
                        last_node = first_node;
311,112✔
487
                        let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
14,258,250✔
488
                        for_! ( (_, succ) in iter_nodes {
26,960,940✔
489
                            bvcomp.push(succ.into_iter()).unwrap();
79,949,484✔
490
                            comp_pl.update();
13,324,914✔
491
                        });
492
                        stats = bvcomp.flush().unwrap();
622,224✔
493
                    }
494

495
                    log::debug!(
1,244,236✔
496
                        "Finished Compression thread {thread_id} and wrote {} bits for the graph and {} bits for the offsets",
×
497
                        stats.written_bits, stats.offsets_written_bits,
×
498
                    );
499
                    tx.send(Job {
3,732,708✔
500
                        job_id: thread_id,
2,488,472✔
501
                        first_node,
2,488,472✔
502
                        last_node,
2,488,472✔
503
                        chunk_graph_path,
2,488,472✔
504
                        written_bits: stats.written_bits,
2,488,472✔
505
                        chunk_offsets_path,
2,488,472✔
506
                        offsets_written_bits: stats.offsets_written_bits,
1,244,236✔
507
                        num_arcs: stats.num_arcs,
1,244,236✔
508
                    })
509
                    .unwrap()
1,244,236✔
510
                });
511

512
                expected_first_node += lender_len;
1,244,236✔
513
            }
514

515
            if num_nodes != expected_first_node {
311,059✔
516
                panic!(
×
517
                    "The lenders were supposed to return {} nodes but returned {} instead",
×
518
                    num_nodes, expected_first_node
×
519
                );
520
            }
521

522
            drop(tx);
622,118✔
523

524
            let mut copy_pl = progress_logger![
622,118✔
525
                log_target = "webgraph::graphs::bvgraph::comp::impls::par_comp_lenders::copy",
×
526
                display_memory = true,
×
527
                item_name = "node",
×
528
                local_speed = true,
×
529
                expected_updates = Some(num_nodes),
311,059✔
530
            ];
531
            copy_pl.start("Copying compressed successors to final graph");
622,118✔
532

533
            let mut graph_writer = buf_bit_writer::from_path::<E, usize>(&graph_path)
933,177✔
534
                .with_context(|| format!("Could not create graph {}", graph_path.display()))?;
311,059✔
535

536
            let mut offsets_writer = buf_bit_writer::from_path::<BE, usize>(&offsets_path)
933,177✔
537
                .with_context(|| format!("Could not create offsets {}", offsets_path.display()))?;
311,059✔
538
            offsets_writer.write_gamma(0)?;
622,118✔
539

540
            let mut total_written_bits: u64 = 0;
933,177✔
541
            let mut total_offsets_written_bits: u64 = 0;
933,177✔
542
            let mut total_arcs: u64 = 0;
933,177✔
543

544
            let mut next_node = 0;
622,118✔
545
            // glue together the bitstreams as they finish, this allows us to do
546
            // task pipelining for better performance
547
            for Job {
×
548
                job_id,
1,244,236✔
549
                first_node,
1,244,236✔
550
                last_node,
1,244,236✔
551
                chunk_graph_path,
1,244,236✔
552
                written_bits,
1,244,236✔
553
                chunk_offsets_path,
1,244,236✔
554
                offsets_written_bits,
1,244,236✔
555
                num_arcs,
1,244,236✔
556
            } in TaskQueue::new(rx.into_rayon_iter())
933,177✔
557
            {
558
                ensure!(
1,244,236✔
559
                    first_node == next_node,
1,244,236✔
560
                    "Non-adjacent lenders: lender {} has first node {} instead of {}",
×
561
                    job_id,
×
562
                    first_node,
×
563
                    next_node
×
564
                );
565

566
                next_node = last_node + 1;
1,244,236✔
567
                total_arcs += num_arcs;
1,244,236✔
568
                log::debug!(
1,244,236✔
569
                    "Copying {} [{}..{}) bits from {} to {}",
×
570
                    written_bits,
×
571
                    total_written_bits,
×
572
                    total_written_bits + written_bits,
36✔
573
                    chunk_graph_path.display(),
36✔
574
                    graph_path.display()
36✔
575
                );
576
                total_written_bits += written_bits;
1,244,236✔
577
                let mut reader = buf_bit_reader::from_path::<E, u32>(&chunk_graph_path)?;
3,732,708✔
578
                graph_writer
1,244,236✔
579
                    .copy_from(&mut reader, written_bits)
3,732,708✔
580
                    .with_context(|| {
1,244,236✔
581
                        format!(
×
582
                            "Could not copy from {} to {}",
×
583
                            chunk_graph_path.display(),
×
584
                            graph_path.display()
×
585
                        )
586
                    })?;
587

588
                log::debug!(
1,244,236✔
589
                    "Copying offsets {} [{}..{}) bits from {} to {}",
×
590
                    offsets_written_bits,
×
591
                    total_offsets_written_bits,
×
592
                    total_offsets_written_bits + offsets_written_bits,
36✔
593
                    chunk_offsets_path.display(),
36✔
594
                    offsets_path.display()
36✔
595
                );
596
                total_offsets_written_bits += offsets_written_bits;
1,244,236✔
597

598
                let mut reader = <BufBitReader<BigEndian, _>>::new(<WordAdapter<u32, _>>::new(
3,732,708✔
599
                    BufReader::new(File::open(&chunk_offsets_path).with_context(|| {
4,976,944✔
600
                        format!("Could not open {}", chunk_offsets_path.display())
×
601
                    })?),
602
                ));
603
                offsets_writer
1,244,236✔
604
                    .copy_from(&mut reader, offsets_written_bits)
3,732,708✔
605
                    .with_context(|| {
1,244,236✔
606
                        format!(
×
607
                            "Could not copy from {} to {}",
×
608
                            chunk_offsets_path.display(),
×
609
                            offsets_path.display()
×
610
                        )
611
                    })?;
612

613
                copy_pl.update_with_count(last_node - first_node + 1);
3,732,708✔
614
            }
615

616
            log::info!("Flushing the merged bitstreams");
311,059✔
617
            graph_writer.flush()?;
622,118✔
618
            BitWrite::flush(&mut offsets_writer)?;
622,118✔
619

620
            comp_pl.done();
622,118✔
621
            copy_pl.done();
622,118✔
622

623
            log::info!("Writing the .properties file");
311,059✔
624
            let properties = self
622,118✔
625
                .comp_flags
311,059✔
626
                .to_properties::<E>(num_nodes, total_arcs, total_written_bits)
1,244,236✔
627
                .context("Could not serialize properties")?;
311,059✔
628
            let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
622,118✔
629
            std::fs::write(&properties_path, properties).with_context(|| {
1,244,236✔
630
                format!(
×
631
                    "Could not write properties to {}",
×
632
                    properties_path.display()
×
633
                )
634
            })?;
635

636
            log::info!(
311,059✔
637
                "Compressed {} arcs into {} bits for {:.4} bits/arc",
×
638
                total_arcs,
×
639
                total_written_bits,
×
640
                total_written_bits as f64 / total_arcs as f64
311,057✔
641
            );
642
            log::info!(
311,059✔
643
                "Created offsets file with {} bits for {:.4} bits/node",
×
644
                total_offsets_written_bits,
×
645
                total_offsets_written_bits as f64 / num_nodes as f64
311,057✔
646
            );
647

648
            // cleanup the temp files
649
            std::fs::remove_dir_all(&tmp_dir).with_context(|| {
933,177✔
650
                format!("Could not clean temporary directory {}", tmp_dir.display())
×
651
            })?;
652
            Ok(total_written_bits)
311,059✔
653
        })
654
    }
655
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc