• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 24635666654

19 Apr 2026 06:07PM UTC coverage: 66.542% (-0.4%) from 66.986%
24635666654

push

github

vigna
Refined methods

7 of 25 new or added lines in 5 files covered. (28.0%)

670 existing lines in 36 files now uncovered.

6752 of 10147 relevant lines covered (66.54%)

46688126.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.53
/webgraph/src/utils/batch_codec/gaps.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
5
 *
6
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
7
 */
8

9
use super::{BitReader, BitWriter};
10
use crate::traits::SortedIterator;
11
use crate::utils::{ArcMmapHelper, MmapHelper, Triple};
12
use crate::{
13
    traits::{BitDeserializer, BitSerializer},
14
    utils::{BatchCodec, humanize},
15
};
16

17
use std::sync::Arc;
18

19
use anyhow::{Context, Result};
20
use dsi_bitstream::prelude::*;
21
use mmap_rs::MmapFlags;
22
use rdst::*;
23

24
/// A codec for encoding and decoding batches of triples using gap compression.
25
///
26
/// This codec encodes triples of the form `(src, dst, label)` by encoding the
27
/// gaps between consecutive sources and destinations using a specified code.
28
///
29
/// # Type Parameters
30
///
31
/// - `S`: Serializer for the labels, implementing [`BitSerializer`] for the label type.
32
/// - `D`: Deserializer for the labels, implementing [`BitDeserializer`] for the label type.
33
/// - `SRC_CODE`: Code used for encoding source gaps (default: gamma).
34
/// - `DST_CODE`: Code used for encoding destination gaps (default: gamma).
35
///
36
/// # Encoding Format
37
///
38
/// 1. The batch length is written using delta coding.
39
/// 2. For each group of triples with the same source:
40
///     - The gap from the previous source is encoded.
41
///     - The gap from the previous destination is encoded.
42
///     - The label is serialized.
43
///
44
/// The bit deserializer must be [`Clone`] because we need one for each
45
/// [`GapsIter`], and there are possible scenarios in which the
46
/// deserializer might be stateful.
47
#[derive(Clone, Debug)]
48
pub struct GapsCodec<
49
    E: Endianness = NE,
50
    S: BitSerializer<E, BitWriter<E>> = (),
51
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Clone = (),
52
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
53
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA },
54
    const DEDUP: bool = false,
55
> where
56
    BitReader<E>: BitRead<E> + CodesRead<E>,
57
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
58
{
59
    /// Serializer for the labels
60
    pub serializer: S,
61
    /// Deserializer for the labels
62
    pub deserializer: D,
63
    /// Marker for the endianness
64
    _marker: std::marker::PhantomData<E>,
65
}
66

67
impl<E, S, D, const SRC_CODE: usize, const DST_CODE: usize, const DEDUP: bool>
68
    GapsCodec<E, S, D, SRC_CODE, DST_CODE, DEDUP>
69
where
70
    E: Endianness,
71
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
72
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
73
    BitReader<E>: BitRead<E> + CodesRead<E>,
74
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
75
{
76
    /// Creates a new `GapsCodec` with the given serializer and deserializer.
77
    pub const fn new(serializer: S, deserializer: D) -> Self {
2✔
78
        Self {
79
            serializer,
80
            deserializer,
81
            _marker: std::marker::PhantomData,
82
        }
83
    }
84
}
85

86
impl<E, S: Default, D: Default, const SRC_CODE: usize, const DST_CODE: usize, const DEDUP: bool>
87
    core::default::Default for GapsCodec<E, S, D, SRC_CODE, DST_CODE, DEDUP>
88
where
89
    E: Endianness,
90
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
91
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
92
    BitReader<E>: BitRead<E> + CodesRead<E>,
93
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
94
{
95
    fn default() -> Self {
2✔
96
        Self::new(Default::default(), Default::default())
6✔
97
    }
98
}
99

100
/// Size statistics returned by [`GapsCodec`] after encoding a batch.
///
/// All sizes are expressed in bits.
#[derive(Clone, Copy, Debug)]
pub struct GapsStats {
    /// How many triples were written to the batch.
    pub total_triples: usize,
    /// Bits spent on the gaps between sources.
    pub src_bits: usize,
    /// Bits spent on the gaps between destinations.
    pub dst_bits: usize,
    /// Bits spent on the serialized labels.
    pub labels_bits: usize,
}
112

113
impl core::fmt::Display for GapsStats {
114
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
×
115
        let total_bits = self.src_bits + self.dst_bits + self.labels_bits;
×
116
        write!(
×
117
            f,
×
118
            "src: {}B ({:.3} bits / triple), dst: {}B ({:.3} bits / triple), labels: {}B ({:.3} bits / triple), total: {}B ({:.3} bits / triple)",
119
            humanize(self.src_bits as f64 / 8.0),
×
120
            self.src_bits as f64 / self.total_triples as f64,
×
121
            humanize(self.dst_bits as f64 / 8.0),
×
122
            self.dst_bits as f64 / self.total_triples as f64,
×
123
            humanize(self.labels_bits as f64 / 8.0),
×
124
            self.labels_bits as f64 / self.total_triples as f64,
×
125
            humanize(total_bits as f64 / 8.0),
×
126
            total_bits as f64 / self.total_triples as f64,
×
127
        )
128
    }
129
}
130

131
impl crate::utils::BatchStats for GapsStats {
    /// Returns the number of triples recorded in these statistics.
    #[inline]
    fn total_triples(&self) -> usize {
        let Self { total_triples, .. } = *self;
        total_triples
    }
}
137

138
impl<E, S, D, const SRC_CODE: usize, const DST_CODE: usize, const DEDUP: bool> BatchCodec
139
    for GapsCodec<E, S, D, SRC_CODE, DST_CODE, DEDUP>
140
where
141
    E: Endianness,
142
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
143
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
144
    S::SerType: Send + Sync + Copy + 'static + core::fmt::Debug, // needed by radix sort
145
    BitReader<E>: BitRead<E> + CodesRead<E>,
146
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
147
{
148
    type Label = S::SerType;
149
    type DecodedBatch = GapsIter<E, D, SRC_CODE, DST_CODE>;
150
    type EncodedBatchStats = GapsStats;
151

152
    fn encode_batch(
1✔
153
        &self,
154
        path: impl AsRef<std::path::Path>,
155
        batch: &mut [((usize, usize), Self::Label)],
156
    ) -> Result<(usize, Self::EncodedBatchStats)> {
157
        let start = std::time::Instant::now();
2✔
158
        Triple::cast_batch_mut(batch).radix_sort_unstable();
2✔
159
        log::debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed());
1✔
160
        self.encode_sorted_batch(path, batch)
4✔
161
    }
162

163
    fn encode_sorted_batch(
2✔
164
        &self,
165
        path: impl AsRef<std::path::Path>,
166
        batch: &[((usize, usize), Self::Label)],
167
    ) -> Result<(usize, Self::EncodedBatchStats)> {
168
        debug_assert!(Triple::cast_batch(batch).is_sorted());
6✔
169
        // create a bitstream to write to the file
170
        let file_path = path.as_ref();
6✔
171
        let mut stream = buf_bit_writer::from_path::<E, usize>(file_path).with_context(|| {
8✔
172
            format!(
×
173
                "Could not create BatchIterator temporary file {}",
×
174
                file_path.display()
×
175
            )
176
        })?;
177

178
        // Pre-count unique pairs for the length prefix when deduplicating
179
        let batch_len = if DEDUP {
4✔
180
            if batch.is_empty() {
×
181
                0
×
182
            } else {
183
                1 + batch.windows(2).filter(|w| w[0].0 != w[1].0).count()
×
184
            }
185
        } else {
186
            batch.len()
4✔
187
        };
188

189
        // prefix the stream with the length of the batch
190
        // we use a delta code since it'll be a big number most of the time
191
        stream
2✔
192
            .write_delta(batch_len as u64)
4✔
193
            .context("Could not write length")?;
194

195
        let mut stats = GapsStats {
196
            total_triples: batch_len,
197
            src_bits: 0,
198
            dst_bits: 0,
199
            labels_bits: 0,
200
        };
201
        // Dump the triples to the bitstream
202
        let (mut prev_src, mut prev_dst) = (0, 0);
6✔
203
        let mut prev_pair: Option<(usize, usize)> = None;
6✔
204
        for ((src, dst), label) in batch.iter() {
25✔
205
            if DEDUP {
7✔
206
                if prev_pair == Some((*src, *dst)) {
×
207
                    continue;
×
208
                }
209
                prev_pair = Some((*src, *dst));
×
210
            }
211
            // write the source gap as gamma
212
            stats.src_bits += ConstCode::<SRC_CODE>
7✔
213
                .write(&mut stream, (src - prev_src) as u64)
21✔
214
                .with_context(|| format!("Could not write {src} after {prev_src}"))?;
7✔
215
            if *src != prev_src {
11✔
216
                // Reset prev_y
217
                prev_dst = 0;
4✔
218
            }
219
            // write the destination gap as gamma
220
            stats.dst_bits += ConstCode::<DST_CODE>
7✔
221
                .write(&mut stream, (dst - prev_dst) as u64)
21✔
222
                .with_context(|| format!("Could not write {dst} after {prev_dst}"))?;
7✔
223
            // write the label
224
            stats.labels_bits += self
7✔
225
                .serializer
7✔
226
                .serialize(label, &mut stream)
21✔
227
                .context("Could not serialize label")?;
7✔
228
            (prev_src, prev_dst) = (*src, *dst);
21✔
229
        }
230
        // flush the stream and reset the buffer
231
        stream.flush().context("Could not flush stream")?;
6✔
232

233
        let total_bits = stats.src_bits + stats.dst_bits + stats.labels_bits;
4✔
234
        Ok((total_bits, stats))
2✔
235
    }
236

237
    fn decode_batch(&self, path: impl AsRef<std::path::Path>) -> Result<Self::DecodedBatch> {
2✔
238
        // open the file
239
        let mut stream = <BufBitReader<E, _>>::new(MemWordReader::new(ArcMmapHelper(Arc::new(
8✔
240
            MmapHelper::mmap(
2✔
241
                path.as_ref(),
4✔
242
                MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL,
2✔
243
            )
244
            .with_context(|| format!("Could not mmap {}", path.as_ref().display()))?,
2✔
245
        ))));
246

247
        // read the length of the batch (first value in the stream)
248
        let len = stream.read_delta().context("Could not read length")? as usize;
8✔
249

250
        // create the iterator
251
        Ok(GapsIter {
2✔
252
            deserializer: self.deserializer.clone(),
6✔
253
            stream,
2✔
254
            len,
2✔
255
            current: 0,
2✔
256
            prev_src: 0,
2✔
257
            prev_dst: 0,
2✔
258
        })
259
    }
260
}
261

262
/// An iterator over triples encoded with gaps, this is returned by [`GapsCodec`].
263
#[derive(Clone, Debug)]
264
pub struct GapsIter<
265
    E: Endianness = NE,
266
    D: BitDeserializer<E, BitReader<E>> = (),
267
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
268
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
269
> where
270
    BitReader<E>: BitRead<E> + CodesRead<E>,
271
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
272
{
273
    /// Deserializer for the labels
274
    deserializer: D,
275
    /// Bitstream to read from
276
    stream: BitReader<E>,
277
    /// Length of the iterator (number of triples)
278
    len: usize,
279
    /// Current position in the iterator
280
    current: usize,
281
    /// Previous source node
282
    prev_src: usize,
283
    /// Previous destination node
284
    prev_dst: usize,
285
}
286

287
// SAFETY: gaps are decoded in non-decreasing (src, dst) order.
288
unsafe impl<
289
    E: Endianness,
290
    D: BitDeserializer<E, BitReader<E>>,
291
    const SRC_CODE: usize,
292
    const DST_CODE: usize,
293
> SortedIterator for GapsIter<E, D, SRC_CODE, DST_CODE>
294
where
295
    BitReader<E>: BitRead<E> + CodesRead<E>,
296
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
297
{
298
}
299

300
impl<
301
    E: Endianness,
302
    D: BitDeserializer<E, BitReader<E>>,
303
    const SRC_CODE: usize,
304
    const DST_CODE: usize,
305
> Iterator for GapsIter<E, D, SRC_CODE, DST_CODE>
306
where
307
    BitReader<E>: BitRead<E> + CodesRead<E>,
308
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
309
{
310
    type Item = ((usize, usize), D::DeserType);
311

312
    fn next(&mut self) -> Option<Self::Item> {
5✔
313
        if self.current >= self.len {
5✔
314
            return None;
1✔
315
        }
316
        let src_gap = ConstCode::<SRC_CODE>.read(&mut self.stream).ok()?;
20✔
317
        let dst_gap = ConstCode::<DST_CODE>.read(&mut self.stream).ok()?;
20✔
318
        let label = self.deserializer.deserialize(&mut self.stream).ok()?;
20✔
319
        self.prev_src += src_gap as usize;
4✔
320
        if src_gap != 0 {
6✔
321
            self.prev_dst = 0;
2✔
322
        }
323
        self.prev_dst += dst_gap as usize;
4✔
324
        self.current += 1;
4✔
325
        Some(((self.prev_src, self.prev_dst), label))
4✔
326
    }
327

328
    fn size_hint(&self) -> (usize, Option<usize>) {
2✔
329
        (self.len(), Some(self.len()))
6✔
330
    }
331

332
    fn count(self) -> usize {
×
333
        self.len()
×
334
    }
335
}
336

337
impl<
338
    E: Endianness,
339
    D: BitDeserializer<E, BitReader<E>>,
340
    const SRC_CODE: usize,
341
    const DST_CODE: usize,
342
> ExactSizeIterator for GapsIter<E, D, SRC_CODE, DST_CODE>
343
where
344
    BitReader<E>: BitRead<E> + CodesRead<E>,
345
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
346
{
347
    fn len(&self) -> usize {
5✔
348
        self.len - self.current
5✔
349
    }
350
}
351

352
impl<
353
    E: Endianness,
354
    D: BitDeserializer<E, BitReader<E>>,
355
    const SRC_CODE: usize,
356
    const DST_CODE: usize,
357
> core::iter::FusedIterator for GapsIter<E, D, SRC_CODE, DST_CODE>
358
where
359
    BitReader<E>: BitRead<E> + CodesRead<E>,
360
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
361
{
362
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc