• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 19278973738

11 Nov 2025 09:25PM UTC coverage: 62.257% (+0.6%) from 61.674%
19278973738

Pull #152

github

web-flow
Merge d8e872162 into fedf79d67
Pull Request #152: Introduce BatchCodec to substitute BatchIterator

206 of 257 new or added lines in 10 files covered. (80.16%)

48 existing lines in 3 files now uncovered.

5247 of 8428 relevant lines covered (62.26%)

29389328.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.25
/webgraph/src/utils/batch_codec/gaps.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
5
 *
6
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
7
 */
8

9
use super::{BitReader, BitWriter};
10
use crate::traits::SortedIterator;
11
use crate::utils::{ArcMmapHelper, MmapHelper, Triple};
12
use crate::{
13
    traits::{BitDeserializer, BitSerializer},
14
    utils::{humanize, BatchCodec},
15
};
16

17
use std::sync::Arc;
18

19
use anyhow::{Context, Result};
20
use dsi_bitstream::prelude::*;
21
use mmap_rs::MmapFlags;
22
use rdst::*;
23

24
#[derive(Clone, Debug)]
25
/// A codec for encoding and decoding batches of triples using gap compression.
26
///
27
/// This codec encodes triples of the form `(src, dst, label)` by encoding the
28
/// gaps between consecutive sources and destinations using a specified code.
29
///
30
/// # Type Parameters
31
///
32
/// - `S`: Serializer for the labels, implementing [`BitSerializer`] for the label type.
33
/// - `D`: Deserializer for the labels, implementing [`BitDeserializer`] for the label type.
34
/// - `SRC_CODE`: Code used for encoding source gaps (default: gamma).
35
/// - `DST_CODE`: Code used for encoding destination gaps (default: gamma).
36
///
37
/// # Encoding Format
38
///
39
/// 1. The batch length is written using delta coding.
40
/// 2. For each group of triples with the same source:
41
///     - The gap from the previous source is encoded.
42
///     - The gap from the previous destination is encoded.
43
///     - The label is serialized.
44
///
45
/// The bit deserializer must be [`Clone`] because we need one for each
46
/// [`GapsIterator`], and there are possible scenarios in which the
47
/// deserializer might be stateful.
48
pub struct GapsCodec<
49
    E: Endianness = NE,
50
    S: BitSerializer<E, BitWriter<E>> = (),
51
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Clone = (),
52
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
53
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA },
54
> where
55
    BitReader<E>: BitRead<E> + CodesRead<E>,
56
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
57
{
58
    /// Serializer for the labels
59
    pub serializer: S,
60
    /// Deserializer for the labels
61
    pub deserializer: D,
62
    /// Marker for the endianness
63
    pub _marker: std::marker::PhantomData<E>,
64
}
65

66
impl<E, S, D, const SRC_CODE: usize, const DST_CODE: usize> GapsCodec<E, S, D, SRC_CODE, DST_CODE>
67
where
68
    E: Endianness,
69
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
70
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
71
    BitReader<E>: BitRead<E> + CodesRead<E>,
72
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
73
{
74
    /// Creates a new `GapsCodec` with the given serializer and deserializer.
75
    pub fn new(serializer: S, deserializer: D) -> Self {
4✔
76
        Self {
77
            serializer,
78
            deserializer,
79
            _marker: std::marker::PhantomData,
80
        }
81
    }
82
}
83

84
impl<E, S: Default, D: Default, const SRC_CODE: usize, const DST_CODE: usize> core::default::Default
85
    for GapsCodec<E, S, D, SRC_CODE, DST_CODE>
86
where
87
    E: Endianness,
88
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
89
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
90
    BitReader<E>: BitRead<E> + CodesRead<E>,
91
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
92
{
93
    fn default() -> Self {
2✔
94
        Self::new(Default::default(), Default::default())
6✔
95
    }
96
}
97

98
/// Statistics about the encoding performed by [`GapsCodec`].
///
/// All sizes are expressed in bits.
#[derive(Debug, Clone, Copy)]
pub struct GapsStats {
    /// Total number of triples encoded
    pub total_triples: usize,
    /// Number of bits used for source gaps
    pub src_bits: usize,
    /// Number of bits used for destination gaps
    pub dst_bits: usize,
    /// Number of bits used for labels
    pub labels_bits: usize,
}
110

111
impl core::fmt::Display for GapsStats {
NEW
112
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
×
NEW
113
        let total_bits = self.src_bits + self.dst_bits + self.labels_bits;
×
NEW
114
        write!(
×
NEW
115
            f,
×
116
            "src: {}B ({:.3} bits / arc), dst: {}B ({:.3} bits / arc), labels: {}B ({:.3} bits / arc), total: {}B ({:.3} bits / arc)",
NEW
117
            humanize(self.src_bits as f64 / 8.0),
×
NEW
118
            self.src_bits as f64 / self.total_triples as f64,
×
NEW
119
            humanize(self.dst_bits as f64 / 8.0),
×
NEW
120
            self.dst_bits as f64 / self.total_triples as f64,
×
NEW
121
            humanize(self.labels_bits as f64 / 8.0),
×
NEW
122
            self.labels_bits as f64 / self.total_triples as f64,
×
NEW
123
            humanize(total_bits as f64 / 8.0),
×
NEW
124
            total_bits as f64 / self.total_triples as f64,
×
125
        )
126
    }
127
}
128

129
impl<E, S, D, const SRC_CODE: usize, const DST_CODE: usize> BatchCodec
130
    for GapsCodec<E, S, D, SRC_CODE, DST_CODE>
131
where
132
    E: Endianness,
133
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
134
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
135
    S::SerType: Send + Sync + Copy + 'static + core::fmt::Debug, // needed by radix sort
136
    BitReader<E>: BitRead<E> + CodesRead<E>,
137
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
138
{
139
    type Label = S::SerType;
140
    type DecodedBatch = GapsIterator<E, D, SRC_CODE, DST_CODE>;
141
    type EncodedBatchStats = GapsStats;
142

143
    fn encode_batch(
8✔
144
        &self,
145
        path: impl AsRef<std::path::Path>,
146
        batch: &mut [((usize, usize), Self::Label)],
147
    ) -> Result<(usize, Self::EncodedBatchStats)> {
148
        let start = std::time::Instant::now();
16✔
149
        Triple::cast_batch_mut(batch).radix_sort_unstable();
16✔
150
        log::debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed());
8✔
151
        self.encode_sorted_batch(path, batch)
32✔
152
    }
153

154
    fn encode_sorted_batch(
8✔
155
        &self,
156
        path: impl AsRef<std::path::Path>,
157
        batch: &[((usize, usize), Self::Label)],
158
    ) -> Result<(usize, Self::EncodedBatchStats)> {
159
        debug_assert!(Triple::cast_batch(batch).is_sorted());
24✔
160
        // create a batch file where to dump
161
        let file_path = path.as_ref();
24✔
162
        let file = std::io::BufWriter::with_capacity(
163
            1 << 16,
8✔
164
            std::fs::File::create(file_path).with_context(|| {
24✔
NEW
165
                format!(
×
NEW
166
                    "Could not create BatchIterator temporary file {}",
×
NEW
167
                    file_path.display()
×
168
                )
169
            })?,
170
        );
171
        // create a bitstream to write to the file
172
        let mut stream = <BufBitWriter<E, _>>::new(<WordAdapter<usize, _>>::new(file));
32✔
173

174
        // prefix the stream with the length of the batch
175
        // we use a delta code since it'll be a big number most of the time
176
        stream
8✔
177
            .write_delta(batch.len() as u64)
16✔
178
            .context("Could not write length")?;
179

180
        let mut stats = GapsStats {
181
            total_triples: batch.len(),
8✔
182
            src_bits: 0,
183
            dst_bits: 0,
184
            labels_bits: 0,
185
        };
186
        // dump the triples to the bitstrea
187
        let (mut prev_src, mut prev_dst) = (0, 0);
24✔
188
        for ((src, dst), label) in batch.iter() {
136✔
189
            // write the source gap as gamma
190
            stats.src_bits += ConstCode::<SRC_CODE>
40✔
191
                .write(&mut stream, (src - prev_src) as u64)
120✔
192
                .with_context(|| format!("Could not write {src} after {prev_src}"))?;
40✔
193
            if *src != prev_src {
74✔
194
                // Reset prev_y
195
                prev_dst = 0;
34✔
196
            }
197
            // write the destination gap as gamma
198
            stats.dst_bits += ConstCode::<DST_CODE>
40✔
199
                .write(&mut stream, (dst - prev_dst) as u64)
120✔
200
                .with_context(|| format!("Could not write {dst} after {prev_dst}"))?;
40✔
201
            // write the label
202
            stats.labels_bits += self
40✔
203
                .serializer
40✔
204
                .serialize(label, &mut stream)
120✔
205
                .context("Could not serialize label")?;
40✔
206
            (prev_src, prev_dst) = (*src, *dst);
120✔
207
        }
208
        // flush the stream and reset the buffer
209
        stream.flush().context("Could not flush stream")?;
24✔
210

211
        let total_bits = stats.src_bits + stats.dst_bits + stats.labels_bits;
16✔
212
        Ok((total_bits, stats))
8✔
213
    }
214

215
    fn decode_batch(&self, path: impl AsRef<std::path::Path>) -> Result<Self::DecodedBatch> {
8✔
216
        // open the file
217
        let mut stream = <BufBitReader<E, _>>::new(MemWordReader::new(ArcMmapHelper(Arc::new(
32✔
218
            MmapHelper::mmap(
8✔
219
                path.as_ref(),
16✔
220
                MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL,
8✔
221
            )
222
            .with_context(|| format!("Could not mmap {}", path.as_ref().display()))?,
8✔
223
        ))));
224

225
        // read the length of the batch (first value in the stream)
226
        let len = stream.read_delta().context("Could not read length")? as usize;
32✔
227

228
        // create the iterator
229
        Ok(GapsIterator {
8✔
230
            deserializer: self.deserializer.clone(),
24✔
231
            stream,
8✔
232
            len,
8✔
233
            current: 0,
8✔
234
            prev_src: 0,
8✔
235
            prev_dst: 0,
8✔
236
        })
237
    }
238
}
239

240
#[derive(Clone, Debug)]
241
/// An iterator over triples encoded with gaps, this is returned by [`GapsCodec`].
242
pub struct GapsIterator<
243
    E: Endianness = NE,
244
    D: BitDeserializer<E, BitReader<E>> = (),
245
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
246
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
247
> where
248
    BitReader<E>: BitRead<E> + CodesRead<E>,
249
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
250
{
251
    /// Deserializer for the labels
252
    deserializer: D,
253
    /// Bitstream to read from
254
    stream: BitReader<E>,
255
    /// Length of the iterator (number of triples)
256
    len: usize,
257
    /// Current position in the iterator
258
    current: usize,
259
    /// Previous source node
260
    prev_src: usize,
261
    /// Previous destination node
262
    prev_dst: usize,
263
}
264

265
unsafe impl<
266
        E: Endianness,
267
        D: BitDeserializer<E, BitReader<E>>,
268
        const SRC_CODE: usize,
269
        const DST_CODE: usize,
270
    > SortedIterator for GapsIterator<E, D, SRC_CODE, DST_CODE>
271
where
272
    BitReader<E>: BitRead<E> + CodesRead<E>,
273
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
274
{
275
}
276

277
impl<
278
        E: Endianness,
279
        D: BitDeserializer<E, BitReader<E>>,
280
        const SRC_CODE: usize,
281
        const DST_CODE: usize,
282
    > Iterator for GapsIterator<E, D, SRC_CODE, DST_CODE>
283
where
284
    BitReader<E>: BitRead<E> + CodesRead<E>,
285
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
286
{
287
    type Item = ((usize, usize), D::DeserType);
288

289
    fn next(&mut self) -> Option<Self::Item> {
73✔
290
        if self.current >= self.len {
73✔
291
            return None;
11✔
292
        }
293
        let src_gap = ConstCode::<SRC_CODE>.read(&mut self.stream).ok()?;
310✔
294
        let dst_gap = ConstCode::<DST_CODE>.read(&mut self.stream).ok()?;
310✔
295
        let label = self.deserializer.deserialize(&mut self.stream).ok()?;
310✔
296
        self.prev_src += src_gap as usize;
62✔
297
        if src_gap != 0 {
118✔
298
            self.prev_dst = 0;
56✔
299
        }
300
        self.prev_dst += dst_gap as usize;
62✔
301
        self.current += 1;
62✔
302
        Some(((self.prev_src, self.prev_dst), label))
62✔
303
    }
304

NEW
305
    fn size_hint(&self) -> (usize, Option<usize>) {
×
NEW
306
        (self.len(), Some(self.len()))
×
307
    }
308
}
309

310
impl<
311
        E: Endianness,
312
        D: BitDeserializer<E, BitReader<E>>,
313
        const SRC_CODE: usize,
314
        const DST_CODE: usize,
315
    > ExactSizeIterator for GapsIterator<E, D, SRC_CODE, DST_CODE>
316
where
317
    BitReader<E>: BitRead<E> + CodesRead<E>,
318
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
319
{
NEW
320
    fn len(&self) -> usize {
×
NEW
321
        self.len - self.current
×
322
    }
323
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc