• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 18458664278

13 Oct 2025 07:39AM UTC coverage: 48.052% (+0.2%) from 47.835%
18458664278

Pull #152

github

vigna
Docs review
Pull Request #152: Introduce BatchCodec to substitute BatchIterator

112 of 233 new or added lines in 9 files covered. (48.07%)

7 existing lines in 3 files now uncovered.

4009 of 8343 relevant lines covered (48.05%)

24464313.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.7
/webgraph/src/utils/batch_codec/gaps.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
5
 *
6
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
7
 */
8

9
use super::{BitReader, BitWriter};
10
use crate::traits::SortedIterator;
11
use crate::utils::{ArcMmapHelper, MmapHelper, Triple};
12
use crate::{
13
    traits::{BitDeserializer, BitSerializer},
14
    utils::BatchCodec,
15
};
16

17
use std::sync::Arc;
18

19
use anyhow::{Context, Result};
20
use dsi_bitstream::prelude::*;
21
use mmap_rs::MmapFlags;
22
use rdst::*;
23

24
#[derive(Clone, Debug)]
25
/// A codec for encoding and decoding batches of triples using gap compression.
26
///
27
/// This codec encodes triples of the form `(src, dst, label)` by encoding the
28
/// gaps between consecutive sources and destinations using a specified code.
29
///
30
/// # Type Parameters
31
///
32
/// - `S`: Serializer for the labels, implementing [`BitSerializer`] for the label type.
33
/// - `D`: Deserializer for the labels, implementing [`BitDeserializer`] for the label type.
34
/// - `SRC_CODE`: Code used for encoding source gaps (default: gamma).
35
/// - `DST_CODE`: Code used for encoding destination gaps (default: gamma).
36
///
37
/// # Encoding Format
38
///
39
/// 1. The batch length is written using delta coding.
40
/// 2. For each group of triples with the same source:
41
///     - The gap from the previous source is encoded.
42
///     - The gap from the previous destination is encoded.
43
///     - The label is serialized.
44
///
45
/// The bit deserializer must be [`Clone`] because we need one for each
46
/// [`GapsIterator`], and there are possible scenarios in which the
47
/// deserializer might be stateful.
48
pub struct GapsCodec<
49
    E: Endianness = NE,
50
    S: BitSerializer<E, BitWriter<E>> = (),
51
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Clone = (),
52
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
53
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA },
54
> where
55
    BitReader<E>: BitRead<E> + CodesRead<E>,
56
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
57
{
58
    /// Serializer for the labels
59
    pub serializer: S,
60
    /// Deserializer for the labels
61
    pub deserializer: D,
62
    /// Marker for the endianness
63
    pub _marker: std::marker::PhantomData<E>,
64
}
65

66
impl<E, S, D, const SRC_CODE: usize, const DST_CODE: usize> GapsCodec<E, S, D, SRC_CODE, DST_CODE>
67
where
68
    E: Endianness,
69
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
70
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
71
    BitReader<E>: BitRead<E> + CodesRead<E>,
72
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
73
{
74
    /// Creates a new `GapsCodec` with the given serializer and deserializer.
75
    pub fn new(serializer: S, deserializer: D) -> Self {
4✔
76
        Self {
77
            serializer,
78
            deserializer,
79
            _marker: std::marker::PhantomData,
80
        }
81
    }
82
}
83

84
impl<E, S: Default, D: Default, const SRC_CODE: usize, const DST_CODE: usize> core::default::Default
85
    for GapsCodec<E, S, D, SRC_CODE, DST_CODE>
86
where
87
    E: Endianness,
88
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
89
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
90
    BitReader<E>: BitRead<E> + CodesRead<E>,
91
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
92
{
93
    fn default() -> Self {
2✔
94
        Self::new(Default::default(), Default::default())
6✔
95
    }
96
}
97

98
impl<E, S, D, const SRC_CODE: usize, const DST_CODE: usize> BatchCodec
99
    for GapsCodec<E, S, D, SRC_CODE, DST_CODE>
100
where
101
    E: Endianness,
102
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
103
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
104
    S::SerType: Send + Sync + Copy + 'static + core::fmt::Debug, // needed by radix sort
105
    BitReader<E>: BitRead<E> + CodesRead<E>,
106
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
107
{
108
    type Label = S::SerType;
109
    type DecodedBatch = GapsIterator<E, D, SRC_CODE, DST_CODE>;
110

111
    fn encode_batch(
8✔
112
        &self,
113
        path: impl AsRef<std::path::Path>,
114
        batch: &mut [(usize, usize, Self::Label)],
115
    ) -> Result<usize> {
116
        let start = std::time::Instant::now();
16✔
117
        Triple::cast_batch_mut(batch).radix_sort_unstable();
16✔
118
        log::debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed());
8✔
119
        self.encode_sorted_batch(path, batch)
32✔
120
    }
121

122
    fn encode_sorted_batch(
8✔
123
        &self,
124
        path: impl AsRef<std::path::Path>,
125
        batch: &[(usize, usize, Self::Label)],
126
    ) -> Result<usize> {
127
        debug_assert!(Triple::cast_batch(batch).is_sorted());
24✔
128
        // create a batch file where to dump
129
        let file_path = path.as_ref();
24✔
130
        let file = std::io::BufWriter::with_capacity(
131
            1 << 16,
8✔
132
            std::fs::File::create(file_path).with_context(|| {
24✔
NEW
133
                format!(
×
NEW
134
                    "Could not create BatchIterator temporary file {}",
×
NEW
135
                    file_path.display()
×
136
                )
137
            })?,
138
        );
139
        // create a bitstream to write to the file
NEW
140
        let mut stream = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(file));
×
141

142
        // prefix the stream with the length of the batch
143
        // we use a delta code since it'll be a big number most of the time
NEW
144
        stream
×
NEW
145
            .write_delta(batch.len() as u64)
×
146
            .context("Could not write length")?;
147

148
        // dump the triples to the bitstream
149
        let (mut prev_src, mut prev_dst) = (0, 0);
8✔
NEW
150
        let mut written_bits = 0;
×
151
        for (src, dst, label) in batch.iter() {
120✔
152
            // write the source gap as gamma
153
            written_bits += ConstCode::<SRC_CODE>
40✔
154
                .write(&mut stream, (src - prev_src) as u64)
120✔
155
                .with_context(|| format!("Could not write {src} after {prev_src}"))?;
40✔
156
            if *src != prev_src {
74✔
157
                // Reset prev_y
158
                prev_dst = 0;
34✔
159
            }
160
            // write the destination gap as gamma
NEW
161
            written_bits += ConstCode::<DST_CODE>
×
NEW
162
                .write(&mut stream, (dst - prev_dst) as u64)
×
NEW
163
                .with_context(|| format!("Could not write {dst} after {prev_dst}"))?;
×
164
            // write the label
165
            written_bits += self
40✔
166
                .serializer
40✔
NEW
167
                .serialize(label, &mut stream)
×
NEW
168
                .context("Could not serialize label")?;
×
169
            (prev_src, prev_dst) = (*src, *dst);
40✔
170
        }
171
        // flush the stream and reset the buffer
172
        written_bits += stream.flush().context("Could not flush stream")?;
8✔
173

174
        Ok(written_bits)
8✔
175
    }
176

177
    fn decode_batch(&self, path: impl AsRef<std::path::Path>) -> Result<Self::DecodedBatch> {
8✔
178
        // open the file
179
        let mut stream = <BufBitReader<E, _>>::new(MemWordReader::new(ArcMmapHelper(Arc::new(
8✔
180
            MmapHelper::mmap(
8✔
181
                path.as_ref(),
16✔
182
                MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL,
8✔
183
            )
184
            .with_context(|| format!("Could not mmap {}", path.as_ref().display()))?,
8✔
185
        ))));
186

187
        // read the length of the batch (first value in the stream)
188
        let len = stream.read_delta().context("Could not read length")? as usize;
8✔
189

190
        // create the iterator
NEW
191
        Ok(GapsIterator {
×
NEW
192
            deserializer: self.deserializer.clone(),
×
NEW
193
            stream,
×
NEW
194
            len,
×
NEW
195
            current: 0,
×
NEW
196
            prev_src: 0,
×
NEW
197
            prev_dst: 0,
×
198
        })
199
    }
200
}
201

202
#[derive(Clone, Debug)]
/// An iterator over the `(src, dst, label)` triples of a gap-encoded batch;
/// this is the type returned by [`GapsCodec`] when decoding a batch.
///
/// Note that the default for `DST_CODE` here is gamma, whereas [`GapsCodec`]
/// defaults its `DST_CODE` to delta. [`GapsCodec`] always names this type with
/// its own explicit code parameters, so the defaults below only matter when
/// this type is spelled out by hand.
pub struct GapsIterator<
    E: Endianness = NE,
    D: BitDeserializer<E, BitReader<E>> = (),
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
> where
    BitReader<E>: BitRead<E> + CodesRead<E>,
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
{
    /// Deserializer for the labels
    deserializer: D,
    /// Bitstream to read from, positioned after the length prefix
    stream: BitReader<E>,
    /// Length of the batch (number of triples), read from the stream prefix
    len: usize,
    /// Number of triples consumed so far
    current: usize,
    /// Previously decoded source node (0 before the first triple)
    prev_src: usize,
    /// Previously decoded destination node (reset to 0 when the source changes)
    prev_dst: usize,
}
226

227
// SAFETY: the decoder in `Iterator::next` reconstructs sources by adding
// non-negative gaps to the previous source, so sources never decrease; the
// previous destination is reset to 0 only when the source gap is nonzero
// (i.e., the source strictly increases), and otherwise destinations also grow
// by non-negative gaps. Hence the `(src, dst)` pairs are produced in
// non-decreasing lexicographic order.
// NOTE(review): this assumes the additions do not overflow `usize`, which can
// only happen on corrupted input — confirm this matches the `SortedIterator`
// contract.
unsafe impl<
        E: Endianness,
        D: BitDeserializer<E, BitReader<E>>,
        const SRC_CODE: usize,
        const DST_CODE: usize,
    > SortedIterator for GapsIterator<E, D, SRC_CODE, DST_CODE>
where
    BitReader<E>: BitRead<E> + CodesRead<E>,
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
{
}
238

239
impl<
240
        E: Endianness,
241
        D: BitDeserializer<E, BitReader<E>>,
242
        const SRC_CODE: usize,
243
        const DST_CODE: usize,
244
    > Iterator for GapsIterator<E, D, SRC_CODE, DST_CODE>
245
where
246
    BitReader<E>: BitRead<E> + CodesRead<E>,
247
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
248
{
249
    type Item = (usize, usize, D::DeserType);
250

251
    fn next(&mut self) -> Option<Self::Item> {
73✔
252
        if self.current >= self.len {
73✔
253
            return None;
11✔
254
        }
255
        let src_gap = ConstCode::<SRC_CODE>.read(&mut self.stream).ok()?;
62✔
256
        let dst_gap = ConstCode::<DST_CODE>.read(&mut self.stream).ok()?;
62✔
257
        let label = self.deserializer.deserialize(&mut self.stream).ok()?;
62✔
NEW
258
        self.prev_src += src_gap as usize;
×
259
        if src_gap != 0 {
56✔
260
            self.prev_dst = 0;
56✔
261
        }
NEW
262
        self.prev_dst += dst_gap as usize;
×
NEW
263
        self.current += 1;
×
NEW
264
        Some((self.prev_src, self.prev_dst, label))
×
265
    }
266

NEW
267
    fn size_hint(&self) -> (usize, Option<usize>) {
×
NEW
268
        (self.len(), Some(self.len()))
×
269
    }
270
}
271

272
impl<
273
        E: Endianness,
274
        D: BitDeserializer<E, BitReader<E>>,
275
        const SRC_CODE: usize,
276
        const DST_CODE: usize,
277
    > ExactSizeIterator for GapsIterator<E, D, SRC_CODE, DST_CODE>
278
where
279
    BitReader<E>: BitRead<E> + CodesRead<E>,
280
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
281
{
NEW
282
    fn len(&self) -> usize {
×
NEW
283
        self.len - self.current
×
284
    }
285
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc