• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 18443982156

12 Oct 2025 12:32PM UTC coverage: 48.052% (+0.2%) from 47.835%
18443982156

Pull #152

github

zommiommy
BatchCodec accepts Endianness
Pull Request #152: Introduce BatchCodec to substitute BatchIterator

112 of 233 new or added lines in 9 files covered. (48.07%)

7 existing lines in 3 files now uncovered.

4009 of 8343 relevant lines covered (48.05%)

21780055.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.7
/webgraph/src/utils/batch_codec/gaps.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
5
 *
6
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
7
 */
8

9
use super::{BitReader, BitWriter};
10
use crate::traits::SortedIterator;
11
use crate::utils::{ArcMmapHelper, MmapHelper, Triple};
12
use crate::{
13
    traits::{BitDeserializer, BitSerializer},
14
    utils::BatchCodec,
15
};
16

17
use std::sync::Arc;
18

19
use anyhow::{Context, Result};
20
use dsi_bitstream::prelude::*;
21
use mmap_rs::MmapFlags;
22
use rdst::*;
23

24
#[derive(Clone, Debug)]
25
/// A codec for encoding and decoding batches of triples using gap compression.
26
///
27
/// This codec encodes triples of the form `(src, dst, label)` by encoding the
28
/// gaps between consecutive sources and destinations using a specified code.
29
///
30
/// ## Type Parameters
31
/// - `S`: Serializer for the labels, implementing [`BitSerializer`] for the label type.
32
/// - `D`: Deserializer for the labels, implementing [`BitDeserializer`] for the label type.
33
/// - `SRC_CODE`: Code used for encoding source gaps (default: gamma).
34
/// - `DST_CODE`: Code used for encoding destination gaps (default: delta).
35
///
36
/// ## Fields
37
/// - `serializer`: The label serializer.
38
/// - `deserializer`: The label deserializer.
39
///
40
/// ## Encoding Format
41
/// 1. The batch length is written using delta coding.
42
/// 2. For each triple, in sorted order:
43
///     - The gap from the previous source is encoded.
44
///     - The destination gap is encoded (restarting from 0 when the source changes).
45
///     - The label is serialized.
46
///
47
/// The bit deserializer must be [`Clone`] because we need one for each
48
/// [`GapsIterator`], and there are possible scenarios in which the
49
/// deserializer might be stateful.
50
///
51
/// ## Choosing the codes
52
///
53
/// These are the top 10 codes for src and dst gaps when transposing `enwiki-2024`.
54
/// ```ignore
55
/// Src codes:
56
///   Code: Unary        Size: 179553432
57
///   Code: Golomb(1)    Size: 179553432
58
///   Code: Rice(0)      Size: 179553432
59
///   Code: Gamma        Size: 185374984
60
///   Code: Zeta(1)      Size: 185374984
61
///   Code: ExpGolomb(0) Size: 185374984
62
///   Code: Omega        Size: 185439656
63
///   Code: Delta        Size: 191544794
64
///   Code: Golomb(2)    Size: 345986198
65
///   Code: Rice(1)      Size: 345986198
66
/// Dst codes:
67
///   Code: Pi(2)   Size: 2063880685
68
///   Code: Pi(3)   Size: 2074138948
69
///   Code: Zeta(3) Size: 2122730298
70
///   Code: Zeta(4) Size: 2123948774
71
///   Code: Zeta(5) Size: 2169131998
72
///   Code: Pi(4)   Size: 2176097847
73
///   Code: Zeta(2) Size: 2226573622
74
///   Code: Zeta(6) Size: 2237680403
75
///   Code: Delta   Size: 2272691460
76
///   Code: Zeta(7) Size: 2305354857
77
/// ```
78
///
79
/// So the best combination is `Unary` for src gaps and `Pi(2)` for dst gaps.
80
/// But, `Unary` can behave poorly if the distribution of your data changes,
81
/// therefore the recommended default is `Gamma` for src gaps and `Delta` for
82
/// dst gaps as they are universal codes.
83
pub struct GapsCodec<
84
    E: Endianness = NE,
85
    S: BitSerializer<E, BitWriter<E>> = (),
86
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Clone = (),
87
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
88
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA },
89
> where
90
    BitReader<E>: BitRead<E> + CodesRead<E>,
91
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
92
{
93
    /// Serializer for the labels
94
    pub serializer: S,
95
    /// Deserializer for the labels
96
    pub deserializer: D,
97
    /// Marker for the endianness
98
    pub _marker: std::marker::PhantomData<E>,
99
}
100

101
impl<E, S, D, const SRC_CODE: usize, const DST_CODE: usize> GapsCodec<E, S, D, SRC_CODE, DST_CODE>
102
where
103
    E: Endianness,
104
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
105
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
106
    BitReader<E>: BitRead<E> + CodesRead<E>,
107
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
108
{
109
    /// Creates a new `GapsCodec` with the given serializer and deserializer.
110
    pub fn new(serializer: S, deserializer: D) -> Self {
4✔
111
        Self {
112
            serializer,
113
            deserializer,
114
            _marker: std::marker::PhantomData,
115
        }
116
    }
117
}
118

119
impl<E, S: Default, D: Default, const SRC_CODE: usize, const DST_CODE: usize> core::default::Default
120
    for GapsCodec<E, S, D, SRC_CODE, DST_CODE>
121
where
122
    E: Endianness,
123
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
124
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
125
    BitReader<E>: BitRead<E> + CodesRead<E>,
126
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
127
{
128
    fn default() -> Self {
2✔
129
        Self::new(Default::default(), Default::default())
6✔
130
    }
131
}
132

133
impl<E, S, D, const SRC_CODE: usize, const DST_CODE: usize> BatchCodec
134
    for GapsCodec<E, S, D, SRC_CODE, DST_CODE>
135
where
136
    E: Endianness,
137
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
138
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
139
    S::SerType: Send + Sync + Copy + 'static + core::fmt::Debug, // needed by radix sort
140
    BitReader<E>: BitRead<E> + CodesRead<E>,
141
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
142
{
143
    type Label = S::SerType;
144
    type DecodedBatch = GapsIterator<E, D, SRC_CODE, DST_CODE>;
145

146
    fn encode_batch(
8✔
147
        &self,
148
        path: impl AsRef<std::path::Path>,
149
        batch: &mut [(usize, usize, Self::Label)],
150
    ) -> Result<usize> {
151
        let start = std::time::Instant::now();
16✔
152
        Triple::cast_batch_mut(batch).radix_sort_unstable();
16✔
153
        log::debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed());
8✔
154
        self.encode_sorted_batch(path, batch)
32✔
155
    }
156

157
    fn encode_sorted_batch(
8✔
158
        &self,
159
        path: impl AsRef<std::path::Path>,
160
        batch: &[(usize, usize, Self::Label)],
161
    ) -> Result<usize> {
162
        debug_assert!(Triple::cast_batch(batch).is_sorted());
24✔
163
        // create a batch file where to dump
164
        let file_path = path.as_ref();
24✔
165
        let file = std::io::BufWriter::with_capacity(
166
            1 << 16,
8✔
167
            std::fs::File::create(file_path).with_context(|| {
24✔
NEW
168
                format!(
×
NEW
169
                    "Could not create BatchIterator temporary file {}",
×
NEW
170
                    file_path.display()
×
171
                )
172
            })?,
173
        );
174
        // create a bitstream to write to the file
NEW
175
        let mut stream = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(file));
×
176

177
        // prefix the stream with the length of the batch
178
        // we use a delta code since it'll be a big number most of the time
NEW
179
        stream
×
NEW
180
            .write_delta(batch.len() as u64)
×
181
            .context("Could not write length")?;
182

183
        // dump the triples to the bitstream
184
        let (mut prev_src, mut prev_dst) = (0, 0);
8✔
NEW
185
        let mut written_bits = 0;
×
186
        for (src, dst, label) in batch.iter() {
120✔
187
            // write the source gap using SRC_CODE
188
            written_bits += ConstCode::<SRC_CODE>
40✔
189
                .write(&mut stream, (src - prev_src) as u64)
120✔
190
                .with_context(|| format!("Could not write {src} after {prev_src}"))?;
40✔
191
            if *src != prev_src {
74✔
192
                // New source: reset prev_dst so destination gaps restart from zero
193
                prev_dst = 0;
34✔
194
            }
195
            // write the destination gap using DST_CODE
NEW
196
            written_bits += ConstCode::<DST_CODE>
×
NEW
197
                .write(&mut stream, (dst - prev_dst) as u64)
×
NEW
198
                .with_context(|| format!("Could not write {dst} after {prev_dst}"))?;
×
199
            // write the label
200
            written_bits += self
40✔
201
                .serializer
40✔
NEW
202
                .serialize(label, &mut stream)
×
NEW
203
                .context("Could not serialize label")?;
×
204
            (prev_src, prev_dst) = (*src, *dst);
40✔
205
        }
206
        // flush the stream and reset the buffer
207
        written_bits += stream.flush().context("Could not flush stream")?;
8✔
208

209
        Ok(written_bits)
8✔
210
    }
211

212
    fn decode_batch(&self, path: impl AsRef<std::path::Path>) -> Result<Self::DecodedBatch> {
8✔
213
        // open the file
214
        let mut stream = <BufBitReader<E, _>>::new(MemWordReader::new(ArcMmapHelper(Arc::new(
8✔
215
            MmapHelper::mmap(
8✔
216
                path.as_ref(),
16✔
217
                MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL,
8✔
218
            )
219
            .with_context(|| format!("Could not mmap {}", path.as_ref().display()))?,
8✔
220
        ))));
221

222
        // read the length of the batch (first value in the stream)
223
        let len = stream.read_delta().context("Could not read length")? as usize;
8✔
224

225
        // create the iterator
NEW
226
        Ok(GapsIterator {
×
NEW
227
            deserializer: self.deserializer.clone(),
×
NEW
228
            stream,
×
NEW
229
            len,
×
NEW
230
            current: 0,
×
NEW
231
            prev_src: 0,
×
NEW
232
            prev_dst: 0,
×
233
        })
234
    }
235
}
236

237
#[derive(Clone, Debug)]
238
/// An iterator over triples encoded with gaps; this is returned by [`GapsCodec`].
239
pub struct GapsIterator<
240
    E: Endianness = NE,
241
    D: BitDeserializer<E, BitReader<E>> = (),
242
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
243
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
244
> where
245
    BitReader<E>: BitRead<E> + CodesRead<E>,
246
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
247
{
248
    /// Deserializer for the labels
249
    deserializer: D,
250
    /// Bitstream to read from
251
    stream: BitReader<E>,
252
    /// Length of the iterator (number of triples)
253
    len: usize,
254
    /// Current position in the iterator
255
    current: usize,
256
    /// Previous source node
257
    prev_src: usize,
258
    /// Previous destination node
259
    prev_dst: usize,
260
}
261

262
unsafe impl<
263
        E: Endianness,
264
        D: BitDeserializer<E, BitReader<E>>,
265
        const SRC_CODE: usize,
266
        const DST_CODE: usize,
267
    > SortedIterator for GapsIterator<E, D, SRC_CODE, DST_CODE>
268
where
269
    BitReader<E>: BitRead<E> + CodesRead<E>,
270
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
271
{
272
}
273

274
impl<
275
        E: Endianness,
276
        D: BitDeserializer<E, BitReader<E>>,
277
        const SRC_CODE: usize,
278
        const DST_CODE: usize,
279
    > Iterator for GapsIterator<E, D, SRC_CODE, DST_CODE>
280
where
281
    BitReader<E>: BitRead<E> + CodesRead<E>,
282
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
283
{
284
    type Item = (usize, usize, D::DeserType);
285

286
    fn next(&mut self) -> Option<Self::Item> {
73✔
287
        if self.current >= self.len {
73✔
288
            return None;
11✔
289
        }
290
        let src_gap = ConstCode::<SRC_CODE>.read(&mut self.stream).ok()?;
62✔
291
        let dst_gap = ConstCode::<DST_CODE>.read(&mut self.stream).ok()?;
62✔
292
        let label = self.deserializer.deserialize(&mut self.stream).ok()?;
62✔
NEW
293
        self.prev_src += src_gap as usize;
×
294
        if src_gap != 0 {
56✔
295
            self.prev_dst = 0;
56✔
296
        }
NEW
297
        self.prev_dst += dst_gap as usize;
×
NEW
298
        self.current += 1;
×
NEW
299
        Some((self.prev_src, self.prev_dst, label))
×
300
    }
301

NEW
302
    fn size_hint(&self) -> (usize, Option<usize>) {
×
NEW
303
        (self.len(), Some(self.len()))
×
304
    }
305
}
306

307
impl<
308
        E: Endianness,
309
        D: BitDeserializer<E, BitReader<E>>,
310
        const SRC_CODE: usize,
311
        const DST_CODE: usize,
312
    > ExactSizeIterator for GapsIterator<E, D, SRC_CODE, DST_CODE>
313
where
314
    BitReader<E>: BitRead<E> + CodesRead<E>,
315
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
316
{
NEW
317
    fn len(&self) -> usize {
×
NEW
318
        self.len - self.current
×
319
    }
320
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc