• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 18443982156

12 Oct 2025 12:32PM UTC coverage: 48.052% (+0.2%) from 47.835%
18443982156

Pull #152

github

zommiommy
BatchCodec accepts Endianness
Pull Request #152: Introduce BatchCodec to substitute BatchIterator

112 of 233 new or added lines in 9 files covered. (48.07%)

7 existing lines in 3 files now uncovered.

4009 of 8343 relevant lines covered (48.05%)

21780055.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.5
/webgraph/src/utils/batch_codec/grouped_gaps.rs
1
/*
2
 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
3
 *
4
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
5
 */
6

7
use super::{BitReader, BitWriter};
8
use crate::traits::SortedIterator;
9
use crate::utils::{ArcMmapHelper, MmapHelper, Triple};
10
use crate::{
11
    traits::{BitDeserializer, BitSerializer},
12
    utils::BatchCodec,
13
};
14

15
use std::sync::Arc;
16

17
use anyhow::{Context, Result};
18
use dsi_bitstream::prelude::*;
19
use mmap_rs::MmapFlags;
20
use rdst::*;
21

22
#[derive(Clone, Debug)]
23
/// A codec for encoding and decoding batches of triples using grouped gap compression.
24
///
25
/// This codec encodes triples of the form `(src, dst, label)` by grouping edges with the same source node,
26
/// and encoding the gaps between consecutive sources and destinations using a specified code (default: gamma).
27
/// The outdegree (number of edges for each source) is also encoded using the specified code.
28
///
29
/// ## Type Parameters
30
/// - `S`: Serializer for the labels, implementing [`BitSerializer`] for the label type.
31
/// - `D`: Deserializer for the labels, implementing [`BitDeserializer`] for the label type.
32
/// - `OUTDEGREE_CODE`: Code used for encoding outdegrees (default: gamma).
33
/// - `SRC_CODE`: Code used for encoding source gaps (default: gamma).
34
/// - `DST_CODE`: Code used for encoding destination gaps (default: gamma).
35
///
36
/// ## Fields
37
/// - `serializer`: The label serializer.
38
/// - `deserializer`: The label deserializer.
39
///
40
/// ## Encoding Format
41
/// 1. The batch length is written using delta coding.
42
/// 2. For each group of triples with the same source:
43
///     - The gap from the previous source is encoded.
44
///     - The outdegree (number of edges for this source) is encoded.
45
///     - For each destination:
46
///         - The gap from the previous destination is encoded.
47
///         - The label is serialized.
48
///
49
/// The bit deserializer must be [`Clone`] because we need one for each
50
/// [`GroupedGapsIterator`], and there are possible scenarios in which the
51
/// deserializer might be stateful.
52
///
53
/// ## Choosing the codes
54
///
55
/// When transposing `enwiki-2024`, these are the top 10 codes for src gaps, outdegree, and dst gaps:
56
/// ```ignore
57
/// Outdegree stats
58
///   Code: ExpGolomb(3) Size: 34004796
59
///   Code: ExpGolomb(2) Size: 34101784
60
///   Code: ExpGolomb(4) Size: 36036394
61
///   Code: Zeta(2)      Size: 36231582
62
///   Code: ExpGolomb(1) Size: 36369750
63
///   Code: Zeta(3)      Size: 36893285
64
///   Code: Pi(2)        Size: 37415701
65
///   Code: Zeta(4)      Size: 38905267
66
///   Code: Golomb(20)   Size: 38963840
67
///   Code: Golomb(19)   Size: 39118201
68
/// Src stats
69
///   Code: Golomb(2)    Size: 12929998
70
///   Code: Rice(1)      Size: 12929998
71
///   Code: Unary        Size: 13025332
72
///   Code: Golomb(1)    Size: 13025332
73
///   Code: Rice(0)      Size: 13025332
74
///   Code: ExpGolomb(1) Size: 13319930
75
///   Code: Golomb(4)    Size: 18732384
76
///   Code: Rice(2)      Size: 18732384
77
///   Code: Golomb(3)    Size: 18736573
78
///   Code: ExpGolomb(2) Size: 18746122
79
/// Dst stats
80
///   Code: Pi(2)   Size: 2063880685
81
///   Code: Pi(3)   Size: 2074138948
82
///   Code: Zeta(3) Size: 2122730298
83
///   Code: Zeta(4) Size: 2123948774
84
///   Code: Zeta(5) Size: 2169131998
85
///   Code: Pi(4)   Size: 2176097847
86
///   Code: Zeta(2) Size: 2226573622
87
///   Code: Zeta(6) Size: 2237680403
88
///   Code: Delta   Size: 2272691460
89
///   Code: Zeta(7) Size: 2305354857
90
/// ```
91
///
92
/// The best codes are `Golomb(2)` for src gaps, `ExpGolomb(3)` for outdegree, and `Pi(2)` for dst gaps.
93
/// However, `Golomb` can perform poorly if the data don't follow the expected distribution,
94
/// so the recommended defaults are `Gamma` for src gaps, `ExpGolomb3` for outdegree, and `Delta` for dst gaps,
95
/// as they are universal codes.
96
pub struct GroupedGapsCodec<
97
    E: Endianness = BE,
98
    S: BitSerializer<E, BitWriter<E>> = (),
99
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Clone = (),
100
    const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::EXP_GOLOMB3 },
101
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
102
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA },
103
> where
104
    BitReader<E>: BitRead<E>,
105
    BitWriter<E>: BitWrite<E>,
106
{
107
    /// Serializer for the labels
108
    pub serializer: S,
109
    /// Deserializer for the labels
110
    pub deserializer: D,
111

112
    pub _marker: core::marker::PhantomData<E>,
113
}
114

115
impl<E, S, D, const OUTDEGREE_CODE: usize, const SRC_CODE: usize, const DST_CODE: usize>
116
    GroupedGapsCodec<E, S, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
117
where
118
    E: Endianness,
119
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
120
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
121
    BitReader<E>: BitRead<E>,
122
    BitWriter<E>: BitWrite<E>,
123
{
124
    /// Creates a new `GroupedGapsCodec` with the given serializer and deserializer.
NEW
125
    pub fn new(serializer: S, deserializer: D) -> Self {
×
126
        Self {
127
            serializer,
128
            deserializer,
129
            _marker: core::marker::PhantomData,
130
        }
131
    }
132
}
133

134
impl<
135
        E: Endianness,
136
        S: BitSerializer<E, BitWriter<E>> + Default,
137
        D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Clone + Default,
138
        const OUTDEGREE_CODE: usize,
139
        const SRC_CODE: usize,
140
        const DST_CODE: usize,
141
    > Default for GroupedGapsCodec<E, S, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
142
where
143
    BitReader<E>: BitRead<E>,
144
    BitWriter<E>: BitWrite<E>,
145
{
146
    fn default() -> Self {
129✔
147
        Self {
148
            serializer: S::default(),
258✔
149
            deserializer: D::default(),
129✔
150
            _marker: core::marker::PhantomData,
151
        }
152
    }
153
}
154

155
impl<E, S, D, const OUTDEGREE_CODE: usize, const SRC_CODE: usize, const DST_CODE: usize> BatchCodec
156
    for GroupedGapsCodec<E, S, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
157
where
158
    E: Endianness,
159
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
160
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
161
    S::SerType: Send + Sync + Copy + 'static, // needed by radix sort
162
    BitReader<E>: BitRead<E> + CodesRead<E>,
163
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
164
{
165
    type Label = S::SerType;
166
    type DecodedBatch = GroupedGapsIterator<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>;
167

168
    fn encode_batch(
432✔
169
        &self,
170
        path: impl AsRef<std::path::Path>,
171
        batch: &mut [(usize, usize, Self::Label)],
172
    ) -> Result<usize> {
173
        let start = std::time::Instant::now();
864✔
174
        Triple::cast_batch_mut(batch).radix_sort_unstable();
864✔
175
        log::debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed());
432✔
176
        self.encode_sorted_batch(path, batch)
1,728✔
177
    }
178

179
    fn encode_sorted_batch(
432✔
180
        &self,
181
        path: impl AsRef<std::path::Path>,
182
        batch: &[(usize, usize, Self::Label)],
183
    ) -> Result<usize> {
184
        debug_assert!(Triple::cast_batch(batch).is_sorted(), "Batch is not sorted");
1,296✔
185
        // create a batch file where to dump
186
        let file_path = path.as_ref();
1,296✔
187
        let file = std::io::BufWriter::with_capacity(
188
            1 << 16,
432✔
189
            std::fs::File::create(file_path).with_context(|| {
1,296✔
NEW
190
                format!(
×
NEW
191
                    "Could not create BatchIterator temporary file {}",
×
NEW
192
                    file_path.display()
×
193
                )
194
            })?,
195
        );
196
        // create a bitstream to write to the file
NEW
197
        let mut stream = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(file));
×
198

199
        // prefix the stream with the length of the batch
200
        // we use a delta code since it'll be a big number most of the time
NEW
201
        stream
×
NEW
202
            .write_delta(batch.len() as u64)
×
203
            .context("Could not write length")?;
204

205
        // dump the triples to the bitstream
206
        let mut prev_src = 0;
432✔
NEW
207
        let mut written_bits = 0;
×
NEW
208
        let mut i = 0;
×
209
        while i < batch.len() {
4,726,354✔
210
            let (src, _, _) = batch[i];
4,725,490✔
211
            // write the source gap as gamma
212
            written_bits += ConstCode::<SRC_CODE>
2,362,745✔
213
                .write(&mut stream, (src - prev_src) as _)
7,088,235✔
214
                .with_context(|| format!("Could not write {src} after {prev_src}"))?;
2,362,745✔
215
            // figure out how many edges have this source
216
            let outdegree = batch[i..].iter().take_while(|t| t.0 == src).count();
84,259,241✔
217
            // write the outdegree
NEW
218
            written_bits += ConstCode::<OUTDEGREE_CODE>
×
NEW
219
                .write(&mut stream, outdegree as _)
×
NEW
220
                .with_context(|| format!("Could not write outdegree {outdegree} for {src}"))?;
×
221

222
            // encode the destinations
223
            let mut prev_dst = 0;
2,362,745✔
NEW
224
            for _ in 0..outdegree {
×
225
                let (_, dst, label) = &batch[i];
38,585,935✔
226
                // write the destination gap as gamma
NEW
227
                written_bits += ConstCode::<DST_CODE>
×
NEW
228
                    .write(&mut stream, (dst - prev_dst) as _)
×
NEW
229
                    .with_context(|| format!("Could not write {dst} after {prev_dst}"))?;
×
230
                // write the label
231
                written_bits += self
38,585,935✔
232
                    .serializer
38,585,935✔
NEW
233
                    .serialize(label, &mut stream)
×
NEW
234
                    .context("Could not serialize label")?;
×
235
                prev_dst = *dst;
38,585,935✔
NEW
236
                i += 1;
×
237
            }
238
            prev_src = src;
2,362,745✔
239
        }
240
        // flush the stream and reset the buffer
241
        written_bits += stream.flush().context("Could not flush stream")?;
432✔
242

243
        Ok(written_bits)
432✔
244
    }
245

246
    fn decode_batch(&self, path: impl AsRef<std::path::Path>) -> Result<Self::DecodedBatch> {
141✔
247
        // open the file
248
        let mut stream = <BufBitReader<E, _>>::new(MemWordReader::new(ArcMmapHelper(Arc::new(
141✔
249
            MmapHelper::mmap(
141✔
250
                path.as_ref(),
282✔
251
                MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL,
141✔
252
            )
253
            .with_context(|| format!("Could not mmap {}", path.as_ref().display()))?,
141✔
254
        ))));
255

256
        // read the length of the batch (first value in the stream)
257
        let len = stream.read_delta().context("Could not read length")? as usize;
141✔
258

259
        // create the iterator
NEW
260
        Ok(GroupedGapsIterator {
×
NEW
261
            deserializer: self.deserializer.clone(),
×
NEW
262
            stream,
×
NEW
263
            len,
×
NEW
264
            current: 0,
×
NEW
265
            src: 0,
×
NEW
266
            dst_left: 0,
×
NEW
267
            prev_dst: 0,
×
268
        })
269
    }
270
}
271

272
#[derive(Clone, Debug)]
273
/// An iterator over triples encoded with gaps, this is returned by [`GroupedGapsCodec`].
274
pub struct GroupedGapsIterator<
275
    E: Endianness = NE,
276
    D: BitDeserializer<E, BitReader<E>> = (),
277
    const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
278
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
279
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
280
> where
281
    BitReader<E>: BitRead<E>,
282
    BitWriter<E>: BitWrite<E>,
283
{
284
    /// Deserializer for the labels
285
    deserializer: D,
286
    /// Bitstream to read from
287
    stream: BitReader<E>,
288
    /// Length of the iterator (number of triples)
289
    len: usize,
290
    /// Current position in the iterator
291
    current: usize,
292
    /// Current source node
293
    src: usize,
294
    /// Number of destinations left for the current source
295
    dst_left: usize,
296
    /// Previous destination node
297
    prev_dst: usize,
298
}
299

300
unsafe impl<
301
        E: Endianness,
302
        D: BitDeserializer<E, BitReader<E>>,
303
        const OUTDEGREE_CODE: usize,
304
        const SRC_CODE: usize,
305
        const DST_CODE: usize,
306
    > SortedIterator for GroupedGapsIterator<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
307
where
308
    BitReader<E>: BitRead<E> + CodesRead<E>,
309
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
310
{
311
}
312

313
impl<
314
        E: Endianness,
315
        D: BitDeserializer<E, BitReader<E>>,
316
        const OUTDEGREE_CODE: usize,
317
        const SRC_CODE: usize,
318
        const DST_CODE: usize,
319
    > Iterator for GroupedGapsIterator<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
320
where
321
    BitReader<E>: BitRead<E> + CodesRead<E>,
322
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
323
{
324
    type Item = (usize, usize, D::DeserType);
325
    fn next(&mut self) -> Option<Self::Item> {
67,920,690✔
326
        if self.current >= self.len {
67,920,690✔
327
            return None;
130✔
328
        }
NEW
329
        if self.dst_left == 0 {
×
330
            // read a new source
331
            let src_gap = ConstCode::<SRC_CODE>.read(&mut self.stream).ok()?;
20,355,590✔
NEW
332
            self.src += src_gap as usize;
×
333
            // read the outdegree
NEW
334
            self.dst_left = ConstCode::<OUTDEGREE_CODE>.read(&mut self.stream).ok()? as usize;
×
335
            self.prev_dst = 0;
4,071,118✔
336
        }
337

338
        let dst_gap = ConstCode::<DST_CODE>.read(&mut self.stream).ok()?;
135,841,120✔
339
        let label = self.deserializer.deserialize(&mut self.stream).ok()?;
67,920,560✔
NEW
340
        self.prev_dst += dst_gap as usize;
×
NEW
341
        self.current += 1;
×
NEW
342
        self.dst_left -= 1;
×
NEW
343
        Some((self.src, self.prev_dst, label))
×
344
    }
345

NEW
346
    fn size_hint(&self) -> (usize, Option<usize>) {
×
NEW
347
        (self.len(), Some(self.len()))
×
348
    }
349
}
350

351
impl<
352
        E: Endianness,
353
        D: BitDeserializer<E, BitReader<E>>,
354
        const OUTDEGREE_CODE: usize,
355
        const SRC_CODE: usize,
356
        const DST_CODE: usize,
357
    > ExactSizeIterator for GroupedGapsIterator<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
358
where
359
    BitReader<E>: BitRead<E> + CodesRead<E>,
360
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
361
{
NEW
362
    fn len(&self) -> usize {
×
NEW
363
        self.len - self.current
×
364
    }
365
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc