• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

diffed-places / pipeline / 21910329452

11 Feb 2026 03:02PM UTC coverage: 95.994% (-0.003%) from 95.997%
21910329452

push

github

web-flow
Clean up coverage computation (#130)

35 of 36 new or added lines in 2 files covered. (97.22%)

1 existing line in 1 file now uncovered.

1198 of 1248 relevant lines covered (95.99%)

93.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.66
/src/coverage.rs
1
pub use reader::Coverage;
2
pub use writer::build_coverage;
3

4
/// The granularity of S2 cells we use to represent spatial coverage.
///
/// At level 19, an S2 cell is about 15 to 20 meters wide, see [S2 Cell
/// Statistics](https://s2geometry.io/resources/s2cell_statistics.html).
/// For an interactive visualization, see [S2 Region Coverer Online
/// Viewer](https://igorgatis.github.io/ws2/).
const S2_GRANULARITY_LEVEL: u8 = 19;

/// To store the coverage map as a bitvector in a more compact form,
/// we do not store the actual S2 cell IDs because this would lead
/// to a sparse bitvector without contiguous runs. Rather, we shift the
/// unsigned 64-bit integer ids to the right, so that neighboring
/// parent cells (at our finest granularity, S2_GRANULARITY_LEVEL)
/// become neighboring bits in the bitvector. This leads to a better
/// run-length encoding. S2 cell ids use the most significant three
/// bits to encode the cube face [0..5], and then two bits for each
/// level of granularity.
const S2_CELL_ID_SHIFT: u8 = 64 - (3 + 2 * S2_GRANULARITY_LEVEL);
22

23
mod reader {
24
    use super::S2_CELL_ID_SHIFT;
25
    use anyhow::{Ok, Result, anyhow};
26
    use memmap2::Mmap;
27
    use s2::cellid::CellID;
28
    use std::fs::File;
29
    use std::path::Path;
30

31
    /// A read-only view of a coverage file that has been mapped into memory.
    ///
    /// Created by [`Coverage::load`]. The three slices borrow directly from
    /// the memory-mapped file, so loading copies no data.
    pub struct Coverage<'a> {
        /// Backing store for `mmap`.
        _file: File,

        /// Backing store for `run_starts`, `run_lengths` and `wikidata_ids`.
        _mmap: Mmap,

        /// Run-length encoded s2 cell ids, shifted by S2_CELL_ID_SHIFT.
        /// `run_starts[i]` is the first value of run `i`, stored little-endian
        /// on disk (see the writer's `finish_run`, which uses `to_le_bytes`).
        run_starts: &'a [u64],
        // `run_lengths[i]` is the length of run `i` minus one (the writer
        // stores `run_length_minus_1`), so run `i` covers the inclusive
        // range run_starts[i] ..= run_starts[i] + run_lengths[i].
        run_lengths: &'a [u8],

        /// Sorted array of covered wikidata ids.
        wikidata_ids: &'a [u64],
    }
45

46
    impl<'a> Coverage<'a> {
47
        pub fn contains_s2_cell(&self, cell: &CellID) -> bool {
1,807✔
48
            let val: u64 = cell.0 >> S2_CELL_ID_SHIFT;
1,807✔
49
            let index = if cfg!(target_endian = "little") {
1,807✔
50
                self.run_starts.partition_point(|&x| x <= val)
12,581✔
51
            } else {
NEW
52
                self.run_starts.partition_point(|&x| x.swap_bytes() <= val)
×
53
            };
54
            if index > 0 {
1,807✔
55
                let start = self.run_starts[index - 1];
1,804✔
56
                let limit = start + self.run_lengths[index - 1] as u64;
1,804✔
57
                start <= val && val <= limit
1,804✔
58
            } else {
59
                false
3✔
60
            }
61
        }
1,807✔
62

63
        #[allow(dead_code)] // TODO: Remove attribute once we use this function.
64
        pub fn contains_wikidata_item(&self, id: u64) -> bool {
6✔
65
            if cfg!(target_endian = "little") {
6✔
66
                self.wikidata_ids.binary_search(&id).is_ok()
6✔
67
            } else {
68
                self.wikidata_ids
×
69
                    .binary_search_by(|x| x.swap_bytes().cmp(&id))
×
70
                    .is_ok()
×
71
            }
72
        }
6✔
73

74
        pub fn load(path: &Path) -> Result<Coverage<'_>> {
4✔
75
            let file = File::open(path)?;
4✔
76

77
            // SAFETY: We don’t truncate the file while it’s mapped into memory.
78
            let mmap = unsafe { Mmap::map(&file)? };
4✔
79

80
            Self::check_signature(&mmap)?;
4✔
81

82
            let run_starts = {
4✔
83
                let (offset, size) = Self::get_offset_size(b"runstart", &mmap, 8)
4✔
84
                    .ok_or(anyhow!("no run starts in coverage file {:?}", path))?;
4✔
85
                // SAFETY: Alignment and bounds checked by get_offset_size().
86
                unsafe {
87
                    let ptr = mmap.as_ptr().add(offset) as *const u64;
4✔
88
                    std::slice::from_raw_parts(ptr, size / 8)
4✔
89
                }
90
            };
91

92
            let run_lengths = {
4✔
93
                let (offset, size) = Self::get_offset_size(b"runlengt", &mmap, 1)
4✔
94
                    .ok_or(anyhow!("no run lengths in coverage file {:?}", path))?;
4✔
95
                // SAFETY: Bounds checked by get_offset_size().
96
                unsafe { std::slice::from_raw_parts(mmap.as_ptr().add(offset), size) }
4✔
97
            };
98

99
            if run_starts.len() != run_lengths.len() {
4✔
UNCOV
100
                return Err(anyhow!("inconsistent number of runs in {:?}", path));
×
101
            }
4✔
102

103
            let wikidata_ids = {
4✔
104
                let (offset, size) = Self::get_offset_size(b"wikidata", &mmap, 8)
4✔
105
                    .ok_or(anyhow!("no wikidata ids in coverage file {:?}", path))?;
4✔
106
                // SAFETY: Alignment and bounds checked by get_offset_size().
107
                unsafe {
108
                    let ptr = mmap.as_ptr().add(offset) as *const u64;
4✔
109
                    std::slice::from_raw_parts(ptr, size / 8)
4✔
110
                }
111
            };
112

113
            Ok(Coverage {
4✔
114
                _file: file,
4✔
115
                _mmap: mmap,
4✔
116
                run_starts,
4✔
117
                run_lengths,
4✔
118
                wikidata_ids,
4✔
119
            })
4✔
120
        }
4✔
121

122
        fn check_signature(data: &[u8]) -> Result<()> {
8✔
123
            if data.len() < 24 || &data[0..24] != b"diffed-places coverage\0\0" {
8✔
124
                return Err(anyhow!("malformed coverage file"));
3✔
125
            }
5✔
126
            Ok(())
5✔
127
        }
8✔
128

129
        // This implementation only works on 64-bit processors. Theoretically,
130
        // we could write a special implementation for 32-bit platforms, which
131
        // would have to restrict its input to files smaller than 4 GiB.
132
        // However, we’re not doing a retro-computing project, so we’ll never
133
        // execute our code on such machines.
134
        #[cfg(target_pointer_width = "64")]
135
        fn get_offset_size(id: &[u8; 8], bytes: &[u8], alignment: usize) -> Option<(usize, usize)> {
21✔
136
            if bytes.len() < 32 {
21✔
137
                return None;
×
138
            }
21✔
139
            let num_headers = u64::from_le_bytes(bytes[24..32].try_into().ok()?) as usize;
21✔
140
            if bytes.len() < 32 + num_headers * 24 {
21✔
141
                return None;
1✔
142
            }
20✔
143
            for i in 0..num_headers {
37✔
144
                let pos = 32 + i * 24;
37✔
145
                if *id == bytes[pos..pos + 8] {
37✔
146
                    let offset =
19✔
147
                        u64::from_le_bytes(bytes[pos + 8..pos + 16].try_into().ok()?) as usize;
19✔
148
                    let size =
19✔
149
                        u64::from_le_bytes(bytes[pos + 16..pos + 24].try_into().ok()?) as usize;
19✔
150
                    if bytes.len() >= offset + size
19✔
151
                        && offset.is_multiple_of(alignment)
19✔
152
                        && size.is_multiple_of(alignment)
16✔
153
                    {
154
                        return Some((offset, size));
16✔
155
                    }
3✔
156
                }
18✔
157
            }
158
            None
4✔
159
        }
21✔
160
    }
161

162
    #[cfg(test)]
    mod tests {
        use super::Coverage;

        #[test]
        fn test_check_signature() {
            // Too short, wrong content, and a flipped padding byte must all
            // be rejected; only the exact 24-byte magic is accepted.
            let rejected: [&[u8]; 3] = [b"", b"foo", b"diffed-places coverage\0\x01"];
            for data in rejected {
                assert!(Coverage::check_signature(data).is_err());
            }
            assert!(Coverage::check_signature(b"diffed-places coverage\0\0\0\0\0\0").is_ok());
        }

        #[test]
        fn test_get_offset_size() {
            // Construct a coverage file whose header announces two keys.
            let mut data = Vec::new();
            data.extend_from_slice(b"diffed-places coverage\0\0");
            data.extend_from_slice(&2_u64.to_le_bytes());
            // The header entries are announced but not yet appended.
            assert_eq!(Coverage::get_offset_size(b"some_key", &data, 1), None);
            for (key, offset, size) in [(b"some_key", 80_u64, 16_u64), (b"otherkey", 83, 2)] {
                data.extend_from_slice(key);
                data.extend_from_slice(&offset.to_le_bytes());
                data.extend_from_slice(&size.to_le_bytes());
            }
            data.extend_from_slice(&0xdeadbeefcafefeed_u64.to_le_bytes());
            data.extend_from_slice(&42_u64.to_le_bytes());
            let data = data.as_ref();

            // The file header has no entry for key "in-exist".
            assert_eq!(Coverage::get_offset_size(b"in-exist", data, 1), None);

            // The data for "some_key" starts at offset 80 and is 16 bytes,
            // which is correctly aligned for 1-, 2- and 8-byte access.
            // However, it would be unsafe (at least on RISC CPUs) to perform
            // 32-byte access, eg. loading a SIMD register, because 80 is not
            // divisible by 32. Also, the data size is too small to read
            // 32-byte entities anyway.
            for alignment in [1, 2, 8] {
                assert_eq!(
                    Coverage::get_offset_size(b"some_key", data, alignment),
                    Some((80, 16))
                );
            }
            assert_eq!(Coverage::get_offset_size(b"some_key", data, 32), None);

            // The data for "otherkey" starts at offset 83 and is 2 bytes long.
            // Access is only safe with single-byte alignment.
            assert_eq!(
                Coverage::get_offset_size(b"otherkey", data, 1),
                Some((83, 2))
            );
            assert_eq!(Coverage::get_offset_size(b"otherkey", data, 2), None);
            assert_eq!(Coverage::get_offset_size(b"otherkey", data, 8), None);
        }
    }
222
}
223

224
mod writer {
225
    use super::{Coverage, S2_CELL_ID_SHIFT, S2_GRANULARITY_LEVEL};
226
    use crate::PROGRESS_BAR_STYLE;
227
    use anyhow::{Ok, Result, anyhow};
228
    use ext_sort::{ExternalSorter, ExternalSorterBuilder, buffer::LimitedBufferBuilder};
229
    use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
230
    use parquet::file::reader::{FileReader, SerializedFileReader};
231
    use parquet::record::RowAccessor;
232
    use parquet::schema::types::Type;
233
    use rayon::prelude::*;
234
    use regex::Regex;
235
    use s2::{
236
        cap::Cap,
237
        cell::Cell,
238
        cellid::CellID,
239
        region::RegionCoverer,
240
        s1::{Angle, ChordAngle},
241
    };
242
    use std::collections::HashSet;
243
    use std::fs::{File, remove_file, rename};
244
    use std::hash::{BuildHasherDefault, Hasher};
245
    use std::io::{BufReader, BufWriter, Seek, SeekFrom, Write};
246
    use std::path::{Path, PathBuf};
247
    use std::sync::atomic::{AtomicU64, Ordering};
248
    use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
249

250
    /// Computes the spatial coverage of a set of places.
    ///
    /// Reads the AllThePlaces Parquet dump at `atp` and produces
    /// `workdir/coverage`, returning its path. If that file already exists,
    /// the work is skipped and the existing path is returned.
    ///
    /// The work is a three-thread pipeline connected by bounded channels:
    /// one producer (`read_places`) and two consumers
    /// (`build_spatial_coverage` for s2 cells, `collect_wikidata_ids` for
    /// wikidata ids). The output is written to a ".tmp" path and renamed
    /// into place only on success, so an interrupted run never leaves a
    /// half-written "coverage" file behind.
    pub fn build_coverage(atp: &Path, progress: &MultiProgress, workdir: &Path) -> Result<PathBuf> {
        assert!(atp.exists());

        let out = workdir.join("coverage");
        if out.exists() {
            log::info!("skipping build_coverage, already found {:?}", out);
            return Ok(out);
        } else {
            log::info!("build_coverage is starting");
        }

        let mut tmp = PathBuf::from(&out);
        tmp.add_extension("tmp");
        let mut writer = CoverageWriter::try_new(&tmp)?;

        // To avoid deadlock, we must not use Rayon threads here.
        // https://dev.to/sgchris/scoped-threads-with-stdthreadscope-in-rust-163-48f9
        let (cell_tx, cell_rx) = sync_channel::<CellID>(50_000);
        let (wikidata_tx, wikidata_rx) = sync_channel::<u64>(1000);

        // The AllThePlaces dump of 2026-01-03 references 3791 unique wikidata ids.
        let mut wikidata_ids = Vec::<u64>::new();
        // Scoped threads let the closures borrow `writer` and `wikidata_ids`
        // from this stack frame; the scope joins all three before returning.
        std::thread::scope(|s| {
            let producer = s.spawn(|| read_places(atp, progress, cell_tx, wikidata_tx));
            let cell_consumer = s.spawn(|| build_spatial_coverage(cell_rx, progress, &mut writer));
            let wikidata_consumer = s.spawn(|| {
                wikidata_ids = collect_wikidata_ids(wikidata_rx);
                Ok(())
            });
            // Propagate the first error from any pipeline stage; a panic in
            // any thread aborts the build via expect().
            producer
                .join()
                .expect("producer panic")
                .and(cell_consumer.join().expect("cell consumer panic"))
                .and(wikidata_consumer.join().expect("wikidata consumer panic"))
        })?;

        writer.set_wikidata_ids(wikidata_ids);
        writer.close()?;
        rename(&tmp, &out)?;

        // As a sanity check, let’s try to open the freshly generated file.
        // This verifies the presence of the correct file signature, and that
        // the run_starts and run_lengths array are correctly aligned and
        // within allowable bounds.
        _ = Coverage::load(&out)?;

        log::info!("build_spatial_coverage finished, built {:?}", out);
        Ok(out)
    }
300

301
    fn read_places(
2✔
302
        places: &Path,
2✔
303
        progress: &MultiProgress,
2✔
304
        out_cells: SyncSender<CellID>,
2✔
305
        out_wikidata_ids: SyncSender<u64>,
2✔
306
    ) -> Result<()> {
2✔
307
        use anyhow::Context;
308
        let file =
2✔
309
            File::open(places).with_context(|| format!("could not open file `{:?}`", places))?;
2✔
310
        let reader = SerializedFileReader::new(file)?;
2✔
311
        let metadata = reader.metadata();
2✔
312
        let file_metadata = metadata.file_metadata();
2✔
313
        let schema = file_metadata.schema();
2✔
314
        let s2_cell_id_column = column_index("s2_cell_id", schema)?;
2✔
315
        // let source_column = column_index("source", schema)?;
316
        let tags_column = column_index("tags", schema)?;
2✔
317
        let num_row_groups = reader.num_row_groups();
2✔
318
        let num_rows: i64 = file_metadata.num_rows();
2✔
319
        let num_rows: u64 = if num_rows < 0 { 0 } else { num_rows as u64 };
2✔
320

321
        let large_radius = meters_to_chord_angle(100.0);
2✔
322
        let small_radius = meters_to_chord_angle(10.0);
2✔
323
        let coverer = RegionCoverer {
2✔
324
            max_cells: 8,
2✔
325
            min_level: S2_GRANULARITY_LEVEL,
2✔
326
            max_level: S2_GRANULARITY_LEVEL,
2✔
327
            level_mod: 1,
2✔
328
        };
2✔
329

330
        let wikidata_regex = Regex::new(r"[Qq]\d+")?;
2✔
331

332
        let bar = progress.add(ProgressBar::new(num_rows));
2✔
333
        bar.set_style(ProgressStyle::with_template(PROGRESS_BAR_STYLE)?);
2✔
334
        bar.set_prefix("cov.read ");
2✔
335
        bar.set_message("features");
2✔
336

337
        (0..num_row_groups)
2✔
338
            .into_par_iter()
2✔
339
            .try_for_each(|row_group_index| {
2✔
340
                // Because Apache’s implementation of Parquet is not
341
                // thread-safe, but the alternative implementation in
342
                // arrow2 is deprecated, we let each worker thread have
343
                // its own SerializedFileReader, each reading one row
344
                // group in the same Parquet file. This is a little
345
                // wasteful, but it’s actually not too bad.  In our
346
                // Parquet file for the full AllThePlace dump of
347
                // 2026-01-03, there were 24 row groups in total.
348
                // An earlier version of this code was using the
349
                // alternative implementation of the parquet2 crate,
350
                // but our application code got awfully complicated
351
                // when using that low-level library.
352
                let reader = SerializedFileReader::new(File::open(places)?)?;
2✔
353
                let row_group = reader.get_row_group(row_group_index)?;
2✔
354
                for row in row_group.get_row_iter(None)? {
14✔
355
                    let row = row?;
14✔
356
                    let s2_cell = Cell::from(CellID(row.get_ulong(s2_cell_id_column)?));
14✔
357
                    // let source = row.get_string(source_column)?;
358
                    let tags = row.get_map(tags_column)?.entries();
14✔
359
                    let mut radius = small_radius;
14✔
360
                    for (key, value) in tags.iter() {
136✔
361
                        use parquet::record::Field::Str;
362
                        if let (Str(key), Str(value)) = (key, value) {
136✔
363
                            radius = radius.max(match (key.as_ref(), value.as_ref()) {
136✔
364
                                ("shop", _) => large_radius,
136✔
365
                                ("tourism", _) => large_radius,
130✔
366
                                ("public_transport", "platform") => large_radius,
130✔
367
                                ("railway", "platform") => large_radius,
130✔
368
                                (_, _) => small_radius,
130✔
369
                            });
370
                            if key == "wikidata" || key.ends_with(":wikidata") {
136✔
371
                                for id in wikidata_regex.find_iter(value).map(|m| {
14✔
372
                                    m.as_str()[1..].parse::<u64>().unwrap_or_else(|_| {
14✔
373
                                        panic!("regex captured non-digits in \"{}\"", m.as_str())
×
374
                                    })
375
                                }) {
14✔
376
                                    out_wikidata_ids.send(id)?;
14✔
377
                                }
378
                            }
122✔
379
                        }
×
380
                    }
381
                    let cap = Cap::from_center_chordangle(&s2_cell.center(), &radius);
14✔
382
                    for cell_id in coverer.covering(&cap).0.into_iter() {
976✔
383
                        out_cells.send(cell_id)?;
976✔
384
                    }
385
                    bar.inc(1);
14✔
386
                }
387

388
                Ok(())
2✔
389
            })?;
2✔
390

391
        bar.finish();
2✔
392
        Ok(())
2✔
393
    }
2✔
394

395
    fn column_index(name: &str, schema: &Type) -> Result<usize> {
4✔
396
        for (i, field) in schema.get_fields().iter().enumerate() {
8✔
397
            if field.name() == name {
8✔
398
                return Ok(i);
4✔
399
            }
4✔
400
        }
401
        Err(anyhow!("column \"{}\" not found", name))
×
402
    }
4✔
403

404
    fn meters_to_chord_angle(radius_meters: f64) -> ChordAngle {
4✔
405
        use s2::s1::angle::Rad;
406
        const EARTH_RADIUS_METERS: f64 = 6_371_000.0;
407
        ChordAngle::from(Angle::from(Rad(radius_meters / EARTH_RADIUS_METERS)))
4✔
408
    }
4✔
409

410
    /// Builds a spatial coverage file from a stream of s2::CellIDs.
    ///
    /// Drains `cells`, externally sorts the ids (so the stream may be much
    /// larger than memory), shifts each id by S2_CELL_ID_SHIFT, and feeds
    /// the sorted values to `writer`, which run-length encodes them.
    fn build_spatial_coverage(
        cells: Receiver<CellID>,
        progress: &MultiProgress,
        writer: &mut CoverageWriter,
    ) -> Result<()> {
        // Counted while feeding the sorter; read only after sort() returns,
        // so the final value is the total number of received cells.
        let num_cells = AtomicU64::new(0);
        let sorter: ExternalSorter<CellID, std::io::Error, LimitedBufferBuilder> =
            ExternalSorterBuilder::new()
                .with_tmp_dir(Path::new("./"))
                .with_buffer(LimitedBufferBuilder::new(
                    1_000_000, /* preallocate */ true,
                ))
                .build()?;
        let sorted = sorter.sort(cells.iter().map(|x| {
            num_cells.fetch_add(1, Ordering::SeqCst);
            std::io::Result::Ok(x)
        }))?;

        // The progress bar total is only known once the input stream has
        // been fully drained by the sorter above.
        let bar = progress.add(ProgressBar::new(num_cells.load(Ordering::SeqCst)));
        bar.set_style(ProgressStyle::with_template(PROGRESS_BAR_STYLE)?);
        bar.set_prefix("cov.write");
        bar.set_message("s2 cells");

        // Values arrive in ascending order, as CoverageWriter::write asserts.
        for cur in sorted {
            writer.write(cur?.0 >> S2_CELL_ID_SHIFT)?;
            bar.inc(1);
        }

        bar.finish();

        Ok(())
    }
443

444
    fn collect_wikidata_ids(stream: Receiver<u64>) -> Vec<u64> {
2✔
445
        // The AllThePlaces dump of 2026-01-03 references 3791 unique wikidata ids.
446
        let mut ids = HashSet::with_capacity_and_hasher(8192, NoOpBuildHasher::default());
2✔
447
        let mut last: u64 = 0; // not a valid wikidata id
2✔
448
        for id in stream {
14✔
449
            if id != last {
14✔
450
                ids.insert(id);
6✔
451
                last = id;
6✔
452
            }
8✔
453
        }
454

455
        let mut sorted_ids: Vec<u64> = ids.into_iter().collect();
2✔
456
        sorted_ids.sort();
2✔
457
        sorted_ids
2✔
458
    }
2✔
459

460
    /// Incrementally writes a coverage file: a header, run-length encoded
    /// s2 cell values, and a sorted list of wikidata ids.
    ///
    /// Run starts go straight into the main file while run lengths are
    /// buffered in a side file, because both arrays grow in lockstep and
    /// only one can be appended to the main file as we go. `close()`
    /// stitches everything together.
    struct CoverageWriter {
        writer: BufWriter<File>,
        run_starts_pos: u64, // offset of run_starts array relative to start of file

        // Temporary side file collecting one length byte per run; appended
        // to the main file and deleted in close().
        run_lengths_path: PathBuf,
        run_lengths_writer: BufWriter<File>,

        // Count of distinct values written so far (not persisted; only
        // num_runs ends up in the file headers).
        num_values: u64,
        num_runs: u64,
        // First value of the run currently being built; None before the
        // first write().
        run_start: Option<u64>,
        // Length of the current run, minus one — matches the single-byte
        // on-disk encoding.
        run_length_minus_1: u8,

        wikidata_ids: Vec<u64>,
    }
474

475
    impl CoverageWriter {
        // The file has exactly three sections: "runstart", "runlengt", "wikidata".
        const NUM_HEADERS: usize = 3;

        /// Creates the output file at `path`, writes the signature, and
        /// reserves space for the section headers, which are filled in by
        /// close() once all offsets and sizes are known.
        ///
        /// Also creates the temporary `<path>.tmp_run_lengths` side file.
        fn try_new(path: &Path) -> Result<CoverageWriter> {
            let file = File::create(path)?;
            let mut writer = BufWriter::with_capacity(32768, file);

            // Write file header.
            writer.write_all(b"diffed-places coverage\0\0")?;
            writer.write_all(&(Self::NUM_HEADERS as u64).to_le_bytes())?;
            writer.write_all(&[0; 24 * Self::NUM_HEADERS])?; // leave space for headers

            let run_starts_pos = writer.stream_position()?;
            let run_lengths_path = path.with_extension("tmp_run_lengths");
            let run_lengths_file = File::create(&run_lengths_path)?;
            let run_lengths_writer = BufWriter::with_capacity(32768, run_lengths_file);

            Ok(CoverageWriter {
                writer,
                run_starts_pos,
                run_lengths_path,
                run_lengths_writer,
                num_values: 0,
                num_runs: 0,
                run_start: None,
                run_length_minus_1: 0,
                wikidata_ids: Vec::default(),
            })
        }

        /// Adds `value` to the run-length encoding.
        ///
        /// Values must arrive in ascending order (asserted). Duplicates are
        /// ignored; a value adjacent to the current run extends it (up to
        /// the 256-value capacity of the one-byte length); anything else
        /// flushes the current run and starts a new one.
        fn write(&mut self, value: u64) -> Result<()> {
            // First value ever: open the initial run.
            let Some(run_start) = self.run_start else {
                self.num_values = 1;
                self.num_runs = 1;
                self.run_start = Some(value);
                self.run_length_minus_1 = 0;
                return Ok(());
            };

            // Last value of the (inclusive) current run.
            let run_end: u64 = run_start + (self.run_length_minus_1 as u64);
            assert!(
                value >= run_end,
                "values not written in sort order: {} after {}",
                value,
                run_end
            );

            if value == run_end {
                // If we write the same value twice, we don’t need to do anything.
                return Ok(());
            } else if value == run_end + 1 && self.run_length_minus_1 < 0xff {
                // Extending the length of the current run, if there’s still
                // enough space to hold the new length in the available 8 bits.
                self.run_length_minus_1 += 1;
                self.num_values += 1;
                return Ok(());
            }

            // Start a new run with the current value.
            self.finish_run()?;
            self.run_start = Some(value);
            self.run_length_minus_1 = 0;
            self.num_values += 1;
            self.num_runs += 1;
            Ok(())
        }

        /// Stores the wikidata ids to be written by close().
        /// `ids` must already be sorted ascending (asserted), because the
        /// reader binary-searches this array.
        fn set_wikidata_ids(&mut self, ids: Vec<u64>) {
            assert!(ids.is_sorted());
            self.wikidata_ids = ids;
        }

        /// Appends the stored wikidata ids to the main file, little-endian.
        fn write_wikidata_ids(&mut self) -> Result<()> {
            for id in &self.wikidata_ids {
                self.writer.write_all(&id.to_le_bytes())?;
            }
            Ok(())
        }

        /// Finalizes the file: flushes the last run, appends the run
        /// lengths and wikidata ids, and back-fills the section headers.
        /// Consumes the writer; the file is complete when this returns Ok.
        fn close(mut self) -> Result<()> {
            self.finish_run()?;
            self.writer.flush()?;

            // Append the run lengths, which are at this point in time in a temporary file,
            // to the end of the main file.
            let run_lengths_pos = self.writer.stream_position()?;
            self.run_lengths_writer.flush()?;
            // One length byte per run, so the side file size must equal num_runs.
            assert_eq!(self.run_lengths_writer.stream_position()?, self.num_runs);
            self.run_lengths_writer.seek(SeekFrom::Start(0))?;
            let run_lengths_path: &Path = self.run_lengths_path.as_path();
            let mut reader = BufReader::new(File::open(run_lengths_path)?);
            std::io::copy(&mut reader, &mut self.writer)?;
            remove_file(run_lengths_path)?;

            // Append wikidata ids.
            // Pad to 8 bytes so the reader can map the ids as an aligned &[u64].
            self.write_padding(8)?;
            let wikidata_ids_pos = self.writer.stream_position()?;
            self.write_wikidata_ids()?;
            let wikidata_ids_size = self.writer.stream_position()? - wikidata_ids_pos;

            self.write_headers(&[
                ("runstart", self.run_starts_pos, self.num_runs * 8),
                ("runlengt", run_lengths_pos, self.num_runs),
                ("wikidata", wikidata_ids_pos, wikidata_ids_size),
            ])?;
            Ok(())
        }

        /// Seeks back to the header area reserved by try_new() and writes
        /// one 24-byte entry (8-byte id, offset, size) per section.
        fn write_headers(&mut self, headers: &[(&str, u64, u64)]) -> Result<()> {
            self.writer.seek(SeekFrom::Start(32))?;
            assert_eq!(headers.len(), Self::NUM_HEADERS);
            for (id, pos, len) in headers {
                assert_eq!(
                    id.len(),
                    8,
                    "header id must be 8 chars long but \"{:?}\" is not",
                    id
                );
                self.writer.write_all(id.as_bytes())?;
                self.writer.write_all(&pos.to_le_bytes())?;
                self.writer.write_all(&len.to_le_bytes())?;
            }
            self.writer.flush()?;
            Ok(())
        }

        /// Flushes the current run, if any: its start goes to the main file,
        /// its length byte to the side file. No-op before the first write().
        fn finish_run(&mut self) -> Result<()> {
            let Some(run_start) = self.run_start else {
                return Ok(());
            };
            self.writer.write_all(&run_start.to_le_bytes())?;
            self.run_lengths_writer
                .write_all(&[self.run_length_minus_1])?;
            Ok(())
        }

        /// Writes zero bytes until the main file position is a multiple of
        /// `alignment`.
        fn write_padding(&mut self, alignment: usize) -> Result<()> {
            let pos = self.writer.stream_position()?;
            let alignment = alignment as u64;
            let num_bytes = ((alignment - (pos % alignment)) % alignment) as usize;
            if num_bytes > 0 {
                let padding = vec![0; num_bytes];
                self.writer.write_all(&padding)?;
            }
            Ok(())
        }
    }
622

623
    /// A pass-through hasher: the hash of a u64 key is the key itself.
    ///
    /// Used for the wikidata-id HashSet, where the keys need no mixing.
    #[derive(Default)]
    struct NoOpHasher(u64);

    impl Hasher for NoOpHasher {
        fn write(&mut self, bytes: &[u8]) {
            // Fallback for non-u64 input: keep the last byte, ignore empty input.
            if let Some(&byte) = bytes.last() {
                self.0 = u64::from(byte);
            }
        }

        fn write_u64(&mut self, value: u64) {
            self.0 = value;
        }

        fn finish(&self) -> u64 {
            self.0
        }
    }

    type NoOpBuildHasher = BuildHasherDefault<NoOpHasher>;
643

644
    #[cfg(test)]
    mod tests {
        use super::super::Coverage;
        use super::{CoverageWriter, S2_CELL_ID_SHIFT, S2_GRANULARITY_LEVEL, build_coverage};
        use anyhow::{Ok, Result};
        use indicatif::{MultiProgress, ProgressDrawTarget};
        use s2::cellid::CellID;
        use std::path::PathBuf;
        use tempfile::{NamedTempFile, TempDir};

        /// The shift must map the id range of one granularity-level cell
        /// onto exactly one bit position.
        #[test]
        fn test_cell_id_shift() {
            let cell = CellID::from_face_pos_level(3, 0x12345678, S2_GRANULARITY_LEVEL as u64);
            assert_eq!(cell.level() as u8, S2_GRANULARITY_LEVEL);
            let id_range = cell.range_max().0 - cell.range_min().0 + 2;
            assert_eq!(id_range.ilog2() as u8, S2_CELL_ID_SHIFT);
        }

        /// End-to-end smoke test over the checked-in AllThePlaces sample.
        #[test]
        fn test_build_coverage() -> Result<()> {
            use std::os::unix::fs::symlink;
            let mut atp = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
            atp.push("tests/test_data/alltheplaces.parquet");
            let workdir = TempDir::new()?;
            symlink(&atp, workdir.path().join("alltheplaces.parquet"))?;
            let progress = MultiProgress::with_draw_target(ProgressDrawTarget::hidden());
            let built = build_coverage(&atp, &progress, workdir.path())?;
            assert!(built.exists());
            Ok(())
        }

        /// Round-trips a hand-built file through CoverageWriter and
        /// Coverage, probing run boundaries and wikidata lookups.
        #[test]
        fn test_coverage_writer() -> Result<()> {
            let temp_file = NamedTempFile::new()?;
            let mut writer = CoverageWriter::try_new(temp_file.path())?;
            // One isolated value, then a range long enough (259 values) to
            // overflow the one-byte run length and force a run split.
            writer.write(7)?;
            for value in 1000..=1258 {
                writer.write(value)?;
            }
            writer.set_wikidata_ids(vec![23, 77, 88]);
            writer.close()?;

            let cov = Coverage::load(temp_file.path())?;
            for i in [0u64, 5, 6, 8, 9, 999, 1259] {
                let cell = CellID(i << S2_CELL_ID_SHIFT);
                assert!(
                    !cov.contains_s2_cell(&cell),
                    "cell {:?} for i={} should not be covered, but is",
                    cell,
                    i,
                );
            }
            for i in [7u64, 1000, 1001, 1002, 1111, 1254, 1255, 1256, 1257, 1258] {
                let cell = CellID(i << S2_CELL_ID_SHIFT);
                assert!(
                    cov.contains_s2_cell(&cell),
                    "cell {:?} for i={} should be covered, but is not",
                    cell,
                    i,
                );
            }

            for (id, expected) in [
                (1, false),
                (23, true),
                (51, false),
                (77, true),
                (88, true),
                (89, false),
            ] {
                assert_eq!(cov.contains_wikidata_item(id), expected);
            }

            Ok(())
        }
    }
716

717
    /// NoOpHasher must return u64 keys unchanged and fall back to the last
    /// byte for raw byte input.
    #[test]
    fn test_no_op_hasher() {
        let mut hasher = NoOpHasher(7);

        hasher.write_u64(2600);
        assert_eq!(hasher.finish(), 2600);

        hasher.write(&[31, 33, 7]);
        assert_eq!(hasher.finish(), 7);
    }
727
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc