• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 22046400135

16 Feb 2026 12:49AM UTC coverage: 72.401% (+11.3%) from 61.096%
22046400135

push

github

vigna
fmt

6060 of 8370 relevant lines covered (72.4%)

48832055.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.67
/webgraph/src/utils/batch_codec/grouped_gaps.rs
1
/*
2
 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
3
 *
4
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
5
 */
6

7
use super::{BitReader, BitWriter};
8
use crate::traits::SortedIterator;
9
use crate::utils::{ArcMmapHelper, MmapHelper, Triple};
10
use crate::{
11
    traits::{BitDeserializer, BitSerializer},
12
    utils::{BatchCodec, humanize},
13
};
14

15
use std::sync::Arc;
16

17
use anyhow::{Context, Result};
18
use dsi_bitstream::prelude::*;
19
use mmap_rs::MmapFlags;
20
use rdst::*;
21

22
#[derive(Clone, Debug)]
23
/// A codec for encoding and decoding batches of triples using grouped gap compression.
24
///
25
/// This codec encodes triples of the form `(src, dst, label)` by grouping edges
26
/// with the same source node, and encoding the gaps between consecutive sources
27
/// and destinations using a specified code (default: gamma). The outdegree
28
/// (number of edges for each source) is also encoded using the specified code.
29
///
30
/// # Type Parameters
31
///
32
/// - `S`: Serializer for the labels, implementing [`BitSerializer`] for the label type.
33
/// - `D`: Deserializer for the labels, implementing [`BitDeserializer`] for the label type.
34
/// - `OUTDEGREE_CODE`: Code used for encoding outdegrees (default: [É£](dsi_bitstream::codes::gamma)).
35
/// - `SRC_CODE`: Code used for encoding source gaps (default: [É£](dsi_bitstream::codes::gamma)).
36
/// - `DST_CODE`: Code used for encoding destination gaps (default: [É£](dsi_bitstream::codes::gamma)).
37
///
38
/// # Encoding Format
39
///
40
/// 1. The batch length is written using delta coding.
41
/// 2. For each group of triples with the same source:
42
///     - The gap from the previous source is encoded.
43
///     - The outdegree (number of edges for this source) is encoded.
44
///     - For each destination:
45
///         - The gap from the previous destination is encoded.
46
///         - The label is serialized.
47
///
48
/// The bit deserializer must be [`Clone`] because we need one for each
49
/// [`GroupedGapsIter`], and there are possible scenarios in which the
50
/// deserializer might be stateful.
51
pub struct GroupedGapsCodec<
52
    E: Endianness = NE,
53
    S: BitSerializer<E, BitWriter<E>> = (),
54
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Clone = (),
55
    const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
56
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
57
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA },
58
> where
59
    BitReader<E>: BitRead<E>,
60
    BitWriter<E>: BitWrite<E>,
61
{
62
    /// Serializer for the labels.
63
    pub serializer: S,
64
    /// Deserializer for the labels.
65
    pub deserializer: D,
66

67
    pub _marker: core::marker::PhantomData<E>,
68
}
69

70
impl<E, S, D, const OUTDEGREE_CODE: usize, const SRC_CODE: usize, const DST_CODE: usize>
71
    GroupedGapsCodec<E, S, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
72
where
73
    E: Endianness,
74
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
75
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
76
    BitReader<E>: BitRead<E>,
77
    BitWriter<E>: BitWrite<E>,
78
{
79
    /// Creates a new `GroupedGapsCodec` with the given serializer and deserializer.
80
    pub fn new(serializer: S, deserializer: D) -> Self {
1✔
81
        Self {
82
            serializer,
83
            deserializer,
84
            _marker: core::marker::PhantomData,
85
        }
86
    }
87
}
88

89
impl<
90
    E: Endianness,
91
    S: BitSerializer<E, BitWriter<E>> + Default,
92
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Clone + Default,
93
    const OUTDEGREE_CODE: usize,
94
    const SRC_CODE: usize,
95
    const DST_CODE: usize,
96
> Default for GroupedGapsCodec<E, S, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
97
where
98
    BitReader<E>: BitRead<E>,
99
    BitWriter<E>: BitWrite<E>,
100
{
101
    fn default() -> Self {
180✔
102
        Self {
103
            serializer: S::default(),
360✔
104
            deserializer: D::default(),
180✔
105
            _marker: core::marker::PhantomData,
106
        }
107
    }
108
}
109

110
/// Statistics about the encoding performed by [`GroupedGapsCodec`].
///
/// Every bit count refers to a single encoded batch. The counts do not
/// include the δ-coded batch length that prefixes each batch file.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct GroupedGapsStats {
    /// Total number of triples encoded
    pub total_triples: usize,
    /// Number of bits used for outdegrees
    pub outdegree_bits: usize,
    /// Number of bits used for source gaps
    pub src_bits: usize,
    /// Number of bits used for destination gaps
    pub dst_bits: usize,
    /// Number of bits used for labels
    pub labels_bits: usize,
}
124

125
impl core::fmt::Display for GroupedGapsStats {
126
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
36✔
127
        write!(
36✔
128
            f,
36✔
129
            "outdegree: {}B ({:.3} bits / arc), src: {}B ({:.3} bits / arc), dst: {}B ({:.3} bits / arc), labels: {}B ({:.3} bits / arc)",
130
            humanize(self.outdegree_bits as f64 / 8.0),
72✔
131
            self.outdegree_bits as f64 / self.total_triples as f64,
36✔
132
            humanize(self.src_bits as f64 / 8.0),
72✔
133
            self.src_bits as f64 / self.total_triples as f64,
36✔
134
            humanize(self.dst_bits as f64 / 8.0),
72✔
135
            self.dst_bits as f64 / self.total_triples as f64,
36✔
136
            humanize(self.labels_bits as f64 / 8.0),
72✔
137
            self.labels_bits as f64 / self.total_triples as f64,
36✔
138
        )
139
    }
140
}
141

142
impl<E, S, D, const OUTDEGREE_CODE: usize, const SRC_CODE: usize, const DST_CODE: usize> BatchCodec
143
    for GroupedGapsCodec<E, S, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
144
where
145
    E: Endianness,
146
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
147
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
148
    S::SerType: Send + Sync + Copy + 'static, // needed by radix sort
149
    BitReader<E>: BitRead<E> + CodesRead<E>,
150
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
151
{
152
    type Label = S::SerType;
153
    type DecodedBatch = GroupedGapsIter<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>;
154
    type EncodedBatchStats = GroupedGapsStats;
155

156
    fn encode_batch(
1,266✔
157
        &self,
158
        path: impl AsRef<std::path::Path>,
159
        batch: &mut [((usize, usize), Self::Label)],
160
    ) -> Result<(usize, Self::EncodedBatchStats)> {
161
        let start = std::time::Instant::now();
2,532✔
162
        Triple::cast_batch_mut(batch).radix_sort_unstable();
2,532✔
163
        log::debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed());
1,266✔
164
        self.encode_sorted_batch(path, batch)
5,064✔
165
    }
166

167
    fn encode_sorted_batch(
1,266✔
168
        &self,
169
        path: impl AsRef<std::path::Path>,
170
        batch: &[((usize, usize), Self::Label)],
171
    ) -> Result<(usize, Self::EncodedBatchStats)> {
172
        debug_assert!(Triple::cast_batch(batch).is_sorted(), "Batch is not sorted");
3,798✔
173
        // create a bitstream to write to the file
174
        let file_path = path.as_ref();
3,798✔
175
        let mut stream = buf_bit_writer::from_path::<E, usize>(file_path).with_context(|| {
5,064✔
176
            format!(
×
177
                "Could not create BatchIterator temporary file {}",
×
178
                file_path.display()
×
179
            )
180
        })?;
181

182
        // prefix the stream with the length of the batch
183
        // we use a delta code since it'll be a big number most of the time
184
        stream
1,266✔
185
            .write_delta(batch.len() as u64)
2,532✔
186
            .context("Could not write length")?;
187

188
        let mut stats = GroupedGapsStats {
189
            total_triples: batch.len(),
1,266✔
190
            outdegree_bits: 0,
191
            src_bits: 0,
192
            dst_bits: 0,
193
            labels_bits: 0,
194
        };
195
        // dump the triples to the bitstream
196
        let mut prev_src = 0;
2,532✔
197
        let mut i = 0;
2,532✔
198
        while i < batch.len() {
5,572,818✔
199
            let ((src, _), _) = batch[i];
5,570,286✔
200
            // write the source gap as gamma
201
            stats.src_bits += ConstCode::<SRC_CODE>
2,785,143✔
202
                .write(&mut stream, (src - prev_src) as _)
8,355,429✔
203
                .with_context(|| format!("Could not write {src} after {prev_src}"))?;
2,785,143✔
204
            // figure out how many edges have this source
205
            let outdegree = batch[i..].iter().take_while(|t| t.0.0 == src).count();
103,098,995✔
206
            // write the outdegree
207
            stats.outdegree_bits += ConstCode::<OUTDEGREE_CODE>
2,785,143✔
208
                .write(&mut stream, outdegree as _)
8,355,429✔
209
                .with_context(|| format!("Could not write outdegree {outdegree} for {src}"))?;
2,785,143✔
210

211
            // encode the destinations
212
            let mut prev_dst = 0;
5,570,286✔
213
            for _ in 0..outdegree {
2,785,143✔
214
                let ((_, dst), label) = &batch[i];
125,408,175✔
215
                // write the destination gap as gamma
216
                stats.dst_bits += ConstCode::<DST_CODE>
41,802,725✔
217
                    .write(&mut stream, (dst - prev_dst) as _)
125,408,175✔
218
                    .with_context(|| format!("Could not write {dst} after {prev_dst}"))?;
41,802,725✔
219
                // write the label
220
                stats.labels_bits += self
41,802,725✔
221
                    .serializer
41,802,725✔
222
                    .serialize(label, &mut stream)
125,408,175✔
223
                    .context("Could not serialize label")?;
41,802,725✔
224
                prev_dst = *dst;
41,802,725✔
225
                i += 1;
41,802,725✔
226
            }
227
            prev_src = src;
2,785,143✔
228
        }
229
        // flush the stream and reset the buffer
230
        stream.flush().context("Could not flush stream")?;
3,798✔
231

232
        let total_bits = stats.outdegree_bits + stats.src_bits + stats.dst_bits + stats.labels_bits;
2,532✔
233
        Ok((total_bits, stats))
1,266✔
234
    }
235

236
    fn decode_batch(&self, path: impl AsRef<std::path::Path>) -> Result<Self::DecodedBatch> {
837✔
237
        // open the file
238
        let mut stream = <BufBitReader<E, _>>::new(MemWordReader::new(ArcMmapHelper(Arc::new(
3,348✔
239
            MmapHelper::mmap(
837✔
240
                path.as_ref(),
1,674✔
241
                MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL,
837✔
242
            )
243
            .with_context(|| format!("Could not mmap {}", path.as_ref().display()))?,
837✔
244
        ))));
245

246
        // read the length of the batch (first value in the stream)
247
        let len = stream.read_delta().context("Could not read length")? as usize;
3,348✔
248

249
        // create the iterator
250
        Ok(GroupedGapsIter {
837✔
251
            deserializer: self.deserializer.clone(),
2,511✔
252
            stream,
837✔
253
            len,
837✔
254
            current: 0,
837✔
255
            src: 0,
837✔
256
            dst_left: 0,
837✔
257
            prev_dst: 0,
837✔
258
        })
259
    }
260
}
261

262
#[derive(Clone, Debug)]
263
/// An iterator over triples encoded with gaps, this is returned by [`GroupedGapsCodec`].
264
pub struct GroupedGapsIter<
265
    E: Endianness = NE,
266
    D: BitDeserializer<E, BitReader<E>> = (),
267
    const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
268
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
269
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
270
> where
271
    BitReader<E>: BitRead<E>,
272
    BitWriter<E>: BitWrite<E>,
273
{
274
    /// Deserializer for the labels
275
    deserializer: D,
276
    /// Bitstream to read from
277
    stream: BitReader<E>,
278
    /// Length of the iterator (number of triples)
279
    len: usize,
280
    /// Current position in the iterator
281
    current: usize,
282
    /// Current source node
283
    src: usize,
284
    /// Number of destinations left for the current source
285
    dst_left: usize,
286
    /// Previous destination node
287
    prev_dst: usize,
288
}
289

290
unsafe impl<
291
    E: Endianness,
292
    D: BitDeserializer<E, BitReader<E>>,
293
    const OUTDEGREE_CODE: usize,
294
    const SRC_CODE: usize,
295
    const DST_CODE: usize,
296
> SortedIterator for GroupedGapsIter<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
297
where
298
    BitReader<E>: BitRead<E> + CodesRead<E>,
299
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
300
{
301
}
302

303
impl<
304
    E: Endianness,
305
    D: BitDeserializer<E, BitReader<E>>,
306
    const OUTDEGREE_CODE: usize,
307
    const SRC_CODE: usize,
308
    const DST_CODE: usize,
309
> Iterator for GroupedGapsIter<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
310
where
311
    BitReader<E>: BitRead<E> + CodesRead<E>,
312
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
313
{
314
    type Item = ((usize, usize), D::DeserType);
315
    fn next(&mut self) -> Option<Self::Item> {
71,147,002✔
316
        if self.current >= self.len {
71,147,002✔
317
            return None;
756✔
318
        }
319
        if self.dst_left == 0 {
71,146,246✔
320
            // read a new source
321
            let src_gap = ConstCode::<SRC_CODE>.read(&mut self.stream).ok()?;
22,458,175✔
322
            self.src += src_gap as usize;
4,491,635✔
323
            // read the outdegree
324
            self.dst_left = ConstCode::<OUTDEGREE_CODE>.read(&mut self.stream).ok()? as usize;
17,966,540✔
325
            self.prev_dst = 0;
4,491,635✔
326
        }
327

328
        let dst_gap = ConstCode::<DST_CODE>.read(&mut self.stream).ok()?;
355,731,230✔
329
        let label = self.deserializer.deserialize(&mut self.stream).ok()?;
355,731,230✔
330
        self.prev_dst += dst_gap as usize;
71,146,246✔
331
        self.current += 1;
71,146,246✔
332
        self.dst_left -= 1;
71,146,246✔
333
        Some(((self.src, self.prev_dst), label))
71,146,246✔
334
    }
335

336
    fn size_hint(&self) -> (usize, Option<usize>) {
2✔
337
        (self.len(), Some(self.len()))
6✔
338
    }
339
}
340

341
impl<
342
    E: Endianness,
343
    D: BitDeserializer<E, BitReader<E>>,
344
    const OUTDEGREE_CODE: usize,
345
    const SRC_CODE: usize,
346
    const DST_CODE: usize,
347
> ExactSizeIterator for GroupedGapsIter<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
348
where
349
    BitReader<E>: BitRead<E> + CodesRead<E>,
350
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
351
{
352
    fn len(&self) -> usize {
5✔
353
        self.len - self.current
5✔
354
    }
355
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc