• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 18470854817

13 Oct 2025 03:32PM UTC coverage: 48.029% (+0.2%) from 47.864%
18470854817

push

github

vigna
Cleaned up and fixed docs; removed useless From impl

29 of 45 new or added lines in 4 files covered. (64.44%)

218 existing lines in 5 files now uncovered.

3971 of 8268 relevant lines covered (48.03%)

21878868.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.62
/webgraph/src/graphs/bvgraph/comp/impls.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 *
5
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6
 */
7

8
use crate::prelude::*;
9
use anyhow::{ensure, Context, Result};
10
use dsi_bitstream::prelude::*;
11
use dsi_progress_logger::prelude::*;
12
use lender::prelude::*;
13
use rayon::{ThreadPool, ThreadPoolBuilder};
14
use std::fs::File;
15
use std::io::{BufReader, BufWriter};
16
use std::path::{Path, PathBuf};
17

18
/// A queue that pulls jobs, whose ids form a contiguous initial segment of the
/// natural numbers, from an iterator in arbitrary order, and yields them in
/// increasing id order.
///
/// Jobs must expose their id through the [`JobId`] trait. A job arriving ahead
/// of its turn is buffered until all jobs with smaller ids have been yielded.
/// If the underlying iterator is exhausted while some id never arrived (e.g.,
/// because a producer had nothing to send), the missing ids are skipped and
/// the buffered jobs are still yielded in id order instead of being dropped.
struct TaskQueue<I: Iterator> {
    /// The source of jobs, in arbitrary order.
    iter: I,
    /// Jobs received but not yet yielded, indexed by their id.
    jobs: Vec<Option<I::Item>>,
    /// The id of the next job to yield.
    next_id: usize,
}

/// Access to the id of a job handled by a [`TaskQueue`].
trait JobId {
    /// Returns the id of this job.
    fn id(&self) -> usize;
}

impl<I: Iterator> TaskQueue<I> {
    /// Creates a queue pulling jobs from `iter`, starting from id 0.
    fn new(iter: I) -> Self {
        Self {
            iter,
            jobs: vec![],
            next_id: 0,
        }
    }
}

impl<I: Iterator> Iterator for TaskQueue<I>
where
    I::Item: JobId,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Yield the next job in order if it has already arrived.
            if let Some(job) = self.jobs.get_mut(self.next_id).and_then(Option::take) {
                self.next_id += 1;
                return Some(job);
            }

            match self.iter.next() {
                Some(job) => {
                    let id = job.id();
                    if id >= self.jobs.len() {
                        self.jobs.resize_with(id + 1, || None);
                    }
                    self.jobs[id] = Some(job);
                }
                None => {
                    // No more jobs will arrive, so the id we are waiting for is
                    // missing: skip the gaps and drain the buffered jobs in
                    // order rather than silently discarding them.
                    while self.next_id < self.jobs.len() {
                        let id = self.next_id;
                        self.next_id += 1;
                        if let Some(job) = self.jobs[id].take() {
                            return Some(job);
                        }
                    }
                    return None;
                }
            }
        }
    }
}
70

71
/// A compression job.
72
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
73
struct Job {
74
    job_id: usize,
75
    first_node: usize,
76
    last_node: usize,
77
    chunk_graph_path: PathBuf,
78
    written_bits: u64,
79
    chunk_offsets_path: PathBuf,
80
    offsets_written_bits: u64,
81
    num_arcs: u64,
82
}
83

84
impl JobId for Job {
85
    fn id(&self) -> usize {
212✔
86
        self.job_id
212✔
87
    }
88
}
89

90
impl BvComp<()> {
91
    /// Compresses s [`NodeLabelsLender`] and returns the length in bits of the
92
    /// graph bitstream.
93
    pub fn single_thread<E, L>(
×
94
        basename: impl AsRef<Path>,
95
        iter: L,
96
        compression_flags: CompFlags,
97
        build_offsets: bool,
98
        num_nodes: Option<usize>,
99
    ) -> Result<u64>
100
    where
101
        E: Endianness,
102
        L: IntoLender,
103
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
104
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
105
    {
106
        BvCompBuilder::new(basename)
×
107
            .with_compression_flags(compression_flags)
×
108
            .single_thread::<E, L>(iter, build_offsets, num_nodes)
×
109
    }
110

111
    /// A wrapper over [`parallel_graph`](Self::parallel_graph) that takes the
112
    /// endianness as a string.
113
    ///
114
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
115
    ///
116
    ///  A given endianness is enabled only if the corresponding feature is
117
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
118
    /// neither features are enabled.
119
    pub fn parallel_endianness<P: AsRef<Path>, G: SplitLabeling + SequentialGraph>(
8✔
120
        basename: impl AsRef<Path> + Send + Sync,
121
        graph: &G,
122
        num_nodes: usize,
123
        compression_flags: CompFlags,
124
        threads: &ThreadPool,
125
        tmp_dir: P,
126
        endianness: &str,
127
    ) -> Result<u64>
128
    where
129
        for<'a> <G as SplitLabeling>::SplitLender<'a>: Send + Sync,
130
    {
131
        BvCompBuilder::new(basename)
24✔
132
            .with_compression_flags(compression_flags)
24✔
133
            .with_threads(threads)
24✔
134
            .with_tmp_dir(tmp_dir)
8✔
135
            .parallel_endianness(graph, num_nodes, endianness)
32✔
136
    }
137

138
    /// Compresses a graph in parallel and returns the length in bits of the graph bitstream.
139
    pub fn parallel_graph<E: Endianness>(
9✔
140
        basename: impl AsRef<Path> + Send + Sync,
141
        graph: &(impl SequentialGraph + SplitLabeling),
142
        compression_flags: CompFlags,
143
        threads: &ThreadPool,
144
        tmp_dir: impl AsRef<Path>,
145
    ) -> Result<u64>
146
    where
147
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
148
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
149
    {
150
        BvCompBuilder::new(basename)
27✔
151
            .with_compression_flags(compression_flags)
27✔
152
            .with_threads(threads)
27✔
153
            .with_tmp_dir(tmp_dir)
9✔
154
            .parallel_graph::<E>(graph)
18✔
155
    }
156

157
    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns the length in bits
158
    /// of the graph bitstream.
159
    pub fn parallel_iter<
×
160
        E: Endianness,
161
        L: Lender + for<'next> NodeLabelsLender<'next, Label = usize> + Send,
162
    >(
163
        basename: impl AsRef<Path> + Send + Sync,
164
        iter: impl Iterator<Item = (usize, L)>,
165
        num_nodes: usize,
166
        compression_flags: CompFlags,
167
        threads: &ThreadPool,
168
        tmp_dir: impl AsRef<Path>,
169
    ) -> Result<u64>
170
    where
171
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
172
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
173
    {
174
        BvCompBuilder::new(basename)
×
175
            .with_compression_flags(compression_flags)
×
176
            .with_threads(threads)
×
177
            .with_tmp_dir(tmp_dir)
×
178
            .parallel_iter::<E, L>(iter, num_nodes)
×
179
    }
180
}
181

182
/// Either a reference to a `T` borrowed for `'a`, or an owned `T`.
///
/// Similar to [`std::borrow::Cow`], but without requiring `T: ToOwned`.
#[derive(Debug)]
enum MaybeOwned<'a, T> {
    /// A value borrowed for the lifetime `'a`.
    Borrowed(&'a T),
    /// A value owned by the enum itself.
    Owned(T),
}
188

189
#[derive(Debug)]
190
pub struct BvCompBuilder<'t> {
191
    basename: PathBuf,
192
    compression_flags: CompFlags,
193
    threads: Option<MaybeOwned<'t, ThreadPool>>,
194
    tmp_dir: Option<PathBuf>,
195
    /// owns the TempDir that [`Self::tmp_dir`] refers to, if it was created by default
196
    owned_tmp_dir: Option<tempfile::TempDir>,
197
}
198

199
impl BvCompBuilder<'static> {
200
    pub fn new(basename: impl AsRef<Path>) -> Self {
17✔
201
        Self {
202
            basename: basename.as_ref().into(),
51✔
203
            compression_flags: CompFlags::default(),
34✔
204
            threads: None,
205
            tmp_dir: None,
206
            owned_tmp_dir: None,
207
        }
208
    }
209
}
210

211
impl<'t> BvCompBuilder<'t> {
212
    pub fn with_compression_flags(mut self, compression_flags: CompFlags) -> Self {
44✔
213
        self.compression_flags = compression_flags;
44✔
214
        self
44✔
215
    }
216

217
    pub fn with_tmp_dir(mut self, tmp_dir: impl AsRef<Path>) -> Self {
17✔
218
        self.tmp_dir = Some(tmp_dir.as_ref().into());
34✔
219
        self
17✔
220
    }
221

222
    pub fn with_threads(self, threads: &'_ ThreadPool) -> BvCompBuilder<'_> {
44✔
223
        BvCompBuilder {
224
            threads: Some(MaybeOwned::Borrowed(threads)),
44✔
225
            ..self
226
        }
227
    }
228

229
    fn tmp_dir(&mut self) -> Result<PathBuf> {
44✔
230
        if self.tmp_dir.is_none() {
88✔
231
            let tmp_dir = tempfile::tempdir()?;
×
232
            self.tmp_dir = Some(tmp_dir.path().to_owned());
×
233
            self.owned_tmp_dir = Some(tmp_dir);
×
234
        }
235

236
        let tmp_dir = self.tmp_dir.clone().unwrap();
44✔
237
        if !std::fs::exists(&tmp_dir)
×
238
            .with_context(|| format!("Could not check whether {} exists", tmp_dir.display()))?
×
239
        {
240
            std::fs::create_dir_all(&tmp_dir)
8✔
241
                .with_context(|| format!("Could not create {}", tmp_dir.display()))?;
×
242
        }
243
        Ok(tmp_dir)
44✔
244
    }
245

246
    fn ensure_threads(&mut self) -> Result<()> {
88✔
247
        if self.threads.is_none() {
176✔
248
            self.threads = Some(MaybeOwned::Owned(
×
249
                ThreadPoolBuilder::default()
×
250
                    .build()
×
251
                    .context("Could not build default thread pool")?,
×
252
            ));
253
        }
254

255
        Ok(())
88✔
256
    }
257

258
    fn threads(&self) -> &ThreadPool {
88✔
259
        match self.threads.as_ref().unwrap() {
176✔
260
            MaybeOwned::Owned(threads) => threads,
×
261
            MaybeOwned::Borrowed(threads) => threads,
88✔
262
        }
263
    }
264

265
    /// Compresses s [`NodeLabelsLender`] and returns the length in bits of the
266
    /// graph bitstream.
267
    pub fn single_thread<E, L>(
×
268
        &mut self,
269
        iter: L,
270
        build_offsets: bool,
271
        num_nodes: Option<usize>,
272
    ) -> Result<u64>
273
    where
274
        E: Endianness,
275
        L: IntoLender,
276
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
277
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
278
    {
279
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
×
280

281
        // Compress the graph
282
        let bit_write = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(
×
283
            File::create(&graph_path)
×
284
                .with_context(|| format!("Could not create {}", graph_path.display()))?,
×
285
        )));
286

287
        let comp_flags = CompFlags {
288
            ..Default::default()
289
        };
290

291
        let codes_writer = DynCodesEncoder::new(bit_write, &comp_flags)?;
×
292

293
        let mut bvcomp = BvComp::new(
294
            codes_writer,
×
295
            self.compression_flags.compression_window,
×
296
            self.compression_flags.max_ref_count,
×
297
            self.compression_flags.min_interval_length,
×
298
            0,
299
        );
300

301
        let mut pl = progress_logger![
×
302
            display_memory = true,
×
303
            item_name = "node",
×
304
            expected_updates = num_nodes,
×
305
        ];
306
        pl.start("Compressing successors...");
×
307
        let mut bitstream_len = 0;
×
308

309
        let mut real_num_nodes = 0;
×
310
        if build_offsets {
×
311
            let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
×
312
            let file = std::fs::File::create(&offsets_path)
×
313
                .with_context(|| format!("Could not create {}", offsets_path.display()))?;
×
314
            // create a bit writer on the file
315
            let mut writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
×
316
                BufWriter::with_capacity(1 << 20, file),
×
317
            ));
318

319
            writer
×
320
                .write_gamma(0)
321
                .context("Could not write initial delta")?;
322
            for_! ( (_node_id, successors) in iter {
×
323
                let delta = bvcomp.push(successors).context("Could not push successors")?;
×
324
                bitstream_len += delta;
×
325
                writer.write_gamma(delta).context("Could not write delta")?;
×
326
                pl.update();
×
327
                real_num_nodes += 1;
×
328
            });
329
        } else {
330
            for_! ( (_node_id, successors) in iter {
×
331
                bitstream_len += bvcomp.push(successors).context("Could not push successors")?;
×
332
                pl.update();
×
333
                real_num_nodes += 1;
×
334
            });
335
        }
336
        pl.done();
×
337

338
        if let Some(num_nodes) = num_nodes {
×
339
            if num_nodes != real_num_nodes {
×
340
                log::warn!(
×
341
                    "The expected number of nodes is {num_nodes} but the actual number of nodes is {real_num_nodes}"
×
342
                );
343
            }
344
        }
345
        let num_arcs = bvcomp.arcs;
×
346
        bitstream_len += bvcomp.flush().context("Could not flush bvcomp")? as u64;
×
347

348
        log::info!("Writing the .properties file");
×
349
        let properties = self
×
350
            .compression_flags
×
351
            .to_properties::<E>(real_num_nodes, num_arcs, bitstream_len)
×
352
            .context("Could not serialize properties")?;
353
        let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
×
354
        std::fs::write(&properties_path, properties)
×
355
            .with_context(|| format!("Could not write {}", properties_path.display()))?;
×
356

357
        Ok(bitstream_len)
×
358
    }
359

360
    /// A wrapper over [`parallel_graph`](Self::parallel_graph) that takes the
361
    /// endianness as a string.
362
    ///
363
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
364
    ///
365
    ///  A given endianness is enabled only if the corresponding feature is
366
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
367
    /// neither features are enabled.
368
    pub fn parallel_endianness<G: SplitLabeling + SequentialGraph>(
8✔
369
        &mut self,
370
        graph: &G,
371
        num_nodes: usize,
372
        endianness: &str,
373
    ) -> Result<u64>
374
    where
375
        for<'a> <G as SplitLabeling>::SplitLender<'a>: Send + Sync,
376
    {
377
        self.ensure_threads()?;
16✔
378
        let num_threads = self.threads().current_num_threads();
8✔
379

380
        match endianness {
×
381
            #[cfg(any(
382
                feature = "be_bins",
383
                not(any(feature = "be_bins", feature = "le_bins"))
384
            ))]
385
            BE::NAME => {
×
386
                // compress the transposed graph
387
                self.parallel_iter::<BigEndian, _>(
16✔
388
                    graph.split_iter(num_threads).into_iter(),
32✔
389
                    num_nodes,
8✔
390
                )
391
            }
392
            #[cfg(any(
393
                feature = "le_bins",
394
                not(any(feature = "be_bins", feature = "le_bins"))
395
            ))]
UNCOV
396
            LE::NAME => {
×
397
                // compress the transposed graph
UNCOV
398
                self.parallel_iter::<LittleEndian, _>(
×
399
                    graph.split_iter(num_threads).into_iter(),
×
UNCOV
400
                    num_nodes,
×
401
                )
402
            }
403
            x => anyhow::bail!("Unknown endianness {}", x),
×
404
        }
405
    }
406

407
    /// Compresses a graph in parallel and returns the length in bits of the graph bitstream.
408
    pub fn parallel_graph<E: Endianness>(
9✔
409
        &mut self,
410
        graph: &(impl SequentialGraph + SplitLabeling),
411
    ) -> Result<u64>
412
    where
413
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
414
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
415
    {
416
        self.ensure_threads()?;
18✔
417
        let num_threads = self.threads().current_num_threads();
9✔
UNCOV
418
        self.parallel_iter(graph.split_iter(num_threads).into_iter(), graph.num_nodes())
×
419
    }
420

421
    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns the length in bits
422
    /// of the graph bitstream.
423
    pub fn parallel_iter<
17✔
424
        E: Endianness,
425
        L: Lender + for<'next> NodeLabelsLender<'next, Label = usize> + Send,
426
    >(
427
        &mut self,
428
        iter: impl Iterator<Item = (usize, L)>,
429
        num_nodes: usize,
430
    ) -> Result<u64>
431
    where
432
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
433
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
434
    {
435
        self.ensure_threads()?;
34✔
436
        let tmp_dir = self.tmp_dir()?;
34✔
UNCOV
437
        let threads = self.threads();
×
438

UNCOV
439
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
×
UNCOV
440
        let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
×
441

UNCOV
442
        let (tx, rx) = std::sync::mpsc::channel();
×
443

444
        let thread_path = |thread_id: usize| tmp_dir.join(format!("{thread_id:016x}.bitstream"));
308✔
445

UNCOV
446
        let mut comp_pl = concurrent_progress_logger![
×
UNCOV
447
            log_target = "webgraph::graphs::bvgraph::comp::impls::parallel_iter::comp",
×
UNCOV
448
            display_memory = true,
×
449
            item_name = "node",
×
UNCOV
450
            local_speed = true,
×
451
            expected_updates = Some(num_nodes),
×
452
        ];
UNCOV
453
        comp_pl.start("Compressing successors in parallel...");
×
454
        threads.in_place_scope(|s| {
17✔
455
            let cp_flags = &self.compression_flags;
34✔
456

457
            for (thread_id, (expected_first_node, mut thread_lender)) in iter.enumerate() {
111✔
458
                let tmp_path = thread_path(thread_id);
×
459
                let chunk_graph_path = tmp_path.with_extension(GRAPH_EXTENSION);
×
460
                let chunk_offsets_path = tmp_path.with_extension(OFFSETS_EXTENSION);
×
461
                let tx = tx.clone();
×
462
                let mut comp_pl = comp_pl.clone();
×
463
                // Spawn the thread
464
                s.spawn(move |_| {
77✔
465
                    log::debug!("Thread {thread_id} started");
122✔
UNCOV
466
                    let first_node;
×
UNCOV
467
                    let mut bvcomp;
×
UNCOV
468
                    let mut offsets_writer;
×
UNCOV
469
                    let mut written_bits;
×
470
                    let mut offsets_written_bits;
×
471

472
                    match thread_lender.next() {
77✔
473
                        None => return,
×
474
                        Some((node_id, successors)) => {
154✔
475
                            first_node = node_id;
77✔
476
                            if first_node != expected_first_node {
77✔
UNCOV
477
                                panic!(
×
478
                                    "Lender {} expected to start from node {} but started from {}",
×
479
                                    thread_id,
×
480
                                    expected_first_node,
×
481
                                    first_node
×
482
                                );
483
                            }
484

485
                            offsets_writer = <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(
231✔
486
                                BufWriter::new(File::create(&chunk_offsets_path).unwrap()),
308✔
487
                            ));
488

489
                            let writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
231✔
490
                                BufWriter::new(File::create(&chunk_graph_path).unwrap()),
308✔
491
                            ));
492
                            let codes_encoder = <DynCodesEncoder<E, _>>::new(writer, cp_flags).unwrap();
385✔
493

494
                            bvcomp = BvComp::new(
154✔
495
                                codes_encoder,
77✔
496
                                cp_flags.compression_window,
77✔
497
                                cp_flags.max_ref_count,
77✔
498
                                cp_flags.min_interval_length,
77✔
499
                                node_id,
77✔
500
                            );
501
                            written_bits = bvcomp.push(successors).unwrap();
231✔
502
                            offsets_written_bits = offsets_writer.write_gamma(written_bits).unwrap() as u64;
231✔
503

504
                        }
505
                    };
506

507
                    let mut last_node = first_node;
154✔
508
                    let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
5,534,623✔
509
                    for_! ( (_, succ) in iter_nodes {
5,534,469✔
UNCOV
510
                        let node_bits = bvcomp.push(succ.into_iter()).unwrap();
×
UNCOV
511
                        written_bits += node_bits;
×
UNCOV
512
                        offsets_written_bits += offsets_writer.write_gamma(node_bits).unwrap() as u64;
×
513
                    });
514

515
                    let num_arcs = bvcomp.arcs;
154✔
516
                    bvcomp.flush().unwrap();
231✔
517
                    offsets_writer.flush().unwrap();
231✔
518
                    comp_pl.update_with_count(last_node - first_node + 1);
231✔
519

520

521

522
                    log::debug!(
77✔
523
                        "Finished Compression thread {thread_id} and wrote {written_bits} bits for the graph and {offsets_written_bits} bits for the offsets",
45✔
524
                    );
525
                    tx.send(Job {
231✔
526
                        job_id: thread_id,
154✔
527
                        first_node,
154✔
528
                        last_node,
154✔
529
                        chunk_graph_path,
154✔
530
                        written_bits,
154✔
531
                        chunk_offsets_path,
154✔
532
                        offsets_written_bits,
77✔
533
                        num_arcs,
77✔
534
                    })
535
                    .unwrap()
77✔
536
                });
537
            }
538

539
            drop(tx);
34✔
540

541
            let mut copy_pl = progress_logger![
34✔
UNCOV
542
                log_target = "webgraph::graphs::bvgraph::comp::impls::parallel_iter::copy",
×
UNCOV
543
                display_memory = true,
×
UNCOV
544
                item_name = "node",
×
UNCOV
545
                local_speed = true,
×
546
                expected_updates = Some(num_nodes),
17✔
547
            ];
548
            copy_pl.start("Copying compressed successors to final graph");
34✔
549

550
            let file = File::create(&graph_path)
51✔
551
                .with_context(|| format!("Could not create graph {}", graph_path.display()))?;
17✔
UNCOV
552
            let mut graph_writer =
×
UNCOV
553
                <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
×
554

555
            let file = File::create(&offsets_path)
17✔
556
                .with_context(|| format!("Could not create offsets {}", offsets_path.display()))?;
×
557
            let mut offsets_writer =
×
UNCOV
558
                <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
×
UNCOV
559
            offsets_writer.write_gamma(0)?;
×
560

561
            let mut total_written_bits: u64 = 0;
17✔
562
            let mut total_offsets_written_bits: u64 = 0;
×
563
            let mut total_arcs: u64 = 0;
×
564

UNCOV
565
            let mut next_node = 0;
×
566
            // glue together the bitstreams as they finish, this allows us to do
567
            // task pipelining for better performance
UNCOV
568
            for Job {
×
569
                job_id,
77✔
570
                first_node,
77✔
571
                last_node,
77✔
572
                chunk_graph_path,
77✔
573
                written_bits,
77✔
574
                chunk_offsets_path,
77✔
575
                offsets_written_bits,
77✔
576
                num_arcs,
77✔
UNCOV
577
            } in TaskQueue::new(rx.iter())
×
578
            {
579
                ensure!(
77✔
580
                    first_node == next_node,
77✔
581
                    "Non-adjacent lenders: lender {} has first node {} instead of {}",
×
UNCOV
582
                    job_id,
×
UNCOV
583
                    first_node,
×
UNCOV
584
                    next_node
×
585
                );
586

587
                next_node = last_node + 1;
77✔
588
                total_arcs += num_arcs;
×
UNCOV
589
                log::debug!(
×
590
                    "Copying {} [{}..{}) bits from {} to {}",
45✔
UNCOV
591
                    written_bits,
×
592
                    total_written_bits,
×
593
                    total_written_bits + written_bits,
45✔
594
                    chunk_graph_path.display(),
45✔
595
                    graph_path.display()
45✔
596
                );
UNCOV
597
                total_written_bits += written_bits;
×
598

599
                let mut reader =
77✔
UNCOV
600
                    <BufBitReader<E, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
×
601
                        File::open(&chunk_graph_path)
×
UNCOV
602
                            .with_context(|| format!("Could not open {}", chunk_graph_path.display()))?,
×
603
                    )));
604
                graph_writer
×
605
                    .copy_from(&mut reader, written_bits)
×
606
                    .with_context(|| {
×
UNCOV
607
                        format!(
×
608
                            "Could not copy from {} to {}",
×
609
                            chunk_graph_path.display(),
×
610
                            graph_path.display()
×
611
                        )
612
                    })?;
613

614
                log::debug!(
77✔
615
                    "Copying offsets {} [{}..{}) bits from {} to {}",
45✔
UNCOV
616
                    offsets_written_bits,
×
UNCOV
617
                    total_offsets_written_bits,
×
618
                    total_offsets_written_bits + offsets_written_bits,
45✔
619
                    chunk_offsets_path.display(),
45✔
620
                    offsets_path.display()
45✔
621
                );
UNCOV
622
                total_offsets_written_bits += offsets_written_bits;
×
623

624
                let mut reader =
77✔
UNCOV
625
                    <BufBitReader<BigEndian, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
×
626
                        File::open(&chunk_offsets_path)
×
UNCOV
627
                            .with_context(|| format!("Could not open {}", chunk_offsets_path.display()))?,
×
628
                    )));
629
                offsets_writer
×
630
                    .copy_from(&mut reader, offsets_written_bits)
×
631
                    .with_context(|| {
×
UNCOV
632
                        format!(
×
633
                            "Could not copy from {} to {}",
×
634
                            chunk_offsets_path.display(),
×
635
                            offsets_path.display()
×
636
                        )
637
                    })?;
638

639
                copy_pl.update_with_count(last_node - first_node + 1);
77✔
640
            }
641

642

643
            log::info!("Flushing the merged bitstreams");
34✔
UNCOV
644
            graph_writer.flush()?;
×
645
            offsets_writer.flush()?;
17✔
646

647
            comp_pl.done();
17✔
648
            copy_pl.done();
×
649

650
            log::info!("Writing the .properties file");
17✔
651
            let properties = self.compression_flags
17✔
652
                .to_properties::<E>(num_nodes, total_arcs, total_written_bits)
×
UNCOV
653
                .context("Could not serialize properties")?;
×
UNCOV
654
            let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
×
UNCOV
655
            std::fs::write(&properties_path, properties).with_context(|| {
×
656
                format!(
×
657
                    "Could not write properties to {}",
×
658
                    properties_path.display()
×
659
                )
660
            })?;
661

662
            log::info!(
17✔
663
                "Compressed {} arcs into {} bits for {:.4} bits/arc",
17✔
UNCOV
664
                total_arcs,
×
UNCOV
665
                total_written_bits,
×
666
                total_written_bits as f64 / total_arcs as f64
17✔
667
            );
668
            log::info!(
×
669
                "Created offsets file with {} bits for {:.4} bits/node",
17✔
UNCOV
670
                total_offsets_written_bits,
×
671
                total_offsets_written_bits as f64 / num_nodes as f64
17✔
672
            );
673

674
            // cleanup the temp files
UNCOV
675
            std::fs::remove_dir_all(&tmp_dir).with_context(|| {
×
UNCOV
676
                format!("Could not clean temporary directory {}", tmp_dir.display())
×
677
            })?;
678
            Ok(total_written_bits)
17✔
679
        })
680
    }
681
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc