• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 22649179319

04 Mar 2026 12:25AM UTC coverage: 71.445% (-0.06%) from 71.504%
22649179319

push

github

vigna
Deduplication support at codec level

32 of 40 new or added lines in 8 files covered. (80.0%)

7 existing lines in 3 files now uncovered.

6345 of 8881 relevant lines covered (71.44%)

51405455.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.16
/webgraph/src/utils/batch_codec/gaps.rs
1
/*
2
 * SPDX-FileCopyrightText: 2023 Inria
3
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
4
 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
5
 *
6
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
7
 */
8

9
use super::{BitReader, BitWriter};
10
use crate::traits::SortedIterator;
11
use crate::utils::{ArcMmapHelper, MmapHelper, Triple};
12
use crate::{
13
    traits::{BitDeserializer, BitSerializer},
14
    utils::{BatchCodec, humanize},
15
};
16

17
use std::sync::Arc;
18

19
use anyhow::{Context, Result};
20
use dsi_bitstream::prelude::*;
21
use mmap_rs::MmapFlags;
22
use rdst::*;
23

24
/// A codec for encoding and decoding batches of triples using gap compression.
25
///
26
/// This codec encodes triples of the form `(src, dst, label)` by encoding the
27
/// gaps between consecutive sources and destinations using a specified code.
28
///
29
/// # Type Parameters
30
///
31
/// - `S`: Serializer for the labels, implementing [`BitSerializer`] for the label type.
32
/// - `D`: Deserializer for the labels, implementing [`BitDeserializer`] for the label type.
33
/// - `SRC_CODE`: Code used for encoding source gaps (default: gamma).
34
/// - `DST_CODE`: Code used for encoding destination gaps (default: gamma).
35
///
36
/// # Encoding Format
37
///
38
/// 1. The batch length is written using delta coding.
39
/// 2. For each group of triples with the same source:
40
///     - The gap from the previous source is encoded.
41
///     - The gap from the previous destination is encoded.
42
///     - The label is serialized.
43
///
44
/// The bit deserializer must be [`Clone`] because we need one for each
45
/// [`GapsIter`], and there are possible scenarios in which the
46
/// deserializer might be stateful.
47
#[derive(Clone, Debug)]
48
pub struct GapsCodec<
49
    E: Endianness = NE,
50
    S: BitSerializer<E, BitWriter<E>> = (),
51
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Clone = (),
52
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
53
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA },
54
    const DEDUP: bool = false,
55
> where
56
    BitReader<E>: BitRead<E> + CodesRead<E>,
57
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
58
{
59
    /// Serializer for the labels
60
    pub serializer: S,
61
    /// Deserializer for the labels
62
    pub deserializer: D,
63
    /// Marker for the endianness
64
    _marker: std::marker::PhantomData<E>,
65
}
66

67
impl<E, S, D, const SRC_CODE: usize, const DST_CODE: usize, const DEDUP: bool>
68
    GapsCodec<E, S, D, SRC_CODE, DST_CODE, DEDUP>
69
where
70
    E: Endianness,
71
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
72
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
73
    BitReader<E>: BitRead<E> + CodesRead<E>,
74
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
75
{
76
    /// Creates a new `GapsCodec` with the given serializer and deserializer.
77
    pub const fn new(serializer: S, deserializer: D) -> Self {
7✔
78
        Self {
79
            serializer,
80
            deserializer,
81
            _marker: std::marker::PhantomData,
82
        }
83
    }
84
}
85

86
impl<E, S: Default, D: Default, const SRC_CODE: usize, const DST_CODE: usize, const DEDUP: bool>
87
    core::default::Default for GapsCodec<E, S, D, SRC_CODE, DST_CODE, DEDUP>
88
where
89
    E: Endianness,
90
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
91
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
92
    BitReader<E>: BitRead<E> + CodesRead<E>,
93
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
94
{
95
    fn default() -> Self {
5✔
96
        Self::new(Default::default(), Default::default())
15✔
97
    }
98
}
99

100
/// Per-batch encoding statistics produced by [`GapsCodec`].
///
/// The bit counts cover the encoded triples only; the length prefix written
/// at the start of each batch is not included.
#[derive(Clone, Copy, Debug)]
pub struct GapsStats {
    /// How many triples were written (after deduplication, if enabled)
    pub total_triples: usize,
    /// Bits spent on source gaps
    pub src_bits: usize,
    /// Bits spent on destination gaps
    pub dst_bits: usize,
    /// Bits spent on serialized labels
    pub labels_bits: usize,
}
112

113
impl core::fmt::Display for GapsStats {
114
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
×
115
        let total_bits = self.src_bits + self.dst_bits + self.labels_bits;
×
116
        write!(
×
117
            f,
×
118
            "src: {}B ({:.3} bits / arc), dst: {}B ({:.3} bits / arc), labels: {}B ({:.3} bits / arc), total: {}B ({:.3} bits / arc)",
119
            humanize(self.src_bits as f64 / 8.0),
×
120
            self.src_bits as f64 / self.total_triples as f64,
×
121
            humanize(self.dst_bits as f64 / 8.0),
×
122
            self.dst_bits as f64 / self.total_triples as f64,
×
123
            humanize(self.labels_bits as f64 / 8.0),
×
124
            self.labels_bits as f64 / self.total_triples as f64,
×
125
            humanize(total_bits as f64 / 8.0),
×
126
            total_bits as f64 / self.total_triples as f64,
×
127
        )
128
    }
129
}
130

131
impl<E, S, D, const SRC_CODE: usize, const DST_CODE: usize, const DEDUP: bool> BatchCodec
132
    for GapsCodec<E, S, D, SRC_CODE, DST_CODE, DEDUP>
133
where
134
    E: Endianness,
135
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
136
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
137
    S::SerType: Send + Sync + Copy + 'static + core::fmt::Debug, // needed by radix sort
138
    BitReader<E>: BitRead<E> + CodesRead<E>,
139
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
140
{
141
    type Label = S::SerType;
142
    type DecodedBatch = GapsIter<E, D, SRC_CODE, DST_CODE>;
143
    type EncodedBatchStats = GapsStats;
144

145
    fn encode_batch(
11✔
146
        &self,
147
        path: impl AsRef<std::path::Path>,
148
        batch: &mut [((usize, usize), Self::Label)],
149
    ) -> Result<(usize, Self::EncodedBatchStats)> {
150
        let start = std::time::Instant::now();
22✔
151
        Triple::cast_batch_mut(batch).radix_sort_unstable();
22✔
152
        log::debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed());
11✔
153
        self.encode_sorted_batch(path, batch)
44✔
154
    }
155

156
    fn encode_sorted_batch(
12✔
157
        &self,
158
        path: impl AsRef<std::path::Path>,
159
        batch: &[((usize, usize), Self::Label)],
160
    ) -> Result<(usize, Self::EncodedBatchStats)> {
161
        debug_assert!(Triple::cast_batch(batch).is_sorted());
36✔
162
        // create a bitstream to write to the file
163
        let file_path = path.as_ref();
36✔
164
        let mut stream = buf_bit_writer::from_path::<E, usize>(file_path).with_context(|| {
48✔
165
            format!(
×
166
                "Could not create BatchIterator temporary file {}",
×
167
                file_path.display()
×
168
            )
169
        })?;
170

171
        // Pre-count unique pairs for the length prefix when deduplicating
172
        let batch_len = if DEDUP {
24✔
NEW
173
            if batch.is_empty() {
×
NEW
174
                0
×
175
            } else {
NEW
176
                1 + batch.windows(2).filter(|w| w[0].0 != w[1].0).count()
×
177
            }
178
        } else {
179
            batch.len()
24✔
180
        };
181

182
        // prefix the stream with the length of the batch
183
        // we use a delta code since it'll be a big number most of the time
184
        stream
12✔
185
            .write_delta(batch_len as u64)
24✔
186
            .context("Could not write length")?;
187

188
        let mut stats = GapsStats {
189
            total_triples: batch_len,
190
            src_bits: 0,
191
            dst_bits: 0,
192
            labels_bits: 0,
193
        };
194
        // Dump the triples to the bitstream
195
        let (mut prev_src, mut prev_dst) = (0, 0);
36✔
196
        let mut prev_pair: Option<(usize, usize)> = None;
36✔
197
        for ((src, dst), label) in batch.iter() {
180✔
198
            if DEDUP {
52✔
NEW
199
                if prev_pair == Some((*src, *dst)) {
×
NEW
200
                    continue;
×
201
                }
NEW
202
                prev_pair = Some((*src, *dst));
×
203
            }
204
            // write the source gap as gamma
205
            stats.src_bits += ConstCode::<SRC_CODE>
52✔
206
                .write(&mut stream, (src - prev_src) as u64)
156✔
207
                .with_context(|| format!("Could not write {src} after {prev_src}"))?;
52✔
208
            if *src != prev_src {
94✔
209
                // Reset prev_y
210
                prev_dst = 0;
42✔
211
            }
212
            // write the destination gap as gamma
213
            stats.dst_bits += ConstCode::<DST_CODE>
52✔
214
                .write(&mut stream, (dst - prev_dst) as u64)
156✔
215
                .with_context(|| format!("Could not write {dst} after {prev_dst}"))?;
52✔
216
            // write the label
217
            stats.labels_bits += self
52✔
218
                .serializer
52✔
219
                .serialize(label, &mut stream)
156✔
220
                .context("Could not serialize label")?;
52✔
221
            (prev_src, prev_dst) = (*src, *dst);
156✔
222
        }
223
        // flush the stream and reset the buffer
224
        stream.flush().context("Could not flush stream")?;
36✔
225

226
        let total_bits = stats.src_bits + stats.dst_bits + stats.labels_bits;
24✔
227
        Ok((total_bits, stats))
12✔
228
    }
229

230
    fn decode_batch(&self, path: impl AsRef<std::path::Path>) -> Result<Self::DecodedBatch> {
12✔
231
        // open the file
232
        let mut stream = <BufBitReader<E, _>>::new(MemWordReader::new(ArcMmapHelper(Arc::new(
48✔
233
            MmapHelper::mmap(
12✔
234
                path.as_ref(),
24✔
235
                MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL,
12✔
236
            )
237
            .with_context(|| format!("Could not mmap {}", path.as_ref().display()))?,
12✔
238
        ))));
239

240
        // read the length of the batch (first value in the stream)
241
        let len = stream.read_delta().context("Could not read length")? as usize;
48✔
242

243
        // create the iterator
244
        Ok(GapsIter {
12✔
245
            deserializer: self.deserializer.clone(),
36✔
246
            stream,
12✔
247
            len,
12✔
248
            current: 0,
12✔
249
            prev_src: 0,
12✔
250
            prev_dst: 0,
12✔
251
        })
252
    }
253
}
254

255
/// An iterator over triples encoded with gaps, this is returned by [`GapsCodec`].
256
#[derive(Clone, Debug)]
257
pub struct GapsIter<
258
    E: Endianness = NE,
259
    D: BitDeserializer<E, BitReader<E>> = (),
260
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
261
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
262
> where
263
    BitReader<E>: BitRead<E> + CodesRead<E>,
264
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
265
{
266
    /// Deserializer for the labels
267
    deserializer: D,
268
    /// Bitstream to read from
269
    stream: BitReader<E>,
270
    /// Length of the iterator (number of triples)
271
    len: usize,
272
    /// Current position in the iterator
273
    current: usize,
274
    /// Previous source node
275
    prev_src: usize,
276
    /// Previous destination node
277
    prev_dst: usize,
278
}
279

280
// SAFETY: gaps are decoded in non-decreasing (src, dst) order.
281
unsafe impl<
282
    E: Endianness,
283
    D: BitDeserializer<E, BitReader<E>>,
284
    const SRC_CODE: usize,
285
    const DST_CODE: usize,
286
> SortedIterator for GapsIter<E, D, SRC_CODE, DST_CODE>
287
where
288
    BitReader<E>: BitRead<E> + CodesRead<E>,
289
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
290
{
291
}
292

293
impl<
294
    E: Endianness,
295
    D: BitDeserializer<E, BitReader<E>>,
296
    const SRC_CODE: usize,
297
    const DST_CODE: usize,
298
> Iterator for GapsIter<E, D, SRC_CODE, DST_CODE>
299
where
300
    BitReader<E>: BitRead<E> + CodesRead<E>,
301
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
302
{
303
    type Item = ((usize, usize), D::DeserType);
304

305
    fn next(&mut self) -> Option<Self::Item> {
85✔
306
        if self.current >= self.len {
85✔
307
            return None;
14✔
308
        }
309
        let src_gap = ConstCode::<SRC_CODE>.read(&mut self.stream).ok()?;
355✔
310
        let dst_gap = ConstCode::<DST_CODE>.read(&mut self.stream).ok()?;
355✔
311
        let label = self.deserializer.deserialize(&mut self.stream).ok()?;
355✔
312
        self.prev_src += src_gap as usize;
71✔
313
        if src_gap != 0 {
133✔
314
            self.prev_dst = 0;
62✔
315
        }
316
        self.prev_dst += dst_gap as usize;
71✔
317
        self.current += 1;
71✔
318
        Some(((self.prev_src, self.prev_dst), label))
71✔
319
    }
320

321
    fn size_hint(&self) -> (usize, Option<usize>) {
2✔
322
        (self.len(), Some(self.len()))
6✔
323
    }
324

325
    fn count(self) -> usize {
×
326
        self.len()
×
327
    }
328
}
329

330
impl<
331
    E: Endianness,
332
    D: BitDeserializer<E, BitReader<E>>,
333
    const SRC_CODE: usize,
334
    const DST_CODE: usize,
335
> ExactSizeIterator for GapsIter<E, D, SRC_CODE, DST_CODE>
336
where
337
    BitReader<E>: BitRead<E> + CodesRead<E>,
338
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
339
{
340
    fn len(&self) -> usize {
5✔
341
        self.len - self.current
5✔
342
    }
343
}
344

345
impl<
346
    E: Endianness,
347
    D: BitDeserializer<E, BitReader<E>>,
348
    const SRC_CODE: usize,
349
    const DST_CODE: usize,
350
> core::iter::FusedIterator for GapsIter<E, D, SRC_CODE, DST_CODE>
351
where
352
    BitReader<E>: BitRead<E> + CodesRead<E>,
353
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
354
{
355
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc