• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 20017908129

08 Dec 2025 05:36AM UTC coverage: 62.065% (+0.4%) from 61.641%
20017908129

push

github

zommiommy
Fix doctests

5435 of 8757 relevant lines covered (62.06%)

46674689.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.32
/webgraph/src/graphs/bvgraph/comp/impls.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 *
5
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6
 */
7

8
use crate::prelude::*;
9
use anyhow::{Context, Result, ensure};
10
use dsi_bitstream::prelude::*;
11
use dsi_progress_logger::prelude::*;
12
use lender::prelude::*;
13
use rayon::{current_num_threads, in_place_scope};
14
use std::fs::File;
15
use std::io::Write;
16
use std::io::{BufReader, BufWriter};
17
use std::path::{Path, PathBuf};
18

19
/// A queue that pulls jobs with ids in a contiguous initial segment of the
/// natural numbers from an iterator out of order, and implements an iterator
/// from which they can be pulled in order.
///
/// Jobs must be ordered by their job id, and must implement [`Eq`] with a
/// [`usize`] using their job id.
struct TaskQueue<I: Iterator> {
    /// The underlying source of out-of-order jobs.
    iter: I,
    /// Buffer of jobs received ahead of time, indexed by job id.
    jobs: Vec<Option<I::Item>>,
    /// The id of the next job to be returned in order.
    next_id: usize,
}

/// Trait yielding the id of a job, used by [`TaskQueue`] to reorder jobs.
trait JobId {
    fn id(&self) -> usize;
}

impl<I: Iterator> TaskQueue<I> {
    /// Wraps an iterator of out-of-order jobs.
    fn new(iter: I) -> Self {
        Self {
            iter,
            jobs: vec![],
            next_id: 0,
        }
    }
}

impl<I: Iterator> Iterator for TaskQueue<I>
where
    I::Item: JobId,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // If the next expected job is already buffered, hand it out.
            if matches!(self.jobs.get(self.next_id), Some(Some(_))) {
                let job = self.jobs[self.next_id].take();
                self.next_id += 1;
                return job;
            }
            // Otherwise, buffer jobs from the underlying iterator until the
            // expected one shows up; if the iterator is exhausted, we are done.
            let job = self.iter.next()?;
            let id = job.id();
            if self.jobs.len() <= id {
                self.jobs.resize_with(id + 1, || None);
            }
            self.jobs[id] = Some(job);
        }
    }
}
71

72
/// A compression job, i.e., the result of compressing one chunk of the graph
/// on one thread.
///
/// Note: the derived `PartialOrd`/`Ord` compare fields in declaration order,
/// so `job_id` must remain the first field.
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
struct Job {
    /// The id of this job; jobs are merged in increasing id order.
    job_id: usize,
    /// The first node compressed by this job.
    first_node: usize,
    /// The last node compressed by this job.
    last_node: usize,
    /// Path of the partial graph bitstream produced by this job.
    chunk_graph_path: PathBuf,
    /// Number of bits written to the partial graph bitstream.
    written_bits: u64,
    /// Path of the partial offsets bitstream produced by this job.
    chunk_offsets_path: PathBuf,
    /// Number of bits written to the partial offsets bitstream.
    offsets_written_bits: u64,
    /// Number of arcs compressed by this job.
    num_arcs: u64,
}
84

85
impl JobId for Job {
    /// Returns the job id, letting a [`TaskQueue`] pull jobs back in order.
    fn id(&self) -> usize {
        self.job_id
    }
}
90

91
/// A writer for offsets.
92
///
93
/// TODO: This currently uses Write which requires std. To support no_std we will want to make W a WordWriter
94
#[derive(Debug)]
95
#[repr(transparent)]
96
pub struct OffsetsWriter<W: Write> {
97
    buffer: BufBitWriter<BigEndian, WordAdapter<usize, BufWriter<W>>>,
98
}
99

100
impl OffsetsWriter<File> {
101
    /// Creates a new writer and writes the first offset value (0) if requested.
102
    ///
103
    /// Usually, parallel compressor will write autonomously the first offset
104
    /// when copying the partial offsets files into the final offsets file.
105
    pub fn from_path(path: impl AsRef<Path>, write_zero: bool) -> Result<Self> {
2,488,524✔
106
        let file = std::fs::File::create(&path)
7,465,572✔
107
            .with_context(|| format!("Could not create {}", path.as_ref().display()))?;
2,488,524✔
108
        Self::from_write(file, write_zero)
7,465,572✔
109
    }
110
}
111

112
impl<W: Write> OffsetsWriter<W> {
113
    /// Creates a new writer and writes the first offset value (0).
114
    pub fn from_write(writer: W, write_zero: bool) -> Result<Self> {
2,495,476✔
115
        let mut buffer = BufBitWriter::new(WordAdapter::new(BufWriter::new(writer)));
12,477,380✔
116
        if write_zero {
2,495,476✔
117
            // the first offset (of the first parallel offsets file) is always zero
118
            buffer.write_gamma(0)?;
1,009,272✔
119
        }
120
        Ok(Self { buffer })
2,495,476✔
121
    }
122

123
    /// Pushes a new delta offset.
124
    pub fn push(&mut self, delta: u64) -> Result<usize> {
121,604,026✔
125
        Ok(self.buffer.write_gamma(delta)?)
364,812,078✔
126
    }
127

128
    /// Flushes the buffer.
129
    pub fn flush(&mut self) -> Result<()> {
2,495,476✔
130
        BitWrite::flush(&mut self.buffer)? as u64;
4,990,952✔
131
        Ok(())
2,495,476✔
132
    }
133
}
134

135
/// A configuration for BvGraph compression.
136
///
137
/// Once set up, the methods [`comp_graph`](Self::comp_graph) (sequential graph
138
/// compression), [`comp_lender`](Self::comp_lender) (sequential lender
139
/// compression), [`par_comp_graph`](Self::par_comp_graph) (parallel compression
140
/// of a splittable graph), and [`par_comp_lenders`](Self::par_comp_lenders)
141
/// (parallel compression of multiple lenders) can be used to compress a graph.
142
#[derive(Debug)]
143
pub struct BvCompConfig {
144
    /// The basename of the output files.
145
    basename: PathBuf,
146
    /// Compression flags for BvComp/BvCompZ.
147
    comp_flags: CompFlags,
148
    /// Selects the Zuckerli-based BVGraph compressor
149
    bvgraphz: bool,
150
    /// The chunk size for the Zuckerli-based compressor
151
    chunk_size: usize,
152
    /// Temporary directory for all operations.
153
    tmp_dir: Option<PathBuf>,
154
    /// Owns the TempDir that [`Self::tmp_dir`] refers to, if it was created by default.
155
    owned_tmp_dir: Option<tempfile::TempDir>,
156
}
157

158
impl BvCompConfig {
159
    /// Creates a new compression configuration with the given basename and
160
    /// default options.
161
    ///
162
    /// Note that the convenience methods
163
    /// [`BvComp::with_basename`](crate::graphs::bvgraph::comp::BvComp::with_basename)
164
    /// and
165
    /// [`BvCompZ::with_basename`](crate::graphs::bvgraph::comp::BvCompZ::with_basename)
166
    /// can be used to create a configuration with default options.
167
    pub fn new(basename: impl AsRef<Path>) -> Self {
124,482✔
168
        Self {
169
            basename: basename.as_ref().into(),
373,446✔
170
            comp_flags: CompFlags::default(),
248,964✔
171
            bvgraphz: false,
172
            chunk_size: 10_000,
173
            tmp_dir: None,
174
            owned_tmp_dir: None,
175
        }
176
    }
177
}
178

179
impl BvCompConfig {
180
    pub fn with_comp_flags(mut self, compression_flags: CompFlags) -> Self {
870,985✔
181
        self.comp_flags = compression_flags;
870,985✔
182
        self
870,985✔
183
    }
184

185
    pub fn with_tmp_dir(mut self, tmp_dir: impl AsRef<Path>) -> Self {
8✔
186
        self.tmp_dir = Some(tmp_dir.as_ref().into());
16✔
187
        self
8✔
188
    }
189

190
    pub fn with_bvgraphz(mut self) -> Self {
373,259✔
191
        self.bvgraphz = true;
373,259✔
192
        self
373,259✔
193
    }
194

195
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
373,248✔
196
        self.bvgraphz = true;
373,248✔
197
        self.chunk_size = chunk_size;
373,248✔
198
        self
373,248✔
199
    }
200

201
    fn tmp_dir(&mut self) -> Result<PathBuf> {
497,710✔
202
        if self.tmp_dir.is_none() {
995,420✔
203
            let tmp_dir = tempfile::tempdir()?;
995,404✔
204
            self.tmp_dir = Some(tmp_dir.path().to_owned());
995,404✔
205
            self.owned_tmp_dir = Some(tmp_dir);
995,404✔
206
        }
207

208
        let tmp_dir = self.tmp_dir.clone().unwrap();
1,990,840✔
209
        if !std::fs::exists(&tmp_dir)
995,420✔
210
            .with_context(|| format!("Could not check whether {} exists", tmp_dir.display()))?
497,710✔
211
        {
212
            std::fs::create_dir_all(&tmp_dir)
×
213
                .with_context(|| format!("Could not create {}", tmp_dir.display()))?;
×
214
        }
215
        Ok(tmp_dir)
497,710✔
216
    }
217

218
    /// Compresses sequentially a [`SequentialGraph`] and returns
219
    /// the number of bits written to the graph bitstream.
220
    pub fn comp_graph<E: Endianness>(&mut self, graph: impl SequentialGraph) -> Result<u64>
311,060✔
221
    where
222
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
223
    {
224
        self.comp_lender::<E, _>(graph.iter(), Some(graph.num_nodes()))
1,555,300✔
225
    }
226

227
    /// Compresses sequentially a [`NodeLabelsLender`] and returns
228
    /// the number of bits written to the graph bitstream.
229
    ///
230
    /// The optional `expected_num_nodes` parameter will be used to provide
231
    /// forecasts on the progress logger.
232
    pub fn comp_lender<E, L>(&mut self, iter: L, expected_num_nodes: Option<usize>) -> Result<u64>
311,060✔
233
    where
234
        E: Endianness,
235
        L: IntoLender,
236
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
237
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
238
    {
239
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
622,120✔
240

241
        // Compress the graph
242
        let bit_write = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(
1,244,240✔
243
            File::create(&graph_path)
622,120✔
244
                .with_context(|| format!("Could not create {}", graph_path.display()))?,
311,060✔
245
        )));
246

247
        let codes_writer = DynCodesEncoder::new(bit_write, &self.comp_flags)?;
1,244,240✔
248

249
        // create a file for offsets
250
        let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
622,120✔
251
        let offset_writer = OffsetsWriter::from_path(offsets_path, true)?;
933,180✔
252

253
        let mut pl = progress_logger![
622,120✔
254
            display_memory = true,
×
255
            item_name = "node",
×
256
            expected_updates = expected_num_nodes,
311,060✔
257
        ];
258
        pl.start("Compressing successors...");
622,120✔
259
        let comp_stats = if self.bvgraphz {
622,120✔
260
            let mut bvcompz = BvCompZ::new(
261
                codes_writer,
233,290✔
262
                offset_writer,
233,290✔
263
                self.comp_flags.compression_window,
233,290✔
264
                self.chunk_size,
233,290✔
265
                self.comp_flags.max_ref_count,
233,290✔
266
                self.comp_flags.min_interval_length,
233,290✔
267
                0,
268
            );
269

270
            for_! ( (_node_id, successors) in iter {
53,400,430✔
271
                bvcompz.push(successors).context("Could not push successors")?;
106,334,280✔
272
                pl.update();
53,167,140✔
273
            });
274
            pl.done();
466,580✔
275

276
            bvcompz.flush()?
466,580✔
277
        } else {
278
            let mut bvcomp = BvComp::new(
279
                codes_writer,
77,770✔
280
                offset_writer,
77,770✔
281
                self.comp_flags.compression_window,
77,770✔
282
                self.comp_flags.max_ref_count,
77,770✔
283
                self.comp_flags.min_interval_length,
77,770✔
284
                0,
285
            );
286

287
            for_! ( (_node_id, successors) in iter {
22,140,910✔
288
                bvcomp.push(successors).context("Could not push successors")?;
44,126,280✔
289
                pl.update();
22,063,140✔
290
            });
291
            pl.done();
155,540✔
292

293
            bvcomp.flush()?
155,540✔
294
        };
295

296
        if let Some(num_nodes) = expected_num_nodes {
622,120✔
297
            if num_nodes != comp_stats.num_nodes {
311,060✔
298
                log::warn!(
×
299
                    "The expected number of nodes is {num_nodes} but the actual number of nodes is {}",
×
300
                    comp_stats.num_nodes,
×
301
                );
302
            }
303
        }
304

305
        log::info!("Writing the .properties file");
311,060✔
306
        let properties = self
622,120✔
307
            .comp_flags
311,060✔
308
            .to_properties::<E>(
309
                comp_stats.num_nodes,
311,060✔
310
                comp_stats.num_arcs,
311,060✔
311
                comp_stats.written_bits,
311,060✔
312
            )
313
            .context("Could not serialize properties")?;
314
        let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
622,120✔
315
        std::fs::write(&properties_path, properties)
933,180✔
316
            .with_context(|| format!("Could not write {}", properties_path.display()))?;
311,060✔
317

318
        Ok(comp_stats.written_bits)
311,060✔
319
    }
320

321
    /// A wrapper over [`par_comp_lenders`](Self::par_comp_lenders) that takes the
322
    /// endianness as a string.
323
    ///
324
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
325
    ///
326
    ///  A given endianness is enabled only if the corresponding feature is
327
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
328
    /// neither features are enabled.
329
    pub fn par_comp_lenders_endianness<G: SplitLabeling + SequentialGraph>(
8✔
330
        &mut self,
331
        graph: &G,
332
        num_nodes: usize,
333
        endianness: &str,
334
    ) -> Result<u64>
335
    where
336
        for<'a> <G as SplitLabeling>::SplitLender<'a>: ExactSizeLender + Send + Sync,
337
    {
338
        let num_threads = current_num_threads();
16✔
339

340
        match endianness {
8✔
341
            #[cfg(any(
342
                feature = "be_bins",
343
                not(any(feature = "be_bins", feature = "le_bins"))
344
            ))]
345
            BE::NAME => {
8✔
346
                // compress the transposed graph
347
                self.par_comp_lenders::<BigEndian, _>(graph.split_iter(num_threads), num_nodes)
48✔
348
            }
349
            #[cfg(any(
350
                feature = "le_bins",
351
                not(any(feature = "be_bins", feature = "le_bins"))
352
            ))]
353
            LE::NAME => {
×
354
                // compress the transposed graph
355
                self.par_comp_lenders::<LittleEndian, _>(graph.split_iter(num_threads), num_nodes)
×
356
            }
357
            x => anyhow::bail!("Unknown endianness {}", x),
×
358
        }
359
    }
360

361
    /// Compresses a splittable sequential graph in parallel and returns the
362
    /// length in bits of the graph bitstream.
363
    ///
364
    /// Note that the number of parallel compression threads will be
365
    /// [`current_num_threads`]. The graph will be split into as many
366
    /// lenders as there are threads.
367
    pub fn par_comp_graph<E: Endianness>(
311,051✔
368
        &mut self,
369
        graph: &(impl SequentialGraph + for<'a> SplitLabeling<SplitLender<'a>: ExactSizeLender>),
370
    ) -> Result<u64>
371
    where
372
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
373
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
374
    {
375
        let num_threads = current_num_threads();
622,102✔
376
        self.par_comp_lenders(graph.split_iter(num_threads), graph.num_nodes())
2,177,357✔
377
    }
378

379
    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns
380
    /// a [`CompStats`] instance.
381
    ///
382
    /// The first lender must start from node 0, and each lender must
383
    /// continue from where the previous lender stopped. All in all,
384
    /// they must return a total of `num_nodes` nodes.
385
    ///
386
    /// Note that the number of parallel compression threads will be
387
    /// [`current_num_threads`]. It is your responsibility to ensure that the
388
    /// number of threads is appropriate for the number of lenders you pass,
389
    /// possibly using [`install`](rayon::ThreadPool::install).
390
    ///
391
    /// This method is useful to compress graphs that can be iterated upon using
392
    /// multiple lenders, but such lenders do not derive from splitting. For
393
    /// example, this happens when using
394
    /// [`ParSortIters`].
395
    pub fn par_comp_lenders<
311,059✔
396
        E: Endianness,
397
        L: Lender + for<'next> NodeLabelsLender<'next, Label = usize> + ExactSizeLender + Send,
398
    >(
399
        &mut self,
400
        iter: impl IntoIterator<Item = L>,
401
        num_nodes: usize,
402
    ) -> Result<u64>
403
    where
404
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
405
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
406
    {
407
        let tmp_dir = self.tmp_dir()?;
933,177✔
408

409
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
622,118✔
410
        let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
622,118✔
411

412
        let (tx, rx) = std::sync::mpsc::channel();
933,177✔
413

414
        let thread_path = |thread_id: usize| tmp_dir.join(format!("{thread_id:016x}.bitstream"));
4,043,767✔
415

416
        let mut comp_pl = concurrent_progress_logger![
622,118✔
417
            log_target = "webgraph::graphs::bvgraph::comp::impls::par_comp_lenders::comp",
×
418
            display_memory = true,
×
419
            item_name = "node",
×
420
            local_speed = true,
×
421
            expected_updates = Some(num_nodes),
311,059✔
422
        ];
423
        comp_pl.start("Compressing successors in parallel...");
622,118✔
424
        let mut expected_first_node = 0;
622,118✔
425
        let cp_flags = &self.comp_flags;
622,118✔
426
        let bvgraphz = self.bvgraphz;
622,118✔
427
        let chunk_size = self.chunk_size;
622,118✔
428

429
        in_place_scope(|s| {
622,118✔
430
            for (thread_id, mut thread_lender) in iter.into_iter().enumerate() {
3,421,649✔
431
                let tmp_path = thread_path(thread_id);
2,488,472✔
432
                let chunk_graph_path = tmp_path.with_extension(GRAPH_EXTENSION);
2,488,472✔
433
                let chunk_offsets_path = tmp_path.with_extension(OFFSETS_EXTENSION);
2,488,472✔
434
                let tx = tx.clone();
3,732,708✔
435
                let mut comp_pl = comp_pl.clone();
3,732,708✔
436
                let lender_len = thread_lender.len();
3,732,708✔
437
                // Spawn the thread
438
                s.spawn(move |_| {
3,732,708✔
439
                    log::debug!("Thread {thread_id} started");
1,244,236✔
440

441
                    let Some((node_id, successors)) = thread_lender.next() else {
3,732,708✔
442
                        return;
×
443
                    };
444

445
                    let first_node = node_id;
2,488,472✔
446
                    if first_node != expected_first_node {
1,244,236✔
447
                        panic!(
×
448
                            "Lender {} expected to start from node {} but started from {}",
×
449
                            thread_id,
×
450
                            expected_first_node,
×
451
                            first_node
×
452
                        );
453
                    }
454

455
                    let writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
3,732,708✔
456
                        BufWriter::new(File::create(&chunk_graph_path).unwrap()),
4,976,944✔
457
                    ));
458
                    let codes_encoder = <DynCodesEncoder<E, _>>::new(writer, cp_flags).unwrap();
6,221,180✔
459

460
                    let stats;
×
461
                    let mut last_node;
×
462
                    if bvgraphz {
1,244,236✔
463
                        let mut bvcomp = BvCompZ::new(
1,866,248✔
464
                            codes_encoder,
933,124✔
465
                            OffsetsWriter::from_path(&chunk_offsets_path, false).unwrap(),
2,799,372✔
466
                            cp_flags.compression_window,
933,124✔
467
                            chunk_size,
933,124✔
468
                            cp_flags.max_ref_count,
933,124✔
469
                            cp_flags.min_interval_length,
933,124✔
470
                            node_id,
933,124✔
471
                        );
472
                        bvcomp.push(successors).unwrap();
3,732,496✔
473
                        last_node = first_node;
933,124✔
474
                        let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
25,519,805✔
475
                        for_! ( (_, succ) in iter_nodes {
46,373,990✔
476
                            bvcomp.push(succ.into_iter()).unwrap();
136,322,598✔
477
                            comp_pl.update();
22,720,433✔
478
                        });
479
                        stats = bvcomp.flush().unwrap();
1,866,248✔
480
                    } else {
481
                        let mut bvcomp = BvComp::new(
622,224✔
482
                            codes_encoder,
311,112✔
483
                            OffsetsWriter::from_path(&chunk_offsets_path, false).unwrap(),
933,336✔
484
                            cp_flags.compression_window,
311,112✔
485
                            cp_flags.max_ref_count,
311,112✔
486
                            cp_flags.min_interval_length,
311,112✔
487
                            node_id,
311,112✔
488
                        );
489
                        bvcomp.push(successors).unwrap();
1,244,448✔
490
                        last_node = first_node;
311,112✔
491
                        let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
14,258,250✔
492
                        for_! ( (_, succ) in iter_nodes {
26,960,940✔
493
                            bvcomp.push(succ.into_iter()).unwrap();
79,949,484✔
494
                            comp_pl.update();
13,324,914✔
495
                        });
496
                        stats = bvcomp.flush().unwrap();
622,224✔
497
                    }
498

499
                    log::debug!(
1,244,236✔
500
                        "Finished Compression thread {thread_id} and wrote {} bits for the graph and {} bits for the offsets",
×
501
                        stats.written_bits, stats.offsets_written_bits,
×
502
                    );
503
                    tx.send(Job {
3,732,708✔
504
                        job_id: thread_id,
2,488,472✔
505
                        first_node,
2,488,472✔
506
                        last_node,
2,488,472✔
507
                        chunk_graph_path,
2,488,472✔
508
                        written_bits: stats.written_bits,
2,488,472✔
509
                        chunk_offsets_path,
2,488,472✔
510
                        offsets_written_bits: stats.offsets_written_bits,
1,244,236✔
511
                        num_arcs: stats.num_arcs,
1,244,236✔
512
                    })
513
                    .unwrap()
1,244,236✔
514
                });
515

516
                expected_first_node += lender_len;
1,244,236✔
517
            }
518

519
            if num_nodes != expected_first_node {
311,059✔
520
                panic!(
×
521
                    "The lenders were supposed to return {} nodes but returned {} instead",
×
522
                    num_nodes, expected_first_node
×
523
                );
524
            }
525

526
            drop(tx);
622,118✔
527

528
            let mut copy_pl = progress_logger![
622,118✔
529
                log_target = "webgraph::graphs::bvgraph::comp::impls::par_comp_lenders::copy",
×
530
                display_memory = true,
×
531
                item_name = "node",
×
532
                local_speed = true,
×
533
                expected_updates = Some(num_nodes),
311,059✔
534
            ];
535
            copy_pl.start("Copying compressed successors to final graph");
622,118✔
536

537
            let file = File::create(&graph_path)
933,177✔
538
                .with_context(|| format!("Could not create graph {}", graph_path.display()))?;
311,059✔
539
            let mut graph_writer =
311,059✔
540
                <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
1,244,236✔
541

542
            let file = File::create(&offsets_path)
933,177✔
543
                .with_context(|| format!("Could not create offsets {}", offsets_path.display()))?;
311,059✔
544
            let mut offsets_writer = <BufBitWriter<BigEndian, _>>::new(
622,118✔
545
                <WordAdapter<usize, _>>::new(BufWriter::new(file)),
933,177✔
546
            );
547
            offsets_writer.write_gamma(0)?;
622,118✔
548

549
            let mut total_written_bits: u64 = 0;
933,177✔
550
            let mut total_offsets_written_bits: u64 = 0;
933,177✔
551
            let mut total_arcs: u64 = 0;
933,177✔
552

553
            let mut next_node = 0;
622,118✔
554
            // glue together the bitstreams as they finish, this allows us to do
555
            // task pipelining for better performance
556
            for Job {
×
557
                job_id,
1,244,236✔
558
                first_node,
1,244,236✔
559
                last_node,
1,244,236✔
560
                chunk_graph_path,
1,244,236✔
561
                written_bits,
1,244,236✔
562
                chunk_offsets_path,
1,244,236✔
563
                offsets_written_bits,
1,244,236✔
564
                num_arcs,
1,244,236✔
565
            } in TaskQueue::new(rx.iter())
933,177✔
566
            {
567
                ensure!(
1,244,236✔
568
                    first_node == next_node,
1,244,236✔
569
                    "Non-adjacent lenders: lender {} has first node {} instead of {}",
×
570
                    job_id,
×
571
                    first_node,
×
572
                    next_node
×
573
                );
574

575
                next_node = last_node + 1;
1,244,236✔
576
                total_arcs += num_arcs;
1,244,236✔
577
                log::debug!(
1,244,236✔
578
                    "Copying {} [{}..{}) bits from {} to {}",
×
579
                    written_bits,
×
580
                    total_written_bits,
×
581
                    total_written_bits + written_bits,
36✔
582
                    chunk_graph_path.display(),
36✔
583
                    graph_path.display()
36✔
584
                );
585
                total_written_bits += written_bits;
1,244,236✔
586
                let mut reader = buf_bit_reader::from_path::<E, u32>(&chunk_graph_path)?;
3,732,708✔
587
                graph_writer
1,244,236✔
588
                    .copy_from(&mut reader, written_bits)
3,732,708✔
589
                    .with_context(|| {
1,244,236✔
590
                        format!(
×
591
                            "Could not copy from {} to {}",
×
592
                            chunk_graph_path.display(),
×
593
                            graph_path.display()
×
594
                        )
595
                    })?;
596

597
                log::debug!(
1,244,236✔
598
                    "Copying offsets {} [{}..{}) bits from {} to {}",
×
599
                    offsets_written_bits,
×
600
                    total_offsets_written_bits,
×
601
                    total_offsets_written_bits + offsets_written_bits,
36✔
602
                    chunk_offsets_path.display(),
36✔
603
                    offsets_path.display()
36✔
604
                );
605
                total_offsets_written_bits += offsets_written_bits;
1,244,236✔
606

607
                let mut reader = <BufBitReader<BigEndian, _>>::new(<WordAdapter<u32, _>>::new(
3,732,708✔
608
                    BufReader::new(File::open(&chunk_offsets_path).with_context(|| {
4,976,944✔
609
                        format!("Could not open {}", chunk_offsets_path.display())
×
610
                    })?),
611
                ));
612
                offsets_writer
1,244,236✔
613
                    .copy_from(&mut reader, offsets_written_bits)
3,732,708✔
614
                    .with_context(|| {
1,244,236✔
615
                        format!(
×
616
                            "Could not copy from {} to {}",
×
617
                            chunk_offsets_path.display(),
×
618
                            offsets_path.display()
×
619
                        )
620
                    })?;
621

622
                copy_pl.update_with_count(last_node - first_node + 1);
3,732,708✔
623
            }
624

625
            log::info!("Flushing the merged bitstreams");
311,059✔
626
            graph_writer.flush()?;
622,118✔
627
            BitWrite::flush(&mut offsets_writer)?;
622,118✔
628

629
            comp_pl.done();
622,118✔
630
            copy_pl.done();
622,118✔
631

632
            log::info!("Writing the .properties file");
311,059✔
633
            let properties = self
622,118✔
634
                .comp_flags
311,059✔
635
                .to_properties::<E>(num_nodes, total_arcs, total_written_bits)
1,244,236✔
636
                .context("Could not serialize properties")?;
311,059✔
637
            let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
622,118✔
638
            std::fs::write(&properties_path, properties).with_context(|| {
1,244,236✔
639
                format!(
×
640
                    "Could not write properties to {}",
×
641
                    properties_path.display()
×
642
                )
643
            })?;
644

645
            log::info!(
311,059✔
646
                "Compressed {} arcs into {} bits for {:.4} bits/arc",
×
647
                total_arcs,
×
648
                total_written_bits,
×
649
                total_written_bits as f64 / total_arcs as f64
311,057✔
650
            );
651
            log::info!(
311,059✔
652
                "Created offsets file with {} bits for {:.4} bits/node",
×
653
                total_offsets_written_bits,
×
654
                total_offsets_written_bits as f64 / num_nodes as f64
311,057✔
655
            );
656

657
            // cleanup the temp files
658
            std::fs::remove_dir_all(&tmp_dir).with_context(|| {
933,177✔
659
                format!("Could not clean temporary directory {}", tmp_dir.display())
×
660
            })?;
661
            Ok(total_written_bits)
311,059✔
662
        })
663
    }
664
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc