• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 18458664278

13 Oct 2025 07:39AM UTC coverage: 48.052% (+0.2%) from 47.835%
18458664278

Pull #152

github

vigna
Docs review
Pull Request #152: Introduce BatchCodec to substitute BatchIterator

112 of 233 new or added lines in 9 files covered. (48.07%)

7 existing lines in 3 files now uncovered.

4009 of 8343 relevant lines covered (48.05%)

24464313.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.5
/webgraph/src/utils/batch_codec/grouped_gaps.rs
1
/*
2
 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
3
 *
4
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
5
 */
6

7
use super::{BitReader, BitWriter};
8
use crate::traits::SortedIterator;
9
use crate::utils::{ArcMmapHelper, MmapHelper, Triple};
10
use crate::{
11
    traits::{BitDeserializer, BitSerializer},
12
    utils::BatchCodec,
13
};
14

15
use std::sync::Arc;
16

17
use anyhow::{Context, Result};
18
use dsi_bitstream::prelude::*;
19
use mmap_rs::MmapFlags;
20
use rdst::*;
21

22
#[derive(Clone, Debug)]
23
/// A codec for encoding and decoding batches of triples using grouped gap compression.
24
///
25
/// This codec encodes triples of the form `(src, dst, label)` by grouping edges
26
/// with the same source node, and encoding the gaps between consecutive sources
27
/// and destinations, as well as the outdegree (number of edges for each source),
28
/// each with its own code, selected by the const parameters listed below.
29
///
30
/// # Type Parameters
31
///
/// - `E`: Endianness of the bit stream (default: `NE`).
32
/// - `S`: Serializer for the labels, implementing [`BitSerializer`] for the label type.
33
/// - `D`: Deserializer for the labels, implementing [`BitDeserializer`] for the label type.
34
/// - `OUTDEGREE_CODE`: Code used for encoding outdegrees (default: [`EXP_GOLOMB3`](dsi_bitstream::dispatch::code_consts::EXP_GOLOMB3)).
35
/// - `SRC_CODE`: Code used for encoding source gaps (default: [γ](dsi_bitstream::codes::gamma), i.e. [`GAMMA`](dsi_bitstream::dispatch::code_consts::GAMMA)).
36
/// - `DST_CODE`: Code used for encoding destination gaps (default: [`DELTA`](dsi_bitstream::dispatch::code_consts::DELTA)).
37
///
38
/// # Encoding Format
39
///
40
/// 1. The batch length is written using delta coding.
41
/// 2. For each group of triples with the same source:
42
///     - The gap from the previous source is encoded (the first source is
///       encoded as a gap from zero).
43
///     - The outdegree (number of edges for this source) is encoded.
44
///     - For each destination:
45
///         - The gap from the previous destination is encoded (the first
///           destination of each group is encoded as a gap from zero).
46
///         - The label is serialized.
47
///
48
/// The bit deserializer must be [`Clone`] because we need one for each
49
/// [`GroupedGapsIterator`], and there are possible scenarios in which the
50
/// deserializer might be stateful.
51
pub struct GroupedGapsCodec<
52
    E: Endianness = NE,
53
    S: BitSerializer<E, BitWriter<E>> = (),
54
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Clone = (),
55
    const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::EXP_GOLOMB3 },
56
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
57
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA },
58
> where
59
    BitReader<E>: BitRead<E>,
60
    BitWriter<E>: BitWrite<E>,
61
{
62
    /// Serializer for the labels.
63
    pub serializer: S,
64
    /// Deserializer for the labels; a clone of it is handed to every decoded batch.
65
    pub deserializer: D,
66

67
    /// Zero-sized marker binding the codec to the endianness `E`.
    pub _marker: core::marker::PhantomData<E>,
68
}
69

70
impl<E, S, D, const OUTDEGREE_CODE: usize, const SRC_CODE: usize, const DST_CODE: usize>
71
    GroupedGapsCodec<E, S, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
72
where
73
    E: Endianness,
74
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
75
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
76
    BitReader<E>: BitRead<E>,
77
    BitWriter<E>: BitWrite<E>,
78
{
79
    /// Creates a new [`GroupedGapsCodec`] from an explicit label serializer and deserializer.
    ///
    /// The codes used for outdegrees and gaps are not arguments: they are fixed
    /// by the const parameters of the type.
NEW
80
    pub fn new(serializer: S, deserializer: D) -> Self {
×
81
        Self {
82
            serializer,
83
            deserializer,
84
            _marker: core::marker::PhantomData,
85
        }
86
    }
87
}
88

89
// A codec can be default-constructed whenever both the label serializer and
// the label deserializer have a default value.
impl<
90
        E: Endianness,
91
        S: BitSerializer<E, BitWriter<E>> + Default,
92
        D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Clone + Default,
93
        const OUTDEGREE_CODE: usize,
94
        const SRC_CODE: usize,
95
        const DST_CODE: usize,
96
    > Default for GroupedGapsCodec<E, S, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
97
where
98
    BitReader<E>: BitRead<E>,
99
    BitWriter<E>: BitWrite<E>,
100
{
101
    /// Builds a codec from `S::default()` and `D::default()`.
    fn default() -> Self {
129✔
102
        Self {
103
            serializer: S::default(),
258✔
104
            deserializer: D::default(),
129✔
105
            _marker: core::marker::PhantomData,
106
        }
107
    }
108
}
109

110
impl<E, S, D, const OUTDEGREE_CODE: usize, const SRC_CODE: usize, const DST_CODE: usize> BatchCodec
111
    for GroupedGapsCodec<E, S, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
112
where
113
    E: Endianness,
114
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
115
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
116
    S::SerType: Send + Sync + Copy + 'static, // needed by radix sort
117
    BitReader<E>: BitRead<E> + CodesRead<E>,
118
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
119
{
120
    type Label = S::SerType;
121
    type DecodedBatch = GroupedGapsIterator<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>;
122

123
    /// Sorts `batch` in place and writes it to the file at `path`.
    ///
    /// The triples are reordered with an unstable radix sort, the time taken
    /// by the sort is logged at debug level, and the sorted batch is then
    /// handed to `encode_sorted_batch`, whose result is returned unchanged.
    fn encode_batch(
432✔
124
        &self,
125
        path: impl AsRef<std::path::Path>,
126
        batch: &mut [(usize, usize, Self::Label)],
127
    ) -> Result<usize> {
128
        // only the sort is timed; encoding happens after the log line
        let start = std::time::Instant::now();
864✔
129
        Triple::cast_batch_mut(batch).radix_sort_unstable();
864✔
130
        log::debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed());
432✔
131
        self.encode_sorted_batch(path, batch)
1,728✔
132
    }
133

134
    /// Writes an already sorted `batch` to the file at `path`.
    ///
    /// The file is created (or truncated) and filled with the delta-coded
    /// batch length followed by one group per distinct source: source gap,
    /// outdegree, and then a destination gap plus serialized label for each
    /// triple of the group.
    ///
    /// Returns the sum of the bit counts reported by the source-gap,
    /// outdegree and destination-gap writes, by the label serializer, and by
    /// the final flush.
    ///
    /// NOTE(review): the value returned by `write_delta` for the length prefix
    /// is not added to that sum — confirm whether callers expect the prefix to
    /// be counted.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be created or if any write, label
    /// serialization or the final flush fails.
    fn encode_sorted_batch(
432✔
135
        &self,
136
        path: impl AsRef<std::path::Path>,
137
        batch: &[(usize, usize, Self::Label)],
138
    ) -> Result<usize> {
139
        // sortedness is verified in debug builds only; the gap subtractions
        // below rely on it
        debug_assert!(Triple::cast_batch(batch).is_sorted(), "Batch is not sorted");
1,296✔
140
        // create a batch file where to dump
141
        let file_path = path.as_ref();
1,296✔
142
        let file = std::io::BufWriter::with_capacity(
143
            1 << 16,
432✔
144
            std::fs::File::create(file_path).with_context(|| {
1,296✔
NEW
145
                format!(
×
NEW
146
                    // NOTE(review): the message still names `BatchIterator`
                    "Could not create BatchIterator temporary file {}",
×
NEW
147
                    file_path.display()
×
148
                )
149
            })?,
150
        );
151
        // create a bitstream to write to the file
NEW
152
        let mut stream = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(file));
×
153

154
        // prefix the stream with the length of the batch
155
        // we use a delta code since it'll be a big number most of the time
NEW
156
        stream
×
NEW
157
            .write_delta(batch.len() as u64)
×
158
            .context("Could not write length")?;
159

160
        // dump the triples to the bitstream
161
        let mut prev_src = 0;
432✔
NEW
162
        let mut written_bits = 0;
×
NEW
163
        let mut i = 0;
×
164
        // each iteration consumes one whole group of triples sharing a source
        while i < batch.len() {
4,726,354✔
165
            let (src, _, _) = batch[i];
4,725,490✔
166
            // write the source gap using SRC_CODE
167
            written_bits += ConstCode::<SRC_CODE>
2,362,745✔
168
                .write(&mut stream, (src - prev_src) as _)
7,088,235✔
169
                .with_context(|| format!("Could not write {src} after {prev_src}"))?;
2,362,745✔
170
            // figure out how many edges have this source (at least one: batch[i] itself)
171
            let outdegree = batch[i..].iter().take_while(|t| t.0 == src).count();
84,259,241✔
172
            // write the outdegree using OUTDEGREE_CODE
NEW
173
            written_bits += ConstCode::<OUTDEGREE_CODE>
×
NEW
174
                .write(&mut stream, outdegree as _)
×
NEW
175
                .with_context(|| format!("Could not write outdegree {outdegree} for {src}"))?;
×
176

177
            // encode the destinations; gaps restart from zero for every source
178
            let mut prev_dst = 0;
2,362,745✔
NEW
179
            for _ in 0..outdegree {
×
180
                let (_, dst, label) = &batch[i];
38,585,935✔
181
                // write the destination gap using DST_CODE
NEW
182
                written_bits += ConstCode::<DST_CODE>
×
NEW
183
                    .write(&mut stream, (dst - prev_dst) as _)
×
NEW
184
                    .with_context(|| format!("Could not write {dst} after {prev_dst}"))?;
×
185
                // write the label
186
                written_bits += self
38,585,935✔
187
                    .serializer
38,585,935✔
NEW
188
                    .serialize(label, &mut stream)
×
NEW
189
                    .context("Could not serialize label")?;
×
190
                prev_dst = *dst;
38,585,935✔
NEW
191
                i += 1;
×
192
            }
193
            prev_src = src;
2,362,745✔
194
        }
195
        // flush the stream and reset the buffer
196
        written_bits += stream.flush().context("Could not flush stream")?;
432✔
197

198
        Ok(written_bits)
432✔
199
    }
200

201
    /// Opens the batch stored at `path` and returns an iterator over its triples.
    ///
    /// The file is memory-mapped, the delta-coded batch length is read from
    /// the start of the stream, and the returned iterator starts from source
    /// zero with no pending destinations, carrying its own clone of the label
    /// deserializer.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be memory-mapped or the length cannot be read.
    fn decode_batch(&self, path: impl AsRef<std::path::Path>) -> Result<Self::DecodedBatch> {
141✔
202
        // open the file
203
        let mut stream = <BufBitReader<E, _>>::new(MemWordReader::new(ArcMmapHelper(Arc::new(
141✔
204
            MmapHelper::mmap(
141✔
205
                path.as_ref(),
282✔
206
                MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL,
141✔
207
            )
208
            .with_context(|| format!("Could not mmap {}", path.as_ref().display()))?,
141✔
209
        ))));
210

211
        // read the length of the batch (first value in the stream)
212
        let len = stream.read_delta().context("Could not read length")? as usize;
141✔
213

214
        // create the iterator; `dst_left: 0` makes the first call to `next`
        // read a source gap and an outdegree before any destination
NEW
215
        Ok(GroupedGapsIterator {
×
NEW
216
            deserializer: self.deserializer.clone(),
×
NEW
217
            stream,
×
NEW
218
            len,
×
NEW
219
            current: 0,
×
NEW
220
            src: 0,
×
NEW
221
            dst_left: 0,
×
NEW
222
            prev_dst: 0,
×
223
        })
224
    }
225
}
226

227
#[derive(Clone, Debug)]
228
/// An iterator over triples encoded with gaps, this is returned by [`GroupedGapsCodec`].
///
/// The const parameters select the codes used to read outdegrees, source gaps
/// and destination gaps, and must match those used when the batch was written.
///
/// NOTE(review): the defaults below are all `GAMMA`, whereas the defaults of
/// [`GroupedGapsCodec`] are `EXP_GOLOMB3`, `GAMMA` and `DELTA` — confirm that
/// the mismatch is intended.
229
pub struct GroupedGapsIterator<
230
    E: Endianness = NE,
231
    D: BitDeserializer<E, BitReader<E>> = (),
232
    const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
233
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
234
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
235
> where
236
    BitReader<E>: BitRead<E>,
237
    BitWriter<E>: BitWrite<E>,
238
{
239
    /// Deserializer for the labels
240
    deserializer: D,
241
    /// Bitstream to read from
242
    stream: BitReader<E>,
243
    /// Length of the iterator (number of triples)
244
    len: usize,
245
    /// Number of triples returned so far
246
    current: usize,
247
    /// Current source node
248
    src: usize,
249
    /// Number of destinations left for the current source
250
    dst_left: usize,
251
    /// Previous destination node (reset to zero at every new source)
252
    prev_dst: usize,
253
}
254

255
// SAFETY: NOTE(review) — the contract of `SortedIterator` is defined outside
// this file. This impl presumably relies on batches having been written in
// sorted order; the decoder itself only adds unsigned gaps, so sources never
// decrease and destinations never decrease within a source. Confirm against
// the trait documentation.
unsafe impl<
256
        E: Endianness,
257
        D: BitDeserializer<E, BitReader<E>>,
258
        const OUTDEGREE_CODE: usize,
259
        const SRC_CODE: usize,
260
        const DST_CODE: usize,
261
    > SortedIterator for GroupedGapsIterator<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
262
where
263
    BitReader<E>: BitRead<E> + CodesRead<E>,
264
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
265
{
266
}
267

268
impl<
269
        E: Endianness,
270
        D: BitDeserializer<E, BitReader<E>>,
271
        const OUTDEGREE_CODE: usize,
272
        const SRC_CODE: usize,
273
        const DST_CODE: usize,
274
    > Iterator for GroupedGapsIterator<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
275
where
276
    BitReader<E>: BitRead<E> + CodesRead<E>,
277
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
278
{
279
    type Item = (usize, usize, D::DeserType);
280
    /// Decodes and returns the next `(src, dst, label)` triple.
    ///
    /// Returns `None` once `len` triples have been returned. Any failure while
    /// reading a code or deserializing a label is discarded by `.ok()?` and
    /// also yields `None`, so a read error is indistinguishable from the end
    /// of the batch.
    fn next(&mut self) -> Option<Self::Item> {
67,291,558✔
281
        if self.current >= self.len {
67,291,558✔
282
            return None;
130✔
283
        }
NEW
284
        if self.dst_left == 0 {
×
285
            // read a new source
286
            let src_gap = ConstCode::<SRC_CODE>.read(&mut self.stream).ok()?;
20,316,530✔
NEW
287
            self.src += src_gap as usize;
×
288
            // read the outdegree
NEW
289
            self.dst_left = ConstCode::<OUTDEGREE_CODE>.read(&mut self.stream).ok()? as usize;
×
290
            // destination gaps restart from zero for every source
            self.prev_dst = 0;
4,063,306✔
291
        }
292

293
        let dst_gap = ConstCode::<DST_CODE>.read(&mut self.stream).ok()?;
134,582,856✔
294
        let label = self.deserializer.deserialize(&mut self.stream).ok()?;
67,291,428✔
NEW
295
        self.prev_dst += dst_gap as usize;
×
NEW
296
        self.current += 1;
×
NEW
297
        // relies on the stored outdegree being at least 1, which the encoder
        // in this file always guarantees
        self.dst_left -= 1;
×
NEW
298
        Some((self.src, self.prev_dst, label))
×
299
    }
300

NEW
301
    /// Returns the number of triples not yet returned as both bounds.
    fn size_hint(&self) -> (usize, Option<usize>) {
×
NEW
302
        (self.len(), Some(self.len()))
×
303
    }
304
}
305

306
impl<
307
        E: Endianness,
308
        D: BitDeserializer<E, BitReader<E>>,
309
        const OUTDEGREE_CODE: usize,
310
        const SRC_CODE: usize,
311
        const DST_CODE: usize,
312
    > ExactSizeIterator for GroupedGapsIterator<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
313
where
314
    BitReader<E>: BitRead<E> + CodesRead<E>,
315
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
316
{
NEW
317
    /// Returns the number of triples still to be returned: the batch length
    /// read from the stream minus the triples already produced.
    fn len(&self) -> usize {
×
NEW
318
        self.len - self.current
×
319
    }
320
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc