• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

diffed-places / pipeline / 21906504133

11 Feb 2026 01:13PM UTC coverage: 95.997% (-0.1%) from 96.101%
21906504133

push

github

web-flow
Compute Wikidata coverage (#129)

108 of 115 new or added lines in 1 file covered. (93.91%)

1 existing line in 1 file now uncovered.

1199 of 1249 relevant lines covered (96.0%)

97.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.67
/src/coverage.rs
1
pub use reader::Coverage;
2
pub use writer::build_coverage;
3

4
/// The granularity of S2 cells we use to represent spatial coverage.
5
///
6
/// At level 19, an S2 cell is about 15 to 20 meters wide, see [S2 Cell
7
/// Statistics](https://s2geometry.io/resources/s2cell_statistics.html).
8
/// For an interactive visualization, see [S2 Region Coverer Online
9
/// Viewer](https://igorgatis.github.io/ws2/).
10
const S2_GRANULARITY_LEVEL: u8 = 19;
11

12
/// To store the coverage map as a bitvector in a more compact form,
13
/// we do not store the actual S2 cell IDs because this would lead
14
/// to a sparse bitvector without contiguous runs. Rather, we shift the
15
/// unsigned 64-bit integer ids to the right, so that neighboring
16
/// parent cells (at our finest granularity, S2_GRANULARITY_LEVEL)
17
/// become neighboring bits in the bitvector. This leads to a better
18
/// run-length encoding. S2 cell ids use the most significant three
19
/// bits to encode the cube face [0..5], and then two bits for each
20
/// level of granularity.
21
const S2_CELL_ID_SHIFT: u8 = 64 - (3 + 2 * S2_GRANULARITY_LEVEL);
22

23
mod reader {
24
    use super::S2_CELL_ID_SHIFT;
25
    use anyhow::{Ok, Result, anyhow};
26
    use memmap2::Mmap;
27
    use s2::cellid::CellID;
28
    use std::fs::File;
29
    use std::path::Path;
30

31
    #[allow(dead_code)]
32
    pub struct Coverage<'a> {
33
        file: File,
34
        mmap: Mmap,
35

36
        num_runs: usize,
37
        run_starts_offset: usize, // TODO: Replace by `run_starts: &'a [u64]`.
38
        run_lengths_offset: usize, // TODO: Replace by `run_lengths: &'a [u8]`.
39

40
        wikidata_ids: &'a [u64],
41
    }
42

43
    impl<'a> Coverage<'a> {
44
        pub fn is_covering(&self, cell: &CellID) -> bool {
1,807✔
45
            // SAFETY: Alignment and bounds already checked by get_offset_size().
46
            let run_starts = unsafe {
1,807✔
47
                let ptr = self.mmap.as_ptr().add(self.run_starts_offset) as *const u64;
1,807✔
48
                std::slice::from_raw_parts(ptr, self.num_runs)
1,807✔
49
            };
50

51
            let value: u64 = cell.0 >> S2_CELL_ID_SHIFT;
1,807✔
52
            let index = if cfg!(target_endian = "little") {
1,807✔
53
                run_starts.partition_point(|&start| start <= value)
12,581✔
54
            } else {
55
                run_starts.partition_point(|&start| start.swap_bytes() <= value)
×
56
            };
57
            if index > 0 {
1,807✔
58
                let start = run_starts[index - 1];
1,804✔
59
                let limit = start + self.mmap[self.run_lengths_offset + index - 1] as u64;
1,804✔
60
                start <= value && value <= limit
1,804✔
61
            } else {
62
                false
3✔
63
            }
64
        }
1,807✔
65

66
        #[allow(dead_code)] // TODO: Remove attribute once we use this function.
67
        pub fn contains_wikidata_id(&self, id: u64) -> bool {
6✔
68
            if cfg!(target_endian = "little") {
6✔
69
                self.wikidata_ids.binary_search(&id).is_ok()
6✔
70
            } else {
NEW
71
                self.wikidata_ids
×
NEW
72
                    .binary_search_by(|x| x.swap_bytes().cmp(&id))
×
NEW
73
                    .is_ok()
×
74
            }
75
        }
6✔
76

77
        pub fn load(path: &Path) -> Result<Coverage<'_>> {
4✔
78
            let file = File::open(path)?;
4✔
79

80
            // SAFETY: No other process writes to the file while we read it.
81
            let mmap = unsafe { Mmap::map(&file)? };
4✔
82

83
            Self::check_signature(&mmap)?;
4✔
84

85
            let (run_starts_offset, run_starts_size) = Self::get_offset_size(b"runstart", &mmap, 8)
4✔
86
                .ok_or(anyhow!("no \"runstart\" in coverage file {:?}", path))?;
4✔
87
            let (run_lengths_offset, run_lengths_size) =
4✔
88
                Self::get_offset_size(b"runlengt", &mmap, 1)
4✔
89
                    .ok_or(anyhow!("no \"runlengt\" in coverage file {:?}", path))?;
4✔
90
            if run_starts_size != run_lengths_size * 8 {
4✔
91
                return Err(anyhow!("inconsistent number of runs in {:?}", path));
×
92
            }
4✔
93
            let num_runs = run_lengths_size;
4✔
94

95
            let (wikidata_offset, wikidata_size) = Self::get_offset_size(b"wikidata", &mmap, 8)
4✔
96
                .ok_or(anyhow!("no \"wikidata\" in coverage file {:?}", path))?;
4✔
97
            // SAFETY: Alignment and bounds checked by get_offset_size().
98
            let wikidata_ids = unsafe {
4✔
99
                let ptr = mmap.as_ptr().add(wikidata_offset) as *const u64;
4✔
100
                std::slice::from_raw_parts(ptr, wikidata_size / 8)
4✔
101
            };
102

103
            Ok(Coverage {
4✔
104
                file,
4✔
105
                mmap,
4✔
106
                num_runs,
4✔
107
                run_starts_offset,
4✔
108
                run_lengths_offset,
4✔
109
                wikidata_ids,
4✔
110
            })
4✔
111
        }
4✔
112

113
        fn check_signature(data: &[u8]) -> Result<()> {
8✔
114
            if data.len() < 24 || &data[0..24] != b"diffed-places coverage\0\0" {
8✔
115
                return Err(anyhow!("malformed coverage file"));
3✔
116
            }
5✔
117
            Ok(())
5✔
118
        }
8✔
119

120
        // This implementation only works on 64-bit processors. Theoretically,
        // we could write a special implementation for 32-bit platforms, which
        // would have to restrict its input to files smaller than 4 GiB.
        // However, we’re not doing a retro-computing project, so we’ll never
        // execute our code on such machines.
        //
        /// Looks up the section named `id` in the header table of `bytes`.
        ///
        /// Layout read by this function (all integers little-endian): bytes
        /// 24..32 hold the number of header entries; the entries follow from
        /// byte 32 on, 24 bytes each, made of an 8-byte id, a `u64` offset and
        /// a `u64` size.
        ///
        /// Returns `Some((offset, size))` for the first entry named `id` whose
        /// data lies entirely within `bytes` and whose offset and size are
        /// both multiples of `alignment`. Returns `None` if the header is
        /// truncated or no such entry exists. All arithmetic on values read
        /// from the file is overflow-checked, because callers rely on the
        /// result to build slices in `unsafe` code.
        #[cfg(target_pointer_width = "64")]
        fn get_offset_size(id: &[u8; 8], bytes: &[u8], alignment: usize) -> Option<(usize, usize)> {
            const TABLE_START: usize = 32;
            const ENTRY_LEN: usize = 24;

            let num_headers = u64::from_le_bytes(bytes.get(24..32)?.try_into().ok()?) as usize;
            // A hostile header count must not wrap around the table bounds.
            let table_end = num_headers.checked_mul(ENTRY_LEN)?.checked_add(TABLE_START)?;
            let table = bytes.get(TABLE_START..table_end)?;

            for entry in table.chunks_exact(ENTRY_LEN) {
                if entry[..8] != id[..] {
                    continue;
                }
                let offset = u64::from_le_bytes(entry[8..16].try_into().ok()?) as usize;
                let size = u64::from_le_bytes(entry[16..24].try_into().ok()?) as usize;
                // `offset + size` may exceed usize::MAX for a corrupt file;
                // treat that the same as pointing past the end of the data.
                let in_bounds = offset
                    .checked_add(size)
                    .is_some_and(|end| end <= bytes.len());
                if in_bounds && offset.is_multiple_of(alignment) && size.is_multiple_of(alignment) {
                    return Some((offset, size));
                }
            }
            None
        }
21✔
151
    }
152

153
    #[cfg(test)]
154
    mod tests {
155
        use super::Coverage;
156

157
        #[test]
158
        fn test_check_signature() {
1✔
159
            assert!(Coverage::check_signature(b"").is_err());
1✔
160
            assert!(Coverage::check_signature(b"foo").is_err());
1✔
161
            assert!(Coverage::check_signature(b"diffed-places coverage\0\x01").is_err());
1✔
162
            assert!(Coverage::check_signature(b"diffed-places coverage\0\0\0\0\0\0").is_ok());
1✔
163
        }
1✔
164

165
        #[test]
166
        fn test_get_offset_size() {
1✔
167
            // Construct a coverage file with two keys in its header.
168
            let mut bytes = Vec::new();
1✔
169
            bytes.extend_from_slice(b"diffed-places coverage\0\0");
1✔
170
            bytes.extend_from_slice(&2_u64.to_le_bytes());
1✔
171
            assert!(Coverage::get_offset_size(b"some_key", &bytes, 1) == None);
1✔
172
            for (key, offset, size) in [(b"some_key", 80, 16), (b"otherkey", 83, 2)] {
2✔
173
                bytes.extend_from_slice(key as &[u8; 8]);
2✔
174
                bytes.extend_from_slice(&(offset as u64).to_le_bytes());
2✔
175
                bytes.extend_from_slice(&(size as u64).to_le_bytes());
2✔
176
            }
2✔
177
            bytes.extend_from_slice(&0xdeadbeefcafefeed_u64.to_le_bytes());
1✔
178
            bytes.extend_from_slice(&42_u64.to_le_bytes());
1✔
179
            let bytes = bytes.as_ref();
1✔
180

181
            // The file header has no entry for key "in-exist".
182
            assert_eq!(Coverage::get_offset_size(b"in-exist", bytes, 1), None);
1✔
183

184
            // The data for "some_key" starts at offset 80 and is 16 bytes, which is
185
            // correctly aligned for single-byte, 8-byte and 16-byte access.
186
            // However, it would be unsafe (at least on RISC CPUs) to perform 32-byte
187
            // access, eg. loading a SIMD register, because 80 is not divisible by 32.
188
            // Also, the data size is too small to read 32-byte entities anyway.
189
            assert_eq!(
1✔
190
                Coverage::get_offset_size(b"some_key", bytes, 1),
1✔
191
                Some((80, 16))
192
            );
193
            assert_eq!(
1✔
194
                Coverage::get_offset_size(b"some_key", bytes, 2),
1✔
195
                Some((80, 16))
196
            );
197
            assert_eq!(
1✔
198
                Coverage::get_offset_size(b"some_key", bytes, 8),
1✔
199
                Some((80, 16))
200
            );
201
            assert_eq!(Coverage::get_offset_size(b"some_key", bytes, 32), None);
1✔
202

203
            // The data for "otherkey" starts at offset 83 and is 2 bytes long.
204
            // Access is only safe with single-byte alignment.
205
            assert_eq!(
1✔
206
                Coverage::get_offset_size(b"otherkey", bytes, 1),
1✔
207
                Some((83, 2))
208
            );
209
            assert_eq!(Coverage::get_offset_size(b"otherkey", bytes, 2), None);
1✔
210
            assert_eq!(Coverage::get_offset_size(b"otherkey", bytes, 8), None);
1✔
211
        }
1✔
212
    }
213
}
214

215
mod writer {
216
    use super::{Coverage, S2_CELL_ID_SHIFT, S2_GRANULARITY_LEVEL};
217
    use crate::PROGRESS_BAR_STYLE;
218
    use anyhow::{Ok, Result, anyhow};
219
    use ext_sort::{ExternalSorter, ExternalSorterBuilder, buffer::LimitedBufferBuilder};
220
    use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
221
    use parquet::file::reader::{FileReader, SerializedFileReader};
222
    use parquet::record::RowAccessor;
223
    use parquet::schema::types::Type;
224
    use rayon::prelude::*;
225
    use regex::Regex;
226
    use s2::{
227
        cap::Cap,
228
        cell::Cell,
229
        cellid::CellID,
230
        region::RegionCoverer,
231
        s1::{Angle, ChordAngle},
232
    };
233
    use std::collections::HashSet;
234
    use std::fs::{File, remove_file, rename};
235
    use std::hash::{BuildHasherDefault, Hasher};
236
    use std::io::{BufReader, BufWriter, Seek, SeekFrom, Write};
237
    use std::path::{Path, PathBuf};
238
    use std::sync::atomic::{AtomicU64, Ordering};
239
    use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
240

241
    /// Computes the spatial coverage of a set of places.
242
    pub fn build_coverage(atp: &Path, progress: &MultiProgress, workdir: &Path) -> Result<PathBuf> {
2✔
243
        assert!(atp.exists());
2✔
244

245
        let out = workdir.join("coverage");
2✔
246
        if out.exists() {
2✔
NEW
247
            log::info!("skipping build_coverage, already found {:?}", out);
×
NEW
248
            return Ok(out);
×
249
        } else {
250
            log::info!("build_coverage is starting");
2✔
251
        }
252

253
        let mut tmp = PathBuf::from(&out);
2✔
254
        tmp.add_extension("tmp");
2✔
255
        let mut writer = CoverageWriter::try_new(&tmp)?;
2✔
256

257
        // To avoid deadlock, we must not use Rayon threads here.
258
        // https://dev.to/sgchris/scoped-threads-with-stdthreadscope-in-rust-163-48f9
259
        let (cell_tx, cell_rx) = sync_channel::<CellID>(50_000);
2✔
260
        let (wikidata_tx, wikidata_rx) = sync_channel::<u64>(1000);
2✔
261

262
        // The AllThePlaces dump of 2026-01-03 references 3791 unique wikidata ids.
263
        let mut wikidata_ids = Vec::<u64>::new();
2✔
264
        std::thread::scope(|s| {
2✔
265
            let producer = s.spawn(|| read_places(atp, progress, cell_tx, wikidata_tx));
2✔
266
            let cell_consumer = s.spawn(|| build_spatial_coverage(cell_rx, progress, &mut writer));
2✔
267
            let wikidata_consumer = s.spawn(|| {
2✔
268
                wikidata_ids = collect_wikidata_ids(wikidata_rx);
2✔
269
                Ok(())
2✔
270
            });
2✔
271
            producer
2✔
272
                .join()
2✔
273
                .expect("producer panic")
2✔
274
                .and(cell_consumer.join().expect("cell consumer panic"))
2✔
275
                .and(wikidata_consumer.join().expect("wikidata consumer panic"))
2✔
276
        })?;
2✔
277

278
        writer.set_wikidata_ids(wikidata_ids);
2✔
279
        writer.close()?;
2✔
280
        rename(&tmp, &out)?;
2✔
281

282
        // As a sanity check, let’s try to open the freshly generated file.
283
        // This verifies the presence of the correct file signature, and that
284
        // the run_starts and run_lengths array are correctly aligned and
285
        // within allowable bounds.
286
        _ = Coverage::load(&out)?;
2✔
287

288
        log::info!("build_spatial_coverage finished, built {:?}", out);
2✔
289
        Ok(out)
2✔
290
    }
2✔
291

292
    fn read_places(
2✔
293
        places: &Path,
2✔
294
        progress: &MultiProgress,
2✔
295
        out_cells: SyncSender<CellID>,
2✔
296
        out_wikidata_ids: SyncSender<u64>,
2✔
297
    ) -> Result<()> {
2✔
298
        use anyhow::Context;
299
        let file =
2✔
300
            File::open(places).with_context(|| format!("could not open file `{:?}`", places))?;
2✔
301
        let reader = SerializedFileReader::new(file)?;
2✔
302
        let metadata = reader.metadata();
2✔
303
        let file_metadata = metadata.file_metadata();
2✔
304
        let schema = file_metadata.schema();
2✔
305
        let s2_cell_id_column = column_index("s2_cell_id", schema)?;
2✔
306
        // let source_column = column_index("source", schema)?;
307
        let tags_column = column_index("tags", schema)?;
2✔
308
        let num_row_groups = reader.num_row_groups();
2✔
309
        let num_rows: i64 = file_metadata.num_rows();
2✔
310
        let num_rows: u64 = if num_rows < 0 { 0 } else { num_rows as u64 };
2✔
311

312
        let large_radius = meters_to_chord_angle(100.0);
2✔
313
        let small_radius = meters_to_chord_angle(10.0);
2✔
314
        let coverer = RegionCoverer {
2✔
315
            max_cells: 8,
2✔
316
            min_level: S2_GRANULARITY_LEVEL,
2✔
317
            max_level: S2_GRANULARITY_LEVEL,
2✔
318
            level_mod: 1,
2✔
319
        };
2✔
320

321
        let wikidata_regex = Regex::new(r"[Qq]\d+")?;
2✔
322

323
        let bar = progress.add(ProgressBar::new(num_rows));
2✔
324
        bar.set_style(ProgressStyle::with_template(PROGRESS_BAR_STYLE)?);
2✔
325
        bar.set_prefix("cov.read ");
2✔
326
        bar.set_message("features");
2✔
327

328
        (0..num_row_groups)
2✔
329
            .into_par_iter()
2✔
330
            .try_for_each(|row_group_index| {
2✔
331
                // Because Apache’s implementation of Parquet is not
332
                // thread-safe, but the alternative implementation in
333
                // arrow2 is deprecated, we let each worker thread have
334
                // its own SerializedFileReader, each reading one row
335
                // group in the same Parquet file. This is a little
336
                // wasteful, but it’s actually not too bad.  In our
337
                // Parquet file for the full AllThePlace dump of
338
                // 2026-01-03, there were 24 row groups in total.
339
                // An earlier version of this code was using the
340
                // alternative implementation of the parquet2 crate,
341
                // but our application code got awfully complicated
342
                // when using that low-level library.
343
                let reader = SerializedFileReader::new(File::open(places)?)?;
2✔
344
                let row_group = reader.get_row_group(row_group_index)?;
2✔
345
                for row in row_group.get_row_iter(None)? {
14✔
346
                    let row = row?;
14✔
347
                    let s2_cell = Cell::from(CellID(row.get_ulong(s2_cell_id_column)?));
14✔
348
                    // let source = row.get_string(source_column)?;
349
                    let tags = row.get_map(tags_column)?.entries();
14✔
350
                    let mut radius = small_radius;
14✔
351
                    for (key, value) in tags.iter() {
136✔
352
                        use parquet::record::Field::Str;
353
                        if let (Str(key), Str(value)) = (key, value) {
136✔
354
                            radius = radius.max(match (key.as_ref(), value.as_ref()) {
136✔
355
                                ("shop", _) => large_radius,
136✔
356
                                ("tourism", _) => large_radius,
130✔
357
                                ("public_transport", "platform") => large_radius,
130✔
358
                                ("railway", "platform") => large_radius,
130✔
359
                                (_, _) => small_radius,
130✔
360
                            });
361
                            if key == "wikidata" || key.ends_with(":wikidata") {
136✔
362
                                for id in wikidata_regex.find_iter(value).map(|m| {
14✔
363
                                    m.as_str()[1..].parse::<u64>().unwrap_or_else(|_| {
14✔
NEW
364
                                        panic!("regex captured non-digits in \"{}\"", m.as_str())
×
365
                                    })
366
                                }) {
14✔
367
                                    out_wikidata_ids.send(id)?;
14✔
368
                                }
369
                            }
122✔
UNCOV
370
                        }
×
371
                    }
372
                    let cap = Cap::from_center_chordangle(&s2_cell.center(), &radius);
14✔
373
                    for cell_id in coverer.covering(&cap).0.into_iter() {
976✔
374
                        out_cells.send(cell_id)?;
976✔
375
                    }
376
                    bar.inc(1);
14✔
377
                }
378

379
                Ok(())
2✔
380
            })?;
2✔
381

382
        bar.finish();
2✔
383
        Ok(())
2✔
384
    }
2✔
385

386
    fn column_index(name: &str, schema: &Type) -> Result<usize> {
4✔
387
        for (i, field) in schema.get_fields().iter().enumerate() {
8✔
388
            if field.name() == name {
8✔
389
                return Ok(i);
4✔
390
            }
4✔
391
        }
392
        Err(anyhow!("column \"{}\" not found", name))
×
393
    }
4✔
394

395
    fn meters_to_chord_angle(radius_meters: f64) -> ChordAngle {
4✔
396
        use s2::s1::angle::Rad;
397
        const EARTH_RADIUS_METERS: f64 = 6_371_000.0;
398
        ChordAngle::from(Angle::from(Rad(radius_meters / EARTH_RADIUS_METERS)))
4✔
399
    }
4✔
400

401
    /// Builds a spatial coverage file from a stream of s2::CellIDs.
402
    fn build_spatial_coverage(
2✔
403
        cells: Receiver<CellID>,
2✔
404
        progress: &MultiProgress,
2✔
405
        writer: &mut CoverageWriter,
2✔
406
    ) -> Result<()> {
2✔
407
        let num_cells = AtomicU64::new(0);
2✔
408
        let sorter: ExternalSorter<CellID, std::io::Error, LimitedBufferBuilder> =
2✔
409
            ExternalSorterBuilder::new()
2✔
410
                .with_tmp_dir(Path::new("./"))
2✔
411
                .with_buffer(LimitedBufferBuilder::new(
2✔
412
                    1_000_000, /* preallocate */ true,
413
                ))
414
                .build()?;
2✔
415
        let sorted = sorter.sort(cells.iter().map(|x| {
976✔
416
            num_cells.fetch_add(1, Ordering::SeqCst);
976✔
417
            std::io::Result::Ok(x)
976✔
418
        }))?;
976✔
419

420
        let bar = progress.add(ProgressBar::new(num_cells.load(Ordering::SeqCst)));
2✔
421
        bar.set_style(ProgressStyle::with_template(PROGRESS_BAR_STYLE)?);
2✔
422
        bar.set_prefix("cov.write");
2✔
423
        bar.set_message("s2 cells");
2✔
424

425
        for cur in sorted {
976✔
426
            writer.write(cur?.0 >> S2_CELL_ID_SHIFT)?;
976✔
427
            bar.inc(1);
976✔
428
        }
429

430
        bar.finish();
2✔
431

432
        Ok(())
2✔
433
    }
2✔
434

435
    fn collect_wikidata_ids(stream: Receiver<u64>) -> Vec<u64> {
2✔
436
        // The AllThePlaces dump of 2026-01-03 references 3791 unique wikidata ids.
437
        let mut ids = HashSet::with_capacity_and_hasher(8192, NoOpBuildHasher::default());
2✔
438
        let mut last: u64 = 0; // not a valid wikidata id
2✔
439
        for id in stream {
14✔
440
            if id != last {
14✔
441
                ids.insert(id);
6✔
442
                last = id;
6✔
443
            }
8✔
444
        }
445

446
        let mut sorted_ids: Vec<u64> = ids.into_iter().collect();
2✔
447
        sorted_ids.sort();
2✔
448
        sorted_ids
2✔
449
    }
2✔
450

451
    struct CoverageWriter {
452
        writer: BufWriter<File>,
453
        run_starts_pos: u64, // offset of run_starts array relative to start of file
454

455
        run_lengths_path: PathBuf,
456
        run_lengths_writer: BufWriter<File>,
457

458
        num_values: u64,
459
        num_runs: u64,
460
        run_start: Option<u64>,
461
        run_length_minus_1: u8,
462

463
        wikidata_ids: Vec<u64>,
464
    }
465

466
    impl CoverageWriter {
467
        const NUM_HEADERS: usize = 3;
468

469
        fn try_new(path: &Path) -> Result<CoverageWriter> {
3✔
470
            let file = File::create(path)?;
3✔
471
            let mut writer = BufWriter::with_capacity(32768, file);
3✔
472

473
            // Write file header.
474
            writer.write_all(b"diffed-places coverage\0\0")?;
3✔
475
            writer.write_all(&(Self::NUM_HEADERS as u64).to_le_bytes())?;
3✔
476
            writer.write_all(&[0; 24 * Self::NUM_HEADERS])?; // leave space for headers
3✔
477

478
            let run_starts_pos = writer.stream_position()?;
3✔
479
            let run_lengths_path = path.with_extension("tmp_run_lengths");
3✔
480
            let run_lengths_file = File::create(&run_lengths_path)?;
3✔
481
            let run_lengths_writer = BufWriter::with_capacity(32768, run_lengths_file);
3✔
482

483
            Ok(CoverageWriter {
3✔
484
                writer,
3✔
485
                run_starts_pos,
3✔
486
                run_lengths_path,
3✔
487
                run_lengths_writer,
3✔
488
                num_values: 0,
3✔
489
                num_runs: 0,
3✔
490
                run_start: None,
3✔
491
                run_length_minus_1: 0,
3✔
492
                wikidata_ids: Vec::default(),
3✔
493
            })
3✔
494
        }
3✔
495

496
        /// Adds `value` to the coverage. Values must arrive in ascending
        /// order; repeating the most recent value is allowed and ignored.
        ///
        /// Consecutive values extend the current run until its length no
        /// longer fits into eight bits; any other value finishes the
        /// current run and starts a new one.
        ///
        /// # Panics
        ///
        /// Panics if `value` is smaller than the end of the current run.
        fn write(&mut self, value: u64) -> Result<()> {
            let Some(run_start) = self.run_start else {
                // Very first value: it opens the first run.
                self.run_start = Some(value);
                self.run_length_minus_1 = 0;
                self.num_values = 1;
                self.num_runs = 1;
                return Ok(());
            };

            let run_end: u64 = run_start + u64::from(self.run_length_minus_1);
            assert!(
                value >= run_end,
                "values not written in sort order: {} after {}",
                value,
                run_end
            );

            // If we write the same value twice, we don’t need to do anything.
            if value == run_end {
                return Ok(());
            }

            // The current run can grow if the value is adjacent to its end
            // and the new length still fits into the available 8 bits.
            let extends_run = value == run_end + 1 && self.run_length_minus_1 < 0xff;
            if extends_run {
                self.run_length_minus_1 += 1;
            } else {
                // Start a new run with the current value.
                self.finish_run()?;
                self.run_start = Some(value);
                self.run_length_minus_1 = 0;
                self.num_runs += 1;
            }
            self.num_values += 1;
            Ok(())
        }
1,236✔
532

533
        fn set_wikidata_ids(&mut self, ids: Vec<u64>) {
3✔
534
            assert!(ids.is_sorted());
3✔
535
            self.wikidata_ids = ids;
3✔
536
        }
3✔
537

538
        fn write_wikidata_ids(&mut self) -> Result<()> {
3✔
539
            for id in &self.wikidata_ids {
7✔
540
                self.writer.write_all(&id.to_le_bytes())?;
7✔
541
            }
542
            Ok(())
3✔
543
        }
3✔
544

545
        fn close(mut self) -> Result<()> {
3✔
546
            self.finish_run()?;
3✔
547
            self.writer.flush()?;
3✔
548

549
            // Append the run lengths, which are at this point in time in a temporary file,
550
            // to the end of the main file.
551
            let run_lengths_pos = self.writer.stream_position()?;
3✔
552
            self.run_lengths_writer.flush()?;
3✔
553
            assert_eq!(self.run_lengths_writer.stream_position()?, self.num_runs);
3✔
554
            self.run_lengths_writer.seek(SeekFrom::Start(0))?;
3✔
555
            let run_lengths_path: &Path = self.run_lengths_path.as_path();
3✔
556
            let mut reader = BufReader::new(File::open(run_lengths_path)?);
3✔
557
            std::io::copy(&mut reader, &mut self.writer)?;
3✔
558
            remove_file(run_lengths_path)?;
3✔
559

560
            // Append wikidata ids.
561
            self.write_padding(8)?;
3✔
562
            let wikidata_ids_pos = self.writer.stream_position()?;
3✔
563
            self.write_wikidata_ids()?;
3✔
564
            let wikidata_ids_size = self.writer.stream_position()? - wikidata_ids_pos;
3✔
565

566
            self.write_headers(&[
3✔
567
                ("runstart", self.run_starts_pos, self.num_runs * 8),
3✔
568
                ("runlengt", run_lengths_pos, self.num_runs),
3✔
569
                ("wikidata", wikidata_ids_pos, wikidata_ids_size),
3✔
570
            ])?;
3✔
571
            Ok(())
3✔
572
        }
3✔
573

574
        fn write_headers(&mut self, headers: &[(&str, u64, u64)]) -> Result<()> {
3✔
575
            self.writer.seek(SeekFrom::Start(32))?;
3✔
576
            assert_eq!(headers.len(), Self::NUM_HEADERS);
3✔
577
            for (id, pos, len) in headers {
9✔
578
                assert_eq!(
9✔
579
                    id.len(),
9✔
580
                    8,
581
                    "header id must be 8 chars long but \"{:?}\" is not",
582
                    id
583
                );
584
                self.writer.write_all(id.as_bytes())?;
9✔
585
                self.writer.write_all(&pos.to_le_bytes())?;
9✔
586
                self.writer.write_all(&len.to_le_bytes())?;
9✔
587
            }
588
            self.writer.flush()?;
3✔
589
            Ok(())
3✔
590
        }
3✔
591

592
        fn finish_run(&mut self) -> Result<()> {
113✔
593
            let Some(run_start) = self.run_start else {
113✔
594
                return Ok(());
×
595
            };
596
            self.writer.write_all(&run_start.to_le_bytes())?;
113✔
597
            self.run_lengths_writer
113✔
598
                .write_all(&[self.run_length_minus_1])?;
113✔
599
            Ok(())
113✔
600
        }
113✔
601

602
        fn write_padding(&mut self, alignment: usize) -> Result<()> {
3✔
603
            let pos = self.writer.stream_position()?;
3✔
604
            let alignment = alignment as u64;
3✔
605
            let num_bytes = ((alignment - (pos % alignment)) % alignment) as usize;
3✔
606
            if num_bytes > 0 {
3✔
607
                let padding = vec![0; num_bytes];
3✔
608
                self.writer.write_all(&padding)?;
3✔
NEW
609
            }
×
610
            Ok(())
3✔
611
        }
3✔
612
    }
613

614
    /// A hasher that performs no mixing: the hash is simply the most
    /// recently written value.
    #[derive(Default)]
    struct NoOpHasher(u64);

    impl Hasher for NoOpHasher {
        /// Remembers the last byte of `bytes`; an empty slice leaves the
        /// state unchanged.
        fn write(&mut self, bytes: &[u8]) {
            if let [.., last] = bytes {
                self.0 = u64::from(*last);
            }
        }

        /// Replaces the state with `value`.
        fn write_u64(&mut self, value: u64) {
            self.0 = value;
        }

        /// Returns the current state as the hash.
        fn finish(&self) -> u64 {
            let Self(state) = self;
            *state
        }
    }
632

633
    type NoOpBuildHasher = BuildHasherDefault<NoOpHasher>;
634

635
    #[cfg(test)]
636
    mod tests {
637
        use super::super::Coverage;
638
        use super::{CoverageWriter, S2_CELL_ID_SHIFT, S2_GRANULARITY_LEVEL, build_coverage};
639
        use anyhow::{Ok, Result};
640
        use indicatif::{MultiProgress, ProgressDrawTarget};
641
        use s2::cellid::CellID;
642
        use std::path::PathBuf;
643
        use tempfile::{NamedTempFile, TempDir};
644

645
        #[test]
646
        fn test_cell_id_shift() {
1✔
647
            let id = CellID::from_face_pos_level(3, 0x12345678, S2_GRANULARITY_LEVEL as u64);
1✔
648
            let range_len = id.range_max().0 - id.range_min().0 + 2;
1✔
649
            assert_eq!(id.level() as u8, S2_GRANULARITY_LEVEL);
1✔
650
            assert_eq!(range_len.ilog2() as u8, S2_CELL_ID_SHIFT);
1✔
651
        }
1✔
652

653
        #[test]
654
        fn test_build_coverage() -> Result<()> {
1✔
655
            use std::os::unix::fs::symlink;
656
            let mut atp = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1✔
657
            atp.push("tests/test_data/alltheplaces.parquet");
1✔
658
            let workdir = TempDir::new()?;
1✔
659
            symlink(&atp, workdir.path().join("alltheplaces.parquet"))?;
1✔
660
            let progress = MultiProgress::with_draw_target(ProgressDrawTarget::hidden());
1✔
661
            let coverage = build_coverage(&atp, &progress, workdir.path())?;
1✔
662
            assert!(coverage.exists());
1✔
663
            Ok(())
1✔
664
        }
1✔
665

666
        #[test]
667
        fn test_coverage_writer() -> Result<()> {
1✔
668
            let temp_file = NamedTempFile::new()?;
1✔
669
            let mut writer = CoverageWriter::try_new(temp_file.path())?;
1✔
670
            writer.write(7)?;
1✔
671
            for i in 1000..=1258 {
259✔
672
                writer.write(i)?;
259✔
673
            }
674
            writer.set_wikidata_ids(vec![23, 77, 88]);
1✔
675
            writer.close()?;
1✔
676

677
            let cov = Coverage::load(temp_file.path())?;
1✔
678
            for i in &[0, 5, 6, 8, 9, 999, 1259] {
7✔
679
                let cell = CellID(i << S2_CELL_ID_SHIFT);
7✔
680
                assert!(
7✔
681
                    !cov.is_covering(&cell),
7✔
682
                    "cell {:?} for i={} should not be covered, but is",
683
                    cell,
684
                    i,
685
                );
686
            }
687
            for i in &[7, 1000, 1001, 1002, 1111, 1254, 1255, 1256, 1257, 1258] {
10✔
688
                let cell = CellID(i << S2_CELL_ID_SHIFT);
10✔
689
                assert!(
10✔
690
                    cov.is_covering(&cell),
10✔
691
                    "cell {:?} for i={} should be covered, but is not",
692
                    cell,
693
                    i,
694
                );
695
            }
696

697
            assert_eq!(cov.contains_wikidata_id(1), false);
1✔
698
            assert_eq!(cov.contains_wikidata_id(23), true);
1✔
699
            assert_eq!(cov.contains_wikidata_id(51), false);
1✔
700
            assert_eq!(cov.contains_wikidata_id(77), true);
1✔
701
            assert_eq!(cov.contains_wikidata_id(88), true);
1✔
702
            assert_eq!(cov.contains_wikidata_id(89), false);
1✔
703

704
            Ok(())
1✔
705
        }
1✔
706
    }
707

708
    #[test]
709
    fn test_no_op_hasher() {
1✔
710
        let mut h = NoOpHasher(7);
1✔
711

712
        h.write_u64(2600);
1✔
713
        assert_eq!(h.finish(), 2600);
1✔
714

715
        h.write(&[31, 33, 7]);
1✔
716
        assert_eq!(h.finish(), 7);
1✔
717
    }
1✔
718
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc