• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vigna / webgraph-rs / 23365769388

20 Mar 2026 10:52PM UTC coverage: 68.228% (-3.0%) from 71.245%
23365769388

push

github

vigna
No le_bins,be_bins for webgraph

6655 of 9754 relevant lines covered (68.23%)

46582760.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.4
/webgraph/src/utils/batch_codec/grouped_gaps.rs
1
/*
2
 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
3
 * SPDX-FileCopyrightText: 2025 Sebastiano Vigna
4
 *
5
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6
 */
7

8
use super::{BitReader, BitWriter};
9
use crate::traits::SortedIterator;
10
use crate::utils::{ArcMmapHelper, MmapHelper, Triple};
11
use crate::{
12
    traits::{BitDeserializer, BitSerializer},
13
    utils::{BatchCodec, humanize},
14
};
15

16
use std::sync::Arc;
17

18
use anyhow::{Context, Result};
19
use dsi_bitstream::prelude::*;
20
use mmap_rs::MmapFlags;
21
use rdst::*;
22

23
/// A codec for encoding and decoding batches of triples using grouped gap compression.
24
///
25
/// This codec encodes triples of the form `(src, dst, label)` by grouping edges
26
/// with the same source node, and encoding the gaps between consecutive sources
27
/// and destinations using a specified code (default: gamma). The outdegree
28
/// (number of edges for each source) is also encoded using the specified code.
29
///
30
/// # Type Parameters
31
///
32
/// - `S`: Serializer for the labels, implementing [`BitSerializer`] for the label type.
33
/// - `D`: Deserializer for the labels, implementing [`BitDeserializer`] for the label type.
34
/// - `OUTDEGREE_CODE`: Code used for encoding outdegrees (default: [ɣ](dsi_bitstream::codes::gamma)).
35
/// - `SRC_CODE`: Code used for encoding source gaps (default: [ɣ](dsi_bitstream::codes::gamma)).
36
/// - `DST_CODE`: Code used for encoding destination gaps (default: [ɣ](dsi_bitstream::codes::gamma)).
37
///
38
/// # Encoding Format
39
///
40
/// 1. The batch length is written using delta coding.
41
/// 2. For each group of triples with the same source:
42
///     - The gap from the previous source is encoded.
43
///     - The outdegree (number of edges for this source) is encoded.
44
///     - For each destination:
45
///         - The gap from the previous destination is encoded.
46
///         - The label is serialized.
47
///
48
/// The bit deserializer must be [`Clone`] because we need one for each
49
/// [`GroupedGapsIter`], and there are possible scenarios in which the
50
/// deserializer might be stateful.
51
#[derive(Clone, Debug)]
52
pub struct GroupedGapsCodec<
53
    E: Endianness = NE,
54
    S: BitSerializer<E, BitWriter<E>> = (),
55
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Clone = (),
56
    const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
57
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
58
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA },
59
    const DEDUP: bool = false,
60
> where
61
    BitReader<E>: BitRead<E>,
62
    BitWriter<E>: BitWrite<E>,
63
{
64
    /// Serializer for the labels.
65
    pub serializer: S,
66
    /// Deserializer for the labels.
67
    pub deserializer: D,
68

69
    _marker: core::marker::PhantomData<E>,
70
}
71

72
impl<
73
    E,
74
    S,
75
    D,
76
    const OUTDEGREE_CODE: usize,
77
    const SRC_CODE: usize,
78
    const DST_CODE: usize,
79
    const DEDUP: bool,
80
> GroupedGapsCodec<E, S, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE, DEDUP>
81
where
82
    E: Endianness,
83
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
84
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
85
    BitReader<E>: BitRead<E>,
86
    BitWriter<E>: BitWrite<E>,
87
{
88
    /// Creates a new `GroupedGapsCodec` with the given serializer and deserializer.
89
    pub const fn new(serializer: S, deserializer: D) -> Self {
1✔
90
        Self {
91
            serializer,
92
            deserializer,
93
            _marker: core::marker::PhantomData,
94
        }
95
    }
96
}
97

98
impl<
99
    E: Endianness,
100
    S: BitSerializer<E, BitWriter<E>> + Default,
101
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Clone + Default,
102
    const OUTDEGREE_CODE: usize,
103
    const SRC_CODE: usize,
104
    const DST_CODE: usize,
105
    const DEDUP: bool,
106
> Default for GroupedGapsCodec<E, S, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE, DEDUP>
107
where
108
    BitReader<E>: BitRead<E>,
109
    BitWriter<E>: BitWrite<E>,
110
{
111
    fn default() -> Self {
152✔
112
        Self {
113
            serializer: S::default(),
304✔
114
            deserializer: D::default(),
152✔
115
            _marker: core::marker::PhantomData,
116
        }
117
    }
118
}
119

120
/// Statistics about the encoding performed by [`GroupedGapsCodec`].
///
/// The bit counts cover sources, outdegrees, destinations, and labels; the
/// δ-coded batch length written at the start of the stream is not included.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct GroupedGapsStats {
    /// Total number of triples encoded (after deduplication, if enabled)
    pub total_triples: usize,
    /// Number of bits used for outdegrees
    pub outdegree_bits: usize,
    /// Number of bits used for source gaps
    pub src_bits: usize,
    /// Number of bits used for destination gaps
    pub dst_bits: usize,
    /// Number of bits used for labels
    pub labels_bits: usize,
}
134

135
impl core::fmt::Display for GroupedGapsStats {
136
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
×
137
        write!(
×
138
            f,
×
139
            "outdegree: {}B ({:.3} bits / arc), src: {}B ({:.3} bits / arc), dst: {}B ({:.3} bits / arc), labels: {}B ({:.3} bits / arc)",
140
            humanize(self.outdegree_bits as f64 / 8.0),
×
141
            self.outdegree_bits as f64 / self.total_triples as f64,
×
142
            humanize(self.src_bits as f64 / 8.0),
×
143
            self.src_bits as f64 / self.total_triples as f64,
×
144
            humanize(self.dst_bits as f64 / 8.0),
×
145
            self.dst_bits as f64 / self.total_triples as f64,
×
146
            humanize(self.labels_bits as f64 / 8.0),
×
147
            self.labels_bits as f64 / self.total_triples as f64,
×
148
        )
149
    }
150
}
151

152
impl<
153
    E,
154
    S,
155
    D,
156
    const OUTDEGREE_CODE: usize,
157
    const SRC_CODE: usize,
158
    const DST_CODE: usize,
159
    const DEDUP: bool,
160
> BatchCodec for GroupedGapsCodec<E, S, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE, DEDUP>
161
where
162
    E: Endianness,
163
    S: BitSerializer<E, BitWriter<E>> + Send + Sync,
164
    D: BitDeserializer<E, BitReader<E>, DeserType = S::SerType> + Send + Sync + Clone,
165
    S::SerType: Send + Sync + Copy + 'static, // needed by radix sort
166
    BitReader<E>: BitRead<E> + CodesRead<E>,
167
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
168
{
169
    type Label = S::SerType;
170
    type DecodedBatch = GroupedGapsIter<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>;
171
    type EncodedBatchStats = GroupedGapsStats;
172

173
    fn encode_batch(
544✔
174
        &self,
175
        path: impl AsRef<std::path::Path>,
176
        batch: &mut [((usize, usize), Self::Label)],
177
    ) -> Result<(usize, Self::EncodedBatchStats)> {
178
        let start = std::time::Instant::now();
1,088✔
179
        Triple::cast_batch_mut(batch).radix_sort_unstable();
1,088✔
180
        log::debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed());
544✔
181
        self.encode_sorted_batch(path, batch)
2,176✔
182
    }
183

184
    fn encode_sorted_batch(
544✔
185
        &self,
186
        path: impl AsRef<std::path::Path>,
187
        batch: &[((usize, usize), Self::Label)],
188
    ) -> Result<(usize, Self::EncodedBatchStats)> {
189
        debug_assert!(Triple::cast_batch(batch).is_sorted(), "Batch is not sorted");
1,632✔
190
        // create a bitstream to write to the file
191
        let file_path = path.as_ref();
1,632✔
192
        let mut stream = buf_bit_writer::from_path::<E, usize>(file_path).with_context(|| {
2,176✔
193
            format!(
×
194
                "Could not create BatchIterator temporary file {}",
×
195
                file_path.display()
×
196
            )
197
        })?;
198

199
        // Pre-count unique pairs for the length prefix when deduplicating
200
        let batch_len = if DEDUP {
1,088✔
201
            if batch.is_empty() {
316✔
202
                0
42✔
203
            } else {
204
                1 + batch.windows(2).filter(|w| w[0].0 != w[1].0).count()
62,574,574✔
205
            }
206
        } else {
207
            batch.len()
772✔
208
        };
209

210
        // prefix the stream with the length of the batch
211
        // we use a delta code since it'll be a big number most of the time
212
        stream
544✔
213
            .write_delta(batch_len as u64)
1,088✔
214
            .context("Could not write length")?;
215

216
        let mut stats = GroupedGapsStats {
217
            total_triples: batch_len,
218
            outdegree_bits: 0,
219
            src_bits: 0,
220
            dst_bits: 0,
221
            labels_bits: 0,
222
        };
223
        // dump the triples to the bitstream
224
        let mut prev_src = 0;
1,088✔
225
        let mut i = 0;
1,088✔
226
        while i < batch.len() {
6,537,346✔
227
            let ((src, _), _) = batch[i];
6,536,258✔
228
            // write the source gap as gamma
229
            stats.src_bits += ConstCode::<SRC_CODE>
3,268,129✔
230
                .write(&mut stream, (src - prev_src) as _)
9,804,387✔
231
                .with_context(|| format!("Could not write {src} after {prev_src}"))?;
3,268,129✔
232
            // figure out how many edges have this source
233
            let group_size = batch[i..].iter().take_while(|t| t.0.0 == src).count();
124,391,955✔
234

235
            // compute outdegree (unique destinations when deduplicating)
236
            let outdegree = if DEDUP {
6,536,258✔
237
                let group = &batch[i..i + group_size];
6,755,936✔
238
                if group.is_empty() {
3,377,968✔
239
                    0
×
240
                } else {
241
                    1 + group.windows(2).filter(|w| w[0].0.1 != w[1].0.1).count()
65,952,310✔
242
                }
243
            } else {
244
                group_size
1,579,145✔
245
            };
246

247
            // write the outdegree
248
            stats.outdegree_bits += ConstCode::<OUTDEGREE_CODE>
3,268,129✔
249
                .write(&mut stream, outdegree as _)
9,804,387✔
250
                .with_context(|| format!("Could not write outdegree {outdegree} for {src}"))?;
3,268,129✔
251

252
            // encode the destinations
253
            let mut prev_dst = 0;
6,536,258✔
254
            let mut last_written_dst: Option<usize> = None;
9,804,387✔
255
            let end = i + group_size;
6,536,258✔
256
            while i < end {
54,026,090✔
257
                let ((_, dst), label) = &batch[i];
152,273,883✔
258
                if DEDUP && last_written_dst == Some(*dst) {
82,045,132✔
259
                    i += 1;
3,880,552✔
260
                    continue;
3,880,552✔
261
                }
262
                if DEDUP {
74,284,028✔
263
                    last_written_dst = Some(*dst);
27,406,619✔
264
                }
265
                // write the destination gap as gamma
266
                stats.dst_bits += ConstCode::<DST_CODE>
46,877,409✔
267
                    .write(&mut stream, (dst - prev_dst) as _)
140,632,227✔
268
                    .with_context(|| format!("Could not write {dst} after {prev_dst}"))?;
46,877,409✔
269
                // write the label
270
                stats.labels_bits += self
46,877,409✔
271
                    .serializer
46,877,409✔
272
                    .serialize(label, &mut stream)
140,632,227✔
273
                    .context("Could not serialize label")?;
46,877,409✔
274
                prev_dst = *dst;
46,877,409✔
275
                i += 1;
46,877,409✔
276
            }
277
            prev_src = src;
3,268,129✔
278
        }
279
        // flush the stream and reset the buffer
280
        stream.flush().context("Could not flush stream")?;
1,632✔
281

282
        let total_bits = stats.outdegree_bits + stats.src_bits + stats.dst_bits + stats.labels_bits;
1,088✔
283
        Ok((total_bits, stats))
544✔
284
    }
285

286
    fn decode_batch(&self, path: impl AsRef<std::path::Path>) -> Result<Self::DecodedBatch> {
544✔
287
        // open the file
288
        let mut stream = <BufBitReader<E, _>>::new(MemWordReader::new(ArcMmapHelper(Arc::new(
2,176✔
289
            MmapHelper::mmap(
544✔
290
                path.as_ref(),
1,088✔
291
                MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL,
544✔
292
            )
293
            .with_context(|| format!("Could not mmap {}", path.as_ref().display()))?,
544✔
294
        ))));
295

296
        // read the length of the batch (first value in the stream)
297
        let len = stream.read_delta().context("Could not read length")? as usize;
2,176✔
298

299
        // create the iterator
300
        Ok(GroupedGapsIter {
544✔
301
            deserializer: self.deserializer.clone(),
1,632✔
302
            stream,
544✔
303
            len,
544✔
304
            current: 0,
544✔
305
            src: 0,
544✔
306
            dst_left: 0,
544✔
307
            prev_dst: 0,
544✔
308
        })
309
    }
310
}
311

312
/// An iterator over the `((src, dst), label)` triples of a batch written by
/// [`GroupedGapsCodec`]; it is returned by the codec's `decode_batch` method.
///
/// Triples are decoded lazily from the memory-mapped bitstream. The code
/// parameters must match those used by the codec that wrote the batch.
///
/// NOTE(review): the default `DST_CODE` here is γ, whereas the default for
/// [`GroupedGapsCodec`] is δ, so the unparameterized `GroupedGapsIter` is not
/// the `DecodedBatch` of the unparameterized codec. Confirm whether the
/// defaults should be aligned.
#[derive(Clone, Debug)]
pub struct GroupedGapsIter<
    E: Endianness = NE,
    D: BitDeserializer<E, BitReader<E>> = (),
    const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
    const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
    const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA },
> where
    BitReader<E>: BitRead<E>,
    BitWriter<E>: BitWrite<E>,
{
    /// Deserializer for the labels
    deserializer: D,
    /// Bitstream to read from (positioned after the length prefix)
    stream: BitReader<E>,
    /// Length of the iterator (number of triples in the batch)
    len: usize,
    /// Number of triples returned so far
    current: usize,
    /// Current source node
    src: usize,
    /// Number of destinations left for the current source
    dst_left: usize,
    /// Previous destination node (reset to zero at each new source)
    prev_dst: usize,
}
339

340
// SAFETY: gaps are decoded in non-decreasing (src, dst) order.
341
unsafe impl<
342
    E: Endianness,
343
    D: BitDeserializer<E, BitReader<E>>,
344
    const OUTDEGREE_CODE: usize,
345
    const SRC_CODE: usize,
346
    const DST_CODE: usize,
347
> SortedIterator for GroupedGapsIter<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
348
where
349
    BitReader<E>: BitRead<E> + CodesRead<E>,
350
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
351
{
352
}
353

354
impl<
355
    E: Endianness,
356
    D: BitDeserializer<E, BitReader<E>>,
357
    const OUTDEGREE_CODE: usize,
358
    const SRC_CODE: usize,
359
    const DST_CODE: usize,
360
> Iterator for GroupedGapsIter<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
361
where
362
    BitReader<E>: BitRead<E> + CodesRead<E>,
363
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
364
{
365
    type Item = ((usize, usize), D::DeserType);
366
    fn next(&mut self) -> Option<Self::Item> {
46,876,936✔
367
        if self.current >= self.len {
46,876,936✔
368
            return None;
533✔
369
        }
370
        if self.dst_left == 0 {
46,876,403✔
371
            // read a new source
372
            let src_gap = ConstCode::<SRC_CODE>.read(&mut self.stream).ok()?;
16,337,265✔
373
            self.src += src_gap as usize;
3,267,453✔
374
            // read the outdegree
375
            self.dst_left = ConstCode::<OUTDEGREE_CODE>.read(&mut self.stream).ok()? as usize;
13,069,812✔
376
            self.prev_dst = 0;
3,267,453✔
377
        }
378

379
        let dst_gap = ConstCode::<DST_CODE>.read(&mut self.stream).ok()?;
234,382,015✔
380
        let label = self.deserializer.deserialize(&mut self.stream).ok()?;
234,382,015✔
381
        self.prev_dst += dst_gap as usize;
46,876,403✔
382
        self.current += 1;
46,876,403✔
383
        self.dst_left -= 1;
46,876,403✔
384
        Some(((self.src, self.prev_dst), label))
46,876,403✔
385
    }
386

387
    fn size_hint(&self) -> (usize, Option<usize>) {
2✔
388
        (self.len(), Some(self.len()))
6✔
389
    }
390

391
    fn count(self) -> usize {
×
392
        self.len()
×
393
    }
394
}
395

396
impl<
397
    E: Endianness,
398
    D: BitDeserializer<E, BitReader<E>>,
399
    const OUTDEGREE_CODE: usize,
400
    const SRC_CODE: usize,
401
    const DST_CODE: usize,
402
> ExactSizeIterator for GroupedGapsIter<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
403
where
404
    BitReader<E>: BitRead<E> + CodesRead<E>,
405
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
406
{
407
    fn len(&self) -> usize {
5✔
408
        self.len - self.current
5✔
409
    }
410
}
411

412
impl<
413
    E: Endianness,
414
    D: BitDeserializer<E, BitReader<E>>,
415
    const OUTDEGREE_CODE: usize,
416
    const SRC_CODE: usize,
417
    const DST_CODE: usize,
418
> core::iter::FusedIterator for GroupedGapsIter<E, D, OUTDEGREE_CODE, SRC_CODE, DST_CODE>
419
where
420
    BitReader<E>: BitRead<E> + CodesRead<E>,
421
    BitWriter<E>: BitWrite<E> + CodesWrite<E>,
422
{
423
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc