• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 19667053797

25 Nov 2025 10:54AM UTC coverage: 61.661% (-0.5%) from 62.139%
19667053797

push

github

vigna
fmt

0 of 15 new or added lines in 2 files covered. (0.0%)

96 existing lines in 15 files now uncovered.

5211 of 8451 relevant lines covered (61.66%)

27155375.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.56
/webgraph/src/graphs/bvgraph/comp/impls.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 *
5
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6
 */
7

8
use crate::prelude::*;
9
use anyhow::{Context, Result, ensure};
10
use dsi_bitstream::prelude::*;
11
use dsi_progress_logger::prelude::*;
12
use lender::prelude::*;
13
use rayon::{ThreadPool, ThreadPoolBuilder};
14
use std::fs::File;
15
use std::io::{BufReader, BufWriter};
16
use std::path::{Path, PathBuf};
17

18
/// A queue that pulls jobs with ids in a contiguous initial segment of the
/// natural numbers from an iterator out of order and implement an iterator in
/// which they can be pulled in order.
///
/// Jobs must implement [`JobId`], which exposes their job id; ids must form a
/// contiguous initial segment of the natural numbers.
struct TaskQueue<I: Iterator> {
    // The underlying, possibly out-of-order source of jobs.
    iter: I,
    // Reordering buffer: `jobs[id]` parks the job with that id until it can
    // be returned in order.
    jobs: Vec<Option<I::Item>>,
    // The id of the next job to be returned by the iterator.
    next_id: usize,
}
29

30
/// A trait for jobs exposing a numeric id, used by [`TaskQueue`] to return
/// out-of-order jobs in id order.
trait JobId {
    /// Returns the id of this job.
    fn id(&self) -> usize;
}
33

34
impl<I: Iterator> TaskQueue<I> {
35
    fn new(iter: I) -> Self {
17✔
36
        Self {
37
            iter,
38
            jobs: vec![],
17✔
39
            next_id: 0,
40
        }
41
    }
42
}
43

44
impl<I: Iterator> Iterator for TaskQueue<I>
45
where
46
    I::Item: JobId,
47
{
48
    type Item = I::Item;
49

50
    fn next(&mut self) -> Option<Self::Item> {
94✔
51
        loop {
×
52
            if let Some(item) = self.jobs.get_mut(self.next_id) {
430✔
53
                if item.is_some() {
176✔
54
                    self.next_id += 1;
77✔
55
                    return item.take();
154✔
56
                }
57
            }
58
            if let Some(item) = self.iter.next() {
171✔
59
                let id = item.id();
231✔
60
                if id >= self.jobs.len() {
154✔
61
                    self.jobs.resize_with(id + 1, || None);
201✔
62
                }
63
                self.jobs[id] = Some(item);
154✔
64
            } else {
65
                return None;
17✔
66
            }
67
        }
68
    }
69
}
70

71
/// A compression job.
///
/// One `Job` describes the output of a single compression thread: the
/// contiguous node range it compressed and the temporary files holding its
/// graph and offsets bitstreams, to be concatenated in job-id order.
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
struct Job {
    // Index of the chunk/thread that produced this job; also the ordering key
    // (it is the first struct field, so the derived `Ord` sorts by it first).
    job_id: usize,
    // First node compressed by this job (inclusive).
    first_node: usize,
    // Last node compressed by this job (inclusive).
    last_node: usize,
    // Temporary file containing this chunk's graph bitstream.
    chunk_graph_path: PathBuf,
    // Number of bits written to `chunk_graph_path`.
    written_bits: u64,
    // Temporary file containing this chunk's offsets bitstream.
    chunk_offsets_path: PathBuf,
    // Number of bits written to `chunk_offsets_path`.
    offsets_written_bits: u64,
    // Number of arcs compressed by this job.
    num_arcs: u64,
}
83

84
impl JobId for Job {
    /// Returns the job id (the index of the per-thread chunk).
    fn id(&self) -> usize {
        self.job_id
    }
}
89

90
impl BvComp<()> {
    /// Compresses a [`NodeLabelsLender`] and returns the length in bits of the
    /// graph bitstream.
    ///
    /// Convenience wrapper delegating to [`BvCompBuilder::single_thread`].
    pub fn single_thread<E, L>(
        basename: impl AsRef<Path>,
        iter: L,
        compression_flags: CompFlags,
        build_offsets: bool,
        num_nodes: Option<usize>,
    ) -> Result<u64>
    where
        E: Endianness,
        L: IntoLender,
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
    {
        BvCompBuilder::new(basename)
            .with_compression_flags(compression_flags)
            .single_thread::<E, L>(iter, build_offsets, num_nodes)
    }

    /// A wrapper over [`parallel_graph`](Self::parallel_graph) that takes the
    /// endianness as a string.
    ///
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
    ///
    /// A given endianness is enabled only if the corresponding feature is
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
    /// neither features are enabled.
    ///
    /// Convenience wrapper delegating to [`BvCompBuilder::parallel_endianness`].
    pub fn parallel_endianness<P: AsRef<Path>, G: SplitLabeling + SequentialGraph>(
        basename: impl AsRef<Path> + Send + Sync,
        graph: &G,
        num_nodes: usize,
        compression_flags: CompFlags,
        threads: &ThreadPool,
        tmp_dir: P,
        endianness: &str,
    ) -> Result<u64>
    where
        for<'a> <G as SplitLabeling>::SplitLender<'a>: ExactSizeLender + Send + Sync,
    {
        BvCompBuilder::new(basename)
            .with_compression_flags(compression_flags)
            .with_threads(threads)
            .with_tmp_dir(tmp_dir)
            .parallel_endianness(graph, num_nodes, endianness)
    }

    /// Compresses a graph in parallel and returns the length in bits of the graph bitstream.
    ///
    /// Convenience wrapper delegating to [`BvCompBuilder::parallel_graph`].
    pub fn parallel_graph<E: Endianness>(
        basename: impl AsRef<Path> + Send + Sync,
        graph: &(impl SequentialGraph + for<'a> SplitLabeling<SplitLender<'a>: ExactSizeLender>),
        compression_flags: CompFlags,
        threads: &ThreadPool,
        tmp_dir: impl AsRef<Path>,
    ) -> Result<u64>
    where
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
    {
        BvCompBuilder::new(basename)
            .with_compression_flags(compression_flags)
            .with_threads(threads)
            .with_tmp_dir(tmp_dir)
            .parallel_graph::<E>(graph)
    }

    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns the length in bits
    /// of the graph bitstream.
    ///
    /// Convenience wrapper delegating to [`BvCompBuilder::parallel_iter`].
    pub fn parallel_iter<
        E: Endianness,
        L: Lender + for<'next> NodeLabelsLender<'next, Label = usize> + ExactSizeLender + Send,
    >(
        basename: impl AsRef<Path> + Send + Sync,
        iter: impl IntoIterator<Item = L>,
        num_nodes: usize,
        compression_flags: CompFlags,
        threads: &ThreadPool,
        tmp_dir: impl AsRef<Path>,
    ) -> Result<u64>
    where
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
    {
        BvCompBuilder::new(basename)
            .with_compression_flags(compression_flags)
            .with_threads(threads)
            .with_tmp_dir(tmp_dir)
            .parallel_iter::<E, L>(iter, num_nodes)
    }
}
181

182
/// Like [`std::borrow::Cow`] but does not require `T: ToOwned`
#[derive(Debug)]
enum MaybeOwned<'a, T> {
    /// A shared reference valid for `'a`.
    Borrowed(&'a T),
    /// An owned value.
    Owned(T),
}
188

189
/// A builder configuring and running graph compression, either single-threaded
/// ([`single_thread`](Self::single_thread)) or in parallel
/// ([`parallel_graph`](Self::parallel_graph), [`parallel_iter`](Self::parallel_iter)).
#[derive(Debug)]
pub struct BvCompBuilder<'t> {
    /// Basename of the output files; per-kind extensions are appended.
    basename: PathBuf,
    /// Compression parameters; defaults to [`CompFlags::default`].
    compression_flags: CompFlags,
    /// Thread pool for parallel compression; built lazily if not provided.
    threads: Option<MaybeOwned<'t, ThreadPool>>,
    /// Directory for temporary per-thread bitstreams; created lazily if absent.
    tmp_dir: Option<PathBuf>,
    /// owns the TempDir that [`Self::tmp_dir`] refers to, if it was created by default
    owned_tmp_dir: Option<tempfile::TempDir>,
}
198

199
impl BvCompBuilder<'static> {
200
    pub fn new(basename: impl AsRef<Path>) -> Self {
17✔
201
        Self {
202
            basename: basename.as_ref().into(),
51✔
203
            compression_flags: CompFlags::default(),
34✔
204
            threads: None,
205
            tmp_dir: None,
206
            owned_tmp_dir: None,
207
        }
208
    }
209
}
210

211
impl<'t> BvCompBuilder<'t> {
212
    pub fn with_compression_flags(mut self, compression_flags: CompFlags) -> Self {
44✔
213
        self.compression_flags = compression_flags;
44✔
214
        self
44✔
215
    }
216

217
    pub fn with_tmp_dir(mut self, tmp_dir: impl AsRef<Path>) -> Self {
17✔
218
        self.tmp_dir = Some(tmp_dir.as_ref().into());
34✔
219
        self
17✔
220
    }
221

222
    pub fn with_threads(self, threads: &'_ ThreadPool) -> BvCompBuilder<'_> {
44✔
223
        BvCompBuilder {
224
            threads: Some(MaybeOwned::Borrowed(threads)),
44✔
225
            ..self
226
        }
227
    }
228

229
    fn tmp_dir(&mut self) -> Result<PathBuf> {
44✔
230
        if self.tmp_dir.is_none() {
88✔
231
            let tmp_dir = tempfile::tempdir()?;
×
232
            self.tmp_dir = Some(tmp_dir.path().to_owned());
×
233
            self.owned_tmp_dir = Some(tmp_dir);
×
234
        }
235

236
        let tmp_dir = self.tmp_dir.clone().unwrap();
176✔
237
        if !std::fs::exists(&tmp_dir)
88✔
238
            .with_context(|| format!("Could not check whether {} exists", tmp_dir.display()))?
44✔
239
        {
240
            std::fs::create_dir_all(&tmp_dir)
16✔
241
                .with_context(|| format!("Could not create {}", tmp_dir.display()))?;
8✔
242
        }
243
        Ok(tmp_dir)
44✔
244
    }
245

246
    fn ensure_threads(&mut self) -> Result<()> {
88✔
247
        if self.threads.is_none() {
176✔
248
            self.threads = Some(MaybeOwned::Owned(
×
249
                ThreadPoolBuilder::default()
×
250
                    .build()
×
251
                    .context("Could not build default thread pool")?,
×
252
            ));
253
        }
254

255
        Ok(())
88✔
256
    }
257

258
    fn threads(&self) -> &ThreadPool {
88✔
259
        match self.threads.as_ref().unwrap() {
176✔
260
            MaybeOwned::Owned(threads) => threads,
×
261
            MaybeOwned::Borrowed(threads) => threads,
176✔
262
        }
263
    }
264

265
    /// Compresses s [`NodeLabelsLender`] and returns the length in bits of the
266
    /// graph bitstream.
267
    pub fn single_thread<E, L>(
×
268
        &mut self,
269
        iter: L,
270
        build_offsets: bool,
271
        num_nodes: Option<usize>,
272
    ) -> Result<u64>
273
    where
274
        E: Endianness,
275
        L: IntoLender,
276
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
277
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
278
    {
279
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
×
280

281
        // Compress the graph
282
        let bit_write = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(
×
283
            File::create(&graph_path)
×
284
                .with_context(|| format!("Could not create {}", graph_path.display()))?,
×
285
        )));
286

287
        let comp_flags = CompFlags {
288
            ..Default::default()
289
        };
290

291
        let codes_writer = DynCodesEncoder::new(bit_write, &comp_flags)?;
×
292

293
        let mut bvcomp = BvComp::new(
294
            codes_writer,
×
295
            self.compression_flags.compression_window,
×
296
            self.compression_flags.max_ref_count,
×
297
            self.compression_flags.min_interval_length,
×
298
            0,
299
        );
300

301
        let mut pl = progress_logger![
×
302
            display_memory = true,
×
303
            item_name = "node",
×
304
            expected_updates = num_nodes,
×
305
        ];
306
        pl.start("Compressing successors...");
×
307
        let mut bitstream_len = 0;
×
308

309
        let mut real_num_nodes = 0;
×
310
        if build_offsets {
×
311
            let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
×
312
            let file = std::fs::File::create(&offsets_path)
×
313
                .with_context(|| format!("Could not create {}", offsets_path.display()))?;
×
314
            // create a bit writer on the file
315
            let mut writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
×
316
                BufWriter::with_capacity(1 << 20, file),
×
317
            ));
318

319
            writer
×
320
                .write_gamma(0)
321
                .context("Could not write initial delta")?;
322
            for_! ( (_node_id, successors) in iter {
×
323
                let delta = bvcomp.push(successors).context("Could not push successors")?;
×
324
                bitstream_len += delta;
×
325
                writer.write_gamma(delta).context("Could not write delta")?;
×
326
                pl.update();
×
327
                real_num_nodes += 1;
×
328
            });
329
        } else {
330
            for_! ( (_node_id, successors) in iter {
×
331
                bitstream_len += bvcomp.push(successors).context("Could not push successors")?;
×
332
                pl.update();
×
333
                real_num_nodes += 1;
×
334
            });
335
        }
336
        pl.done();
×
337

338
        if let Some(num_nodes) = num_nodes {
×
339
            if num_nodes != real_num_nodes {
×
340
                log::warn!(
×
341
                    "The expected number of nodes is {num_nodes} but the actual number of nodes is {real_num_nodes}"
×
342
                );
343
            }
344
        }
345
        let num_arcs = bvcomp.arcs;
×
346
        bitstream_len += bvcomp.flush().context("Could not flush bvcomp")? as u64;
×
347

348
        log::info!("Writing the .properties file");
×
349
        let properties = self
×
350
            .compression_flags
×
351
            .to_properties::<E>(real_num_nodes, num_arcs, bitstream_len)
×
352
            .context("Could not serialize properties")?;
353
        let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
×
354
        std::fs::write(&properties_path, properties)
×
355
            .with_context(|| format!("Could not write {}", properties_path.display()))?;
×
356

357
        Ok(bitstream_len)
×
358
    }
359

360
    /// A wrapper over [`parallel_graph`](Self::parallel_graph) that takes the
361
    /// endianness as a string.
362
    ///
363
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
364
    ///
365
    ///  A given endianness is enabled only if the corresponding feature is
366
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
367
    /// neither features are enabled.
368
    /// A wrapper over [`parallel_graph`](Self::parallel_graph) that takes the
    /// endianness as a string.
    ///
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
    ///
    /// A given endianness is enabled only if the corresponding feature is
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
    /// neither features are enabled.
    pub fn parallel_endianness<G: SplitLabeling + SequentialGraph>(
        &mut self,
        graph: &G,
        num_nodes: usize,
        endianness: &str,
    ) -> Result<u64>
    where
        for<'a> <G as SplitLabeling>::SplitLender<'a>: ExactSizeLender + Send + Sync,
    {
        self.ensure_threads()?;
        // One split lender per thread in the pool.
        let num_threads = self.threads().current_num_threads();

        match endianness {
            #[cfg(any(
                feature = "be_bins",
                not(any(feature = "be_bins", feature = "le_bins"))
            ))]
            BE::NAME => {
                // compress the graph with big-endian bitstreams
                self.parallel_iter::<BigEndian, _>(graph.split_iter(num_threads), num_nodes)
            }
            #[cfg(any(
                feature = "le_bins",
                not(any(feature = "be_bins", feature = "le_bins"))
            ))]
            LE::NAME => {
                // compress the graph with little-endian bitstreams
                self.parallel_iter::<LittleEndian, _>(graph.split_iter(num_threads), num_nodes)
            }
            x => anyhow::bail!("Unknown endianness {}", x),
        }
    }
400

401
    /// Compresses a graph in parallel and returns the length in bits of the graph bitstream.
402
    pub fn parallel_graph<E: Endianness>(
9✔
403
        &mut self,
404
        graph: &(impl SequentialGraph + for<'a> SplitLabeling<SplitLender<'a>: ExactSizeLender>),
405
    ) -> Result<u64>
406
    where
407
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
408
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
409
    {
410
        self.ensure_threads()?;
18✔
411
        let num_threads = self.threads().current_num_threads();
27✔
412
        self.parallel_iter(graph.split_iter(num_threads), graph.num_nodes())
63✔
413
    }
414

415
    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns the length in bits
416
    /// of the graph bitstream.
417
    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns the length in bits
    /// of the graph bitstream.
    ///
    /// Each lender is compressed by its own scoped thread into a pair of
    /// temporary bitstreams (graph + big-endian γ-coded offsets) in the
    /// builder's temporary directory; finished chunks are sent over a channel
    /// as [`Job`]s and concatenated in node order into the final graph and
    /// offsets files while other chunks are still compressing. The lenders
    /// must cover `0..num_nodes` contiguously and in order.
    pub fn parallel_iter<
        E: Endianness,
        L: Lender + for<'next> NodeLabelsLender<'next, Label = usize> + ExactSizeLender + Send,
    >(
        &mut self,
        iter: impl IntoIterator<Item = L>,
        num_nodes: usize,
    ) -> Result<u64>
    where
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
    {
        self.ensure_threads()?;
        let tmp_dir = self.tmp_dir()?;
        let threads = self.threads();

        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
        let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);

        // Workers send completed `Job`s here; the merge loop below receives.
        let (tx, rx) = std::sync::mpsc::channel();

        // Per-chunk temporary file basename.
        let thread_path = |thread_id: usize| tmp_dir.join(format!("{thread_id:016x}.bitstream"));

        let mut comp_pl = concurrent_progress_logger![
            log_target = "webgraph::graphs::bvgraph::comp::impls::parallel_iter::comp",
            display_memory = true,
            item_name = "node",
            local_speed = true,
            expected_updates = Some(num_nodes),
        ];
        comp_pl.start("Compressing successors in parallel...");
        // First node each successive lender is expected to start from
        // (cumulative sum of lender lengths); checked inside each worker.
        let mut expected_first_node = 0;
        threads.in_place_scope(|s| {
            let cp_flags = &self.compression_flags;

            for (thread_id, mut thread_lender) in iter.into_iter().enumerate() {
                let tmp_path = thread_path(thread_id);
                let chunk_graph_path = tmp_path.with_extension(GRAPH_EXTENSION);
                let chunk_offsets_path = tmp_path.with_extension(OFFSETS_EXTENSION);
                let tx = tx.clone();
                let mut comp_pl = comp_pl.clone();
                let lender_len = thread_lender.len();
                // Spawn the thread
                s.spawn(move |_| {
                    log::debug!("Thread {thread_id} started");
                    let first_node;
                    let mut bvcomp;
                    let mut offsets_writer;
                    let mut written_bits;
                    let mut offsets_written_bits;
                    // Pull the first node separately: its id fixes the chunk's
                    // start and seeds the compressor.
                    match thread_lender.next() {
                        None => return,
                        Some((node_id, successors)) => {
                            first_node = node_id;
                            if first_node != expected_first_node {
                                panic!(
                                    "Lender {} expected to start from node {} but started from {}",
                                    thread_id,
                                    expected_first_node,
                                    first_node
                                );
                            }

                            // Offsets are always big-endian, regardless of `E`.
                            offsets_writer = <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(
                                BufWriter::new(File::create(&chunk_offsets_path).unwrap()),
                            ));

                            let writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
                                BufWriter::new(File::create(&chunk_graph_path).unwrap()),
                            ));
                            let codes_encoder = <DynCodesEncoder<E, _>>::new(writer, cp_flags).unwrap();

                            bvcomp = BvComp::new(
                                codes_encoder,
                                cp_flags.compression_window,
                                cp_flags.max_ref_count,
                                cp_flags.min_interval_length,
                                node_id,
                            );
                            written_bits = bvcomp.push(successors).unwrap();
                            offsets_written_bits = offsets_writer.write_gamma(written_bits).unwrap() as u64;
                        }
                    };

                    // Track the last node seen so we can report the chunk's
                    // node range without consuming the lender twice.
                    let mut last_node = first_node;
                    let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
                    for_! ( (_, succ) in iter_nodes {
                        let node_bits = bvcomp.push(succ.into_iter()).unwrap();
                        written_bits += node_bits;
                        offsets_written_bits += offsets_writer.write_gamma(node_bits).unwrap() as u64;
                    });

                    let num_arcs = bvcomp.arcs;
                    bvcomp.flush().unwrap();
                    offsets_writer.flush().unwrap();
                    comp_pl.update_with_count(last_node - first_node + 1);

                    log::debug!(
                        "Finished Compression thread {thread_id} and wrote {written_bits} bits for the graph and {offsets_written_bits} bits for the offsets",
                    );
                    // Hand the finished chunk to the merge loop.
                    tx.send(Job {
                        job_id: thread_id,
                        first_node,
                        last_node,
                        chunk_graph_path,
                        written_bits,
                        chunk_offsets_path,
                        offsets_written_bits,
                        num_arcs,
                    })
                    .unwrap()
                });

                expected_first_node += lender_len;
            }

            if num_nodes != expected_first_node {
                panic!(
                    "The lenders were supposed to return {} nodes but returned {} instead",
                    num_nodes,
                    expected_first_node
                );
            }

            // Drop the original sender so the receiver terminates once every
            // worker's clone has been dropped.
            drop(tx);

            let mut copy_pl = progress_logger![
                log_target = "webgraph::graphs::bvgraph::comp::impls::parallel_iter::copy",
                display_memory = true,
                item_name = "node",
                local_speed = true,
                expected_updates = Some(num_nodes),
            ];
            copy_pl.start("Copying compressed successors to final graph");

            let file = File::create(&graph_path)
                .with_context(|| format!("Could not create graph {}", graph_path.display()))?;
            let mut graph_writer =
                <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));

            let file = File::create(&offsets_path)
                .with_context(|| format!("Could not create offsets {}", offsets_path.display()))?;
            let mut offsets_writer =
                <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
            offsets_writer.write_gamma(0)?;

            let mut total_written_bits: u64 = 0;
            let mut total_offsets_written_bits: u64 = 0;
            let mut total_arcs: u64 = 0;

            let mut next_node = 0;
            // glue together the bitstreams as they finish, this allows us to do
            // task pipelining for better performance; `TaskQueue` reorders the
            // out-of-order channel messages into job-id order.
            for Job {
                job_id,
                first_node,
                last_node,
                chunk_graph_path,
                written_bits,
                chunk_offsets_path,
                offsets_written_bits,
                num_arcs,
            } in TaskQueue::new(rx.iter())
            {
                ensure!(
                    first_node == next_node,
                    "Non-adjacent lenders: lender {} has first node {} instead of {}",
                    job_id,
                    first_node,
                    next_node
                );

                next_node = last_node + 1;
                total_arcs += num_arcs;
                log::debug!(
                    "Copying {} [{}..{}) bits from {} to {}",
                    written_bits,
                    total_written_bits,
                    total_written_bits + written_bits,
                    chunk_graph_path.display(),
                    graph_path.display()
                );
                total_written_bits += written_bits;

                let mut reader =
                    <BufBitReader<E, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
                        File::open(&chunk_graph_path)
                            .with_context(|| format!("Could not open {}", chunk_graph_path.display()))?,
                    )));
                graph_writer
                    .copy_from(&mut reader, written_bits)
                    .with_context(|| {
                        format!(
                            "Could not copy from {} to {}",
                            chunk_graph_path.display(),
                            graph_path.display()
                        )
                    })?;

                log::debug!(
                    "Copying offsets {} [{}..{}) bits from {} to {}",
                    offsets_written_bits,
                    total_offsets_written_bits,
                    total_offsets_written_bits + offsets_written_bits,
                    chunk_offsets_path.display(),
                    offsets_path.display()
                );
                total_offsets_written_bits += offsets_written_bits;

                let mut reader =
                    <BufBitReader<BigEndian, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
                        File::open(&chunk_offsets_path)
                            .with_context(|| format!("Could not open {}", chunk_offsets_path.display()))?,
                    )));
                offsets_writer
                    .copy_from(&mut reader, offsets_written_bits)
                    .with_context(|| {
                        format!(
                            "Could not copy from {} to {}",
                            chunk_offsets_path.display(),
                            offsets_path.display()
                        )
                    })?;

                copy_pl.update_with_count(last_node - first_node + 1);
            }

            log::info!("Flushing the merged bitstreams");
            graph_writer.flush()?;
            offsets_writer.flush()?;

            comp_pl.done();
            copy_pl.done();

            log::info!("Writing the .properties file");
            let properties = self.compression_flags
                .to_properties::<E>(num_nodes, total_arcs, total_written_bits)
                .context("Could not serialize properties")?;
            let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
            std::fs::write(&properties_path, properties).with_context(|| {
                format!(
                    "Could not write properties to {}",
                    properties_path.display()
                )
            })?;

            log::info!(
                "Compressed {} arcs into {} bits for {:.4} bits/arc",
                total_arcs,
                total_written_bits,
                total_written_bits as f64 / total_arcs as f64
            );
            log::info!(
                "Created offsets file with {} bits for {:.4} bits/node",
                total_offsets_written_bits,
                total_offsets_written_bits as f64 / num_nodes as f64
            );

            // cleanup the temp files
            std::fs::remove_dir_all(&tmp_dir).with_context(|| {
                format!("Could not clean temporary directory {}", tmp_dir.display())
            })?;
            Ok(total_written_bits)
        })
    }
686
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc