• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 18280289969

06 Oct 2025 12:07PM UTC coverage: 48.82%. First build
18280289969

Pull #148

github

web-flow
Merge dfe1df133 into 2777b3d50
Pull Request #148: bvcomp: Create temporary directory if it does not exist

3 of 6 new or added lines in 1 file covered. (50.0%)

3911 of 8011 relevant lines covered (48.82%)

23015602.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.16
/webgraph/src/graphs/bvgraph/comp/impls.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 *
5
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6
 */
7

8
use crate::prelude::*;
9
use anyhow::{ensure, Context, Result};
10
use dsi_bitstream::prelude::*;
11
use dsi_progress_logger::prelude::*;
12
use lender::prelude::*;
13
use rayon::{ThreadPool, ThreadPoolBuilder};
14
use std::fs::File;
15
use std::io::{BufReader, BufWriter};
16
use std::path::{Path, PathBuf};
17

18
/// A reordering buffer for jobs identified by consecutive ids starting at 0.
///
/// Jobs are pulled from the underlying iterator in whatever order it produces
/// them, parked in the slot indexed by their id, and handed out strictly in
/// increasing id order.
///
/// Items must implement [`JobId`]. If the underlying iterator ends before the
/// job with the next expected id has shown up, iteration ends and any parked
/// job with a larger id is never returned.
struct TaskQueue<I: Iterator> {
    /// Source of jobs, possibly out of order.
    iter: I,
    /// Slot `i` holds the job with id `i` while it waits for its turn.
    jobs: Vec<Option<I::Item>>,
    /// Id of the next job to be returned.
    next_id: usize,
}

/// Gives access to the id that determines the output position of a job.
trait JobId {
    fn id(&self) -> usize;
}

impl<I: Iterator> TaskQueue<I> {
    /// Wraps `iter`; the first job returned will be the one with id 0.
    fn new(iter: I) -> Self {
        Self {
            iter,
            jobs: Vec::new(),
            next_id: 0,
        }
    }
}

impl<I: Iterator> Iterator for TaskQueue<I>
where
    I::Item: JobId,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Hand out the awaited job if it has already been parked.
            let ready = self.jobs.get_mut(self.next_id).and_then(Option::take);
            if ready.is_some() {
                self.next_id += 1;
                return ready;
            }

            // Otherwise park one more job; an exhausted source ends the
            // iteration.
            let job = self.iter.next()?;
            let slot = job.id();
            if self.jobs.len() <= slot {
                self.jobs.resize_with(slot + 1, || None);
            }
            self.jobs[slot] = Some(job);
        }
    }
}
70

71
/// The result of compressing one chunk of nodes, as reported by a worker
/// task to the code that merges the chunks.
///
/// The field order matters: the derived ordering compares `job_id` first.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Job {
    /// Position of this chunk in the sequence of chunks.
    job_id: usize,
    /// First node compressed in this chunk.
    first_node: usize,
    /// Last node compressed in this chunk (inclusive).
    last_node: usize,
    /// File holding the compressed successors of this chunk.
    chunk_graph_path: PathBuf,
    /// Number of bits to copy from `chunk_graph_path`.
    written_bits: u64,
    /// File holding the offsets of this chunk.
    chunk_offsets_path: PathBuf,
    /// Number of bits to copy from `chunk_offsets_path`.
    offsets_written_bits: u64,
    /// Number of arcs compressed in this chunk.
    num_arcs: u64,
}
83

84
impl JobId for Job {
    /// Returns the position of this chunk in the sequence of chunks.
    fn id(&self) -> usize {
        let Self { job_id, .. } = self;
        *job_id
    }
}
89

90
impl BvComp<()> {
91
    /// Compresses s [`NodeLabelsLender`] and returns the length in bits of the
92
    /// graph bitstream.
93
    pub fn single_thread<E, L>(
×
94
        basename: impl AsRef<Path>,
95
        iter: L,
96
        compression_flags: CompFlags,
97
        build_offsets: bool,
98
        num_nodes: Option<usize>,
99
    ) -> Result<u64>
100
    where
101
        E: Endianness,
102
        L: IntoLender,
103
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
104
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
105
    {
106
        BvCompBuilder::new(basename)
×
107
            .with_compression_flags(compression_flags)
×
108
            .single_thread::<E, L>(iter, build_offsets, num_nodes)
×
109
    }
110

111
    /// A wrapper over [`parallel_graph`](Self::parallel_graph) that takes the
112
    /// endianness as a string.
113
    ///
114
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
115
    ///
116
    ///  A given endianness is enabled only if the corresponding feature is
117
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
118
    /// neither features are enabled.
119
    pub fn parallel_endianness<P: AsRef<Path>, G: SplitLabeling + SequentialGraph>(
8✔
120
        basename: impl AsRef<Path> + Send + Sync,
121
        graph: &G,
122
        num_nodes: usize,
123
        compression_flags: CompFlags,
124
        threads: &ThreadPool,
125
        tmp_dir: P,
126
        endianness: &str,
127
    ) -> Result<u64>
128
    where
129
        for<'a> <G as SplitLabeling>::SplitLender<'a>: Send + Sync,
130
    {
131
        BvCompBuilder::new(basename)
24✔
132
            .with_compression_flags(compression_flags)
24✔
133
            .with_threads(threads)
24✔
134
            .with_tmp_dir(tmp_dir)
8✔
135
            .parallel_endianness(graph, num_nodes, endianness)
32✔
136
    }
137

138
    /// Compresses a graph in parallel and returns the length in bits of the graph bitstream.
139
    pub fn parallel_graph<E: Endianness>(
9✔
140
        basename: impl AsRef<Path> + Send + Sync,
141
        graph: &(impl SequentialGraph + SplitLabeling),
142
        compression_flags: CompFlags,
143
        threads: &ThreadPool,
144
        tmp_dir: impl AsRef<Path>,
145
    ) -> Result<u64>
146
    where
147
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
148
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
149
    {
150
        BvCompBuilder::new(basename)
27✔
151
            .with_compression_flags(compression_flags)
27✔
152
            .with_threads(threads)
27✔
153
            .with_tmp_dir(tmp_dir)
9✔
154
            .parallel_graph::<E>(graph)
18✔
155
    }
156

157
    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns the length in bits
158
    /// of the graph bitstream.
159
    pub fn parallel_iter<
×
160
        E: Endianness,
161
        L: Lender + for<'next> NodeLabelsLender<'next, Label = usize> + Send,
162
    >(
163
        basename: impl AsRef<Path> + Send + Sync,
164
        iter: impl Iterator<Item = L>,
165
        num_nodes: usize,
166
        compression_flags: CompFlags,
167
        threads: &ThreadPool,
168
        tmp_dir: impl AsRef<Path>,
169
    ) -> Result<u64>
170
    where
171
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
172
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
173
    {
174
        BvCompBuilder::new(basename)
×
175
            .with_compression_flags(compression_flags)
×
176
            .with_threads(threads)
×
177
            .with_tmp_dir(tmp_dir)
×
178
            .parallel_iter::<E, L>(iter, num_nodes)
×
179
    }
180
}
181

182
/// Either a reference to a `T` or an owned `T`.
///
/// Similar to [`std::borrow::Cow`], but without the `T: ToOwned` requirement.
#[derive(Debug)]
enum MaybeOwned<'a, T> {
    /// A value owned by someone else.
    Borrowed(&'a T),
    /// A value owned by this instance.
    Owned(T),
}
188

189
#[derive(Debug)]
190
pub struct BvCompBuilder<'t> {
191
    basename: PathBuf,
192
    compression_flags: CompFlags,
193
    threads: Option<MaybeOwned<'t, ThreadPool>>,
194
    tmp_dir: Option<PathBuf>,
195
    /// owns the TempDir that [`Self::tmp_dir`] refers to, if it was created by default
196
    owned_tmp_dir: Option<tempfile::TempDir>,
197
}
198

199
impl BvCompBuilder<'static> {
200
    pub fn new(basename: impl AsRef<Path>) -> Self {
17✔
201
        Self {
202
            basename: basename.as_ref().into(),
51✔
203
            compression_flags: CompFlags::default(),
34✔
204
            threads: None,
205
            tmp_dir: None,
206
            owned_tmp_dir: None,
207
        }
208
    }
209
}
210

211
impl<'t> BvCompBuilder<'t> {
212
    pub fn with_compression_flags(mut self, compression_flags: CompFlags) -> Self {
44✔
213
        self.compression_flags = compression_flags;
44✔
214
        self
44✔
215
    }
216

217
    /// Sets the directory in which the parallel methods store their
    /// per-chunk files.
    pub fn with_tmp_dir(mut self, tmp_dir: impl AsRef<Path>) -> Self {
        let path = tmp_dir.as_ref().to_path_buf();
        self.tmp_dir = Some(path);
        self
    }
221

222
    pub fn with_threads(self, threads: &'_ ThreadPool) -> BvCompBuilder<'_> {
44✔
223
        BvCompBuilder {
224
            threads: Some(MaybeOwned::Borrowed(threads)),
44✔
225
            ..self
226
        }
227
    }
228

229
    fn tmp_dir(&mut self) -> Result<PathBuf> {
44✔
230
        if self.tmp_dir.is_none() {
88✔
231
            let tmp_dir = tempfile::tempdir()?;
×
232
            self.tmp_dir = Some(tmp_dir.path().to_owned());
×
233
            self.owned_tmp_dir = Some(tmp_dir);
×
234
        }
235

236
        let tmp_dir = self.tmp_dir.clone().unwrap();
44✔
NEW
237
        if !std::fs::exists(&tmp_dir)
×
NEW
238
            .with_context(|| format!("Could not check whether {} exists", tmp_dir.display()))?
×
239
        {
240
            std::fs::create_dir_all(&tmp_dir)
8✔
NEW
241
                .with_context(|| format!("Could not create {}", tmp_dir.display()))?;
×
242
        }
243
        Ok(tmp_dir)
44✔
244
    }
245

246
    fn ensure_threads(&mut self) -> Result<()> {
88✔
247
        if self.threads.is_none() {
176✔
248
            self.threads = Some(MaybeOwned::Owned(
×
249
                ThreadPoolBuilder::default()
×
250
                    .build()
×
251
                    .context("Could not build default thread pool")?,
×
252
            ));
253
        }
254

255
        Ok(())
88✔
256
    }
257

258
    fn threads(&self) -> &ThreadPool {
88✔
259
        match self.threads.as_ref().unwrap() {
176✔
260
            MaybeOwned::Owned(threads) => threads,
×
261
            MaybeOwned::Borrowed(threads) => threads,
88✔
262
        }
263
    }
264

265
    /// Compresses s [`NodeLabelsLender`] and returns the length in bits of the
266
    /// graph bitstream.
267
    pub fn single_thread<E, L>(
×
268
        &mut self,
269
        iter: L,
270
        build_offsets: bool,
271
        num_nodes: Option<usize>,
272
    ) -> Result<u64>
273
    where
274
        E: Endianness,
275
        L: IntoLender,
276
        L::Lender: for<'next> NodeLabelsLender<'next, Label = usize>,
277
        BufBitWriter<E, WordAdapter<usize, BufWriter<File>>>: CodesWrite<E>,
278
    {
279
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
×
280

281
        // Compress the graph
282
        let bit_write = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(
×
283
            File::create(&graph_path)
×
284
                .with_context(|| format!("Could not create {}", graph_path.display()))?,
×
285
        )));
286

287
        let comp_flags = CompFlags {
288
            ..Default::default()
289
        };
290

291
        let codes_writer = DynCodesEncoder::new(bit_write, &comp_flags)?;
×
292

293
        let mut bvcomp = BvComp::new(
294
            codes_writer,
×
295
            self.compression_flags.compression_window,
×
296
            self.compression_flags.max_ref_count,
×
297
            self.compression_flags.min_interval_length,
×
298
            0,
299
        );
300

301
        let mut pl = progress_logger![
×
302
            display_memory = true,
×
303
            item_name = "node",
×
304
            expected_updates = num_nodes,
×
305
        ];
306
        pl.start("Compressing successors...");
×
307
        let mut bitstream_len = 0;
×
308

309
        let mut real_num_nodes = 0;
×
310
        if build_offsets {
×
311
            let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
×
312
            let file = std::fs::File::create(&offsets_path)
×
313
                .with_context(|| format!("Could not create {}", offsets_path.display()))?;
×
314
            // create a bit writer on the file
315
            let mut writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
×
316
                BufWriter::with_capacity(1 << 20, file),
×
317
            ));
318

319
            writer
×
320
                .write_gamma(0)
321
                .context("Could not write initial delta")?;
322
            for_! ( (_node_id, successors) in iter {
×
323
                let delta = bvcomp.push(successors).context("Could not push successors")?;
×
324
                bitstream_len += delta;
×
325
                writer.write_gamma(delta).context("Could not write delta")?;
×
326
                pl.update();
×
327
                real_num_nodes += 1;
×
328
            });
329
        } else {
330
            for_! ( (_node_id, successors) in iter {
×
331
                bitstream_len += bvcomp.push(successors).context("Could not push successors")?;
×
332
                pl.update();
×
333
                real_num_nodes += 1;
×
334
            });
335
        }
336
        pl.done();
×
337

338
        if let Some(num_nodes) = num_nodes {
×
339
            if num_nodes != real_num_nodes {
×
340
                log::warn!(
×
341
                    "The expected number of nodes is {num_nodes} but the actual number of nodes is {real_num_nodes}"
×
342
                );
343
            }
344
        }
345
        let num_arcs = bvcomp.arcs;
×
346
        bitstream_len += bvcomp.flush().context("Could not flush bvcomp")? as u64;
×
347

348
        log::info!("Writing the .properties file");
×
349
        let properties = self
×
350
            .compression_flags
×
351
            .to_properties::<E>(real_num_nodes, num_arcs, bitstream_len)
×
352
            .context("Could not serialize properties")?;
353
        let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
×
354
        std::fs::write(&properties_path, properties)
×
355
            .with_context(|| format!("Could not write {}", properties_path.display()))?;
×
356

357
        Ok(bitstream_len)
×
358
    }
359

360
    /// A wrapper over [`parallel_graph`](Self::parallel_graph) that takes the
361
    /// endianness as a string.
362
    ///
363
    /// Endianness can only be [`BE::NAME`](BE) or [`LE::NAME`](LE).
364
    ///
365
    ///  A given endianness is enabled only if the corresponding feature is
366
    /// enabled, `be_bins` for big endian and `le_bins` for little endian, or if
367
    /// neither features are enabled.
368
    pub fn parallel_endianness<G: SplitLabeling + SequentialGraph>(
8✔
369
        &mut self,
370
        graph: &G,
371
        num_nodes: usize,
372
        endianness: &str,
373
    ) -> Result<u64>
374
    where
375
        for<'a> <G as SplitLabeling>::SplitLender<'a>: Send + Sync,
376
    {
377
        self.ensure_threads()?;
16✔
378
        let num_threads = self.threads().current_num_threads();
8✔
379

380
        match endianness {
×
381
            #[cfg(any(
382
                feature = "be_bins",
383
                not(any(feature = "be_bins", feature = "le_bins"))
384
            ))]
385
            BE::NAME => {
×
386
                // compress the transposed graph
387
                self.parallel_iter::<BigEndian, _>(
16✔
388
                    graph.split_iter(num_threads).into_iter(),
32✔
389
                    num_nodes,
8✔
390
                )
391
            }
392
            #[cfg(any(
393
                feature = "le_bins",
394
                not(any(feature = "be_bins", feature = "le_bins"))
395
            ))]
396
            LE::NAME => {
×
397
                // compress the transposed graph
398
                self.parallel_iter::<LittleEndian, _>(
×
399
                    graph.split_iter(num_threads).into_iter(),
×
400
                    num_nodes,
×
401
                )
402
            }
403
            x => anyhow::bail!("Unknown endianness {}", x),
×
404
        }
405
    }
406

407
    /// Compresses a graph in parallel and returns the length in bits of the graph bitstream.
408
    pub fn parallel_graph<E: Endianness>(
9✔
409
        &mut self,
410
        graph: &(impl SequentialGraph + SplitLabeling),
411
    ) -> Result<u64>
412
    where
413
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
414
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
415
    {
416
        self.ensure_threads()?;
18✔
417
        let num_threads = self.threads().current_num_threads();
9✔
418
        self.parallel_iter(graph.split_iter(num_threads).into_iter(), graph.num_nodes())
×
419
    }
420

421
    /// Compresses multiple [`NodeLabelsLender`] in parallel and returns the length in bits
422
    /// of the graph bitstream.
423
    pub fn parallel_iter<
17✔
424
        E: Endianness,
425
        L: Lender + for<'next> NodeLabelsLender<'next, Label = usize> + Send,
426
    >(
427
        &mut self,
428
        iter: impl Iterator<Item = L>,
429
        num_nodes: usize,
430
    ) -> Result<u64>
431
    where
432
        BufBitWriter<E, WordAdapter<usize, BufWriter<std::fs::File>>>: CodesWrite<E>,
433
        BufBitReader<E, WordAdapter<u32, BufReader<std::fs::File>>>: BitRead<E>,
434
    {
435
        self.ensure_threads()?;
34✔
436
        let tmp_dir = self.tmp_dir()?;
34✔
437
        let threads = self.threads();
×
438

439
        let graph_path = self.basename.with_extension(GRAPH_EXTENSION);
×
440
        let offsets_path = self.basename.with_extension(OFFSETS_EXTENSION);
×
441

442
        let (tx, rx) = std::sync::mpsc::channel();
×
443

444
        let thread_path = |thread_id: usize| tmp_dir.join(format!("{thread_id:016x}.bitstream"));
308✔
445

446
        let mut comp_pl = concurrent_progress_logger![
×
447
            log_target = "webgraph::graphs::bvgraph::comp::impls::parallel_iter::comp",
×
448
            display_memory = true,
×
449
            item_name = "node",
×
450
            local_speed = true,
×
451
            expected_updates = Some(num_nodes),
×
452
        ];
453
        comp_pl.start("Compressing successors in parallel...");
×
454
        threads.in_place_scope(|s| {
17✔
455
            let cp_flags = &self.compression_flags;
34✔
456

457
            for (thread_id, mut thread_lender) in iter.enumerate() {
111✔
458
                let tmp_path = thread_path(thread_id);
×
459
                let chunk_graph_path = tmp_path.with_extension(GRAPH_EXTENSION);
×
460
                let chunk_offsets_path = tmp_path.with_extension(OFFSETS_EXTENSION);
×
461
                let tx = tx.clone();
×
462
                let mut comp_pl = comp_pl.clone();
×
463
                // Spawn the thread
464
                s.spawn(move |_| {
77✔
465
                    log::debug!("Thread {thread_id} started");
122✔
466
                    let first_node;
×
467
                    let mut bvcomp;
×
468
                    let mut offsets_writer;
×
469
                    let mut written_bits;
×
470
                    let mut offsets_written_bits;
×
471

472
                    match thread_lender.next() {
77✔
473
                        None => return,
×
474
                        Some((node_id, successors)) => {
231✔
475
                            first_node = node_id;
154✔
476

477
                            offsets_writer = <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(
308✔
478
                                BufWriter::new(File::create(&chunk_offsets_path).unwrap()),
385✔
479
                            ));
480

481
                            let writer = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(
308✔
482
                                BufWriter::new(File::create(&chunk_graph_path).unwrap()),
385✔
483
                            ));
484
                            let codes_encoder = <DynCodesEncoder<E, _>>::new(writer, cp_flags).unwrap();
462✔
485

486
                            bvcomp = BvComp::new(
231✔
487
                                codes_encoder,
154✔
488
                                cp_flags.compression_window,
154✔
489
                                cp_flags.max_ref_count,
154✔
490
                                cp_flags.min_interval_length,
154✔
491
                                node_id,
154✔
492
                            );
493
                            written_bits = bvcomp.push(successors).unwrap();
308✔
494
                            offsets_written_bits = offsets_writer.write_gamma(written_bits).unwrap() as u64;
231✔
495

496
                        }
497
                    };
498

499
                    let mut last_node = first_node;
154✔
500
                    let iter_nodes = thread_lender.inspect(|(x, _)| last_node = *x);
5,534,623✔
501
                    for_! ( (_, succ) in iter_nodes {
5,534,469✔
502
                        let node_bits = bvcomp.push(succ.into_iter()).unwrap();
×
503
                        written_bits += node_bits;
×
504
                        offsets_written_bits += offsets_writer.write_gamma(node_bits).unwrap() as u64;
×
505
                    });
506

507
                    let num_arcs = bvcomp.arcs;
154✔
508
                    bvcomp.flush().unwrap();
231✔
509
                    offsets_writer.flush().unwrap();
231✔
510
                    comp_pl.update_with_count(last_node - first_node + 1);
231✔
511

512

513

514
                    log::debug!(
77✔
515
                        "Finished Compression thread {thread_id} and wrote {written_bits} bits for the graph and {offsets_written_bits} bits for the offsets",
45✔
516
                    );
517
                    tx.send(Job {
231✔
518
                        job_id: thread_id,
154✔
519
                        first_node,
154✔
520
                        last_node,
154✔
521
                        chunk_graph_path,
154✔
522
                        written_bits,
154✔
523
                        chunk_offsets_path,
154✔
524
                        offsets_written_bits,
77✔
525
                        num_arcs,
77✔
526
                    })
527
                    .unwrap()
77✔
528
                });
529
            }
530

531
            drop(tx);
34✔
532

533
            let mut copy_pl = progress_logger![
34✔
534
                log_target = "webgraph::graphs::bvgraph::comp::impls::parallel_iter::copy",
×
535
                display_memory = true,
×
536
                item_name = "node",
×
537
                local_speed = true,
×
538
                expected_updates = Some(num_nodes),
17✔
539
            ];
540
            copy_pl.start("Copying compressed successors to final graph");
34✔
541

542
            let file = File::create(&graph_path)
51✔
543
                .with_context(|| format!("Could not create graph {}", graph_path.display()))?;
17✔
544
            let mut graph_writer =
×
545
                <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
×
546

547
            let file = File::create(&offsets_path)
17✔
548
                .with_context(|| format!("Could not create offsets {}", offsets_path.display()))?;
×
549
            let mut offsets_writer =
×
550
                <BufBitWriter<BigEndian, _>>::new(<WordAdapter<usize, _>>::new(BufWriter::new(file)));
×
551
            offsets_writer.write_gamma(0)?;
×
552

553
            let mut total_written_bits: u64 = 0;
17✔
554
            let mut total_offsets_written_bits: u64 = 0;
×
555
            let mut total_arcs: u64 = 0;
×
556

557
            let mut next_node = 0;
×
558
            // glue together the bitstreams as they finish, this allows us to do
559
            // task pipelining for better performance
560
            for Job {
×
561
                job_id,
77✔
562
                first_node,
77✔
563
                last_node,
77✔
564
                chunk_graph_path,
77✔
565
                written_bits,
77✔
566
                chunk_offsets_path,
77✔
567
                offsets_written_bits,
77✔
568
                num_arcs,
77✔
569
            } in TaskQueue::new(rx.iter())
×
570
            {
571
                ensure!(
77✔
572
                    first_node == next_node,
77✔
573
                    "Non-adjacent lenders: lender {} has first node {} instead of {}",
×
574
                    job_id,
×
575
                    first_node,
×
576
                    next_node
×
577
                );
578

579
                next_node = last_node + 1;
77✔
580
                total_arcs += num_arcs;
×
581
                log::debug!(
×
582
                    "Copying {} [{}..{}) bits from {} to {}",
45✔
583
                    written_bits,
×
584
                    total_written_bits,
×
585
                    total_written_bits + written_bits,
45✔
586
                    chunk_graph_path.display(),
45✔
587
                    graph_path.display()
45✔
588
                );
589
                total_written_bits += written_bits;
×
590

591
                let mut reader =
77✔
592
                    <BufBitReader<E, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
×
593
                        File::open(&chunk_graph_path)
×
594
                            .with_context(|| format!("Could not open {}", chunk_graph_path.display()))?,
×
595
                    )));
596
                graph_writer
×
597
                    .copy_from(&mut reader, written_bits)
×
598
                    .with_context(|| {
×
599
                        format!(
×
600
                            "Could not copy from {} to {}",
×
601
                            chunk_graph_path.display(),
×
602
                            graph_path.display()
×
603
                        )
604
                    })?;
605

606
                log::debug!(
77✔
607
                    "Copying offsets {} [{}..{}) bits from {} to {}",
45✔
608
                    offsets_written_bits,
×
609
                    total_offsets_written_bits,
×
610
                    total_offsets_written_bits + offsets_written_bits,
45✔
611
                    chunk_offsets_path.display(),
45✔
612
                    offsets_path.display()
45✔
613
                );
614
                total_offsets_written_bits += offsets_written_bits;
×
615

616
                let mut reader =
77✔
617
                    <BufBitReader<BigEndian, _>>::new(<WordAdapter<u32, _>>::new(BufReader::new(
×
618
                        File::open(&chunk_offsets_path)
×
619
                            .with_context(|| format!("Could not open {}", chunk_offsets_path.display()))?,
×
620
                    )));
621
                offsets_writer
×
622
                    .copy_from(&mut reader, offsets_written_bits)
×
623
                    .with_context(|| {
×
624
                        format!(
×
625
                            "Could not copy from {} to {}",
×
626
                            chunk_offsets_path.display(),
×
627
                            offsets_path.display()
×
628
                        )
629
                    })?;
630

631
                copy_pl.update_with_count(last_node - first_node + 1);
77✔
632
            }
633

634

635
            log::info!("Flushing the merged bitstreams");
34✔
636
            graph_writer.flush()?;
×
637
            offsets_writer.flush()?;
17✔
638

639
            comp_pl.done();
17✔
640
            copy_pl.done();
×
641

642
            log::info!("Writing the .properties file");
17✔
643
            let properties = self.compression_flags
17✔
644
                .to_properties::<E>(num_nodes, total_arcs, total_written_bits)
×
645
                .context("Could not serialize properties")?;
×
646
            let properties_path = self.basename.with_extension(PROPERTIES_EXTENSION);
×
647
            std::fs::write(&properties_path, properties).with_context(|| {
×
648
                format!(
×
649
                    "Could not write properties to {}",
×
650
                    properties_path.display()
×
651
                )
652
            })?;
653

654
            log::info!(
17✔
655
                "Compressed {} arcs into {} bits for {:.4} bits/arc",
17✔
656
                total_arcs,
×
657
                total_written_bits,
×
658
                total_written_bits as f64 / total_arcs as f64
17✔
659
            );
660
            log::info!(
×
661
                "Created offsets file with {} bits for {:.4} bits/node",
17✔
662
                total_offsets_written_bits,
×
663
                total_offsets_written_bits as f64 / num_nodes as f64
17✔
664
            );
665

666
            // cleanup the temp files
667
            std::fs::remove_dir_all(&tmp_dir).with_context(|| {
×
668
                format!("Could not clean temporary directory {}", tmp_dir.display())
×
669
            })?;
670
            Ok(total_written_bits)
17✔
671
        })
672
    }
673
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc