• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

diffed-places / pipeline / 22520422957

28 Feb 2026 12:06PM UTC coverage: 96.815% (-0.1%) from 96.914%
22520422957

push

github

web-flow
Build index of OpenStreetMap feature IDs (#166)

At the moment, we only populate it for ways, and actually only those ways
that are referenced from a relation. Technically we could populate the
lookup-table also for nodes and relations, and for ways that are not
referenced in relations; but at the moment we would not ever use this
data, so there’s no point in building it.

On a 2018 MacBook Pro with 6 Intel CPU cores, filtering the complete
OSM planet of 2026-01-19 to the coverage area of AllThePlaces
2026-01-03 now takes 1 second for relations, 50 seconds for ways and
93 seconds for nodes; peak memory usage for all three steps combined
is 127.1M.  The size of the produced `osm-filtered` files is 2.6M for
relations, 213.7M for ways and 489.1M for nodes.

127 of 130 new or added lines in 1 file covered. (97.69%)

36 existing lines in 3 files now uncovered.

2067 of 2135 relevant lines covered (96.81%)

108.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.04
/src/coverage.rs
1
pub use reader::Coverage;
2
pub use writer::build_coverage;
3

4
/// The granularity of S2 cells we use to represent spatial coverage.
5
///
6
/// At level 19, an S2 cell is about 15 to 20 meters wide, see [S2 Cell
7
/// Statistics](https://s2geometry.io/resources/s2cell_statistics.html).
8
/// For an interactive visualization, see [S2 Region Coverer Online
9
/// Viewer](https://igorgatis.github.io/ws2/).
10
const S2_GRANULARITY_LEVEL: u8 = 19;
11

12
/// To store the coverage map as a bitvector in a more compact form,
13
/// we do not store the actual S2 cell IDs because this would lead
14
/// to a sparse bitvector without contiguous runs. Rather, we shift the
15
/// unsigned 64-bit integer ids to the right, so that neighboring
16
/// parent cells (at our finest granularity, S2_GRANULARITY_LEVEL)
17
/// become neighboring bits in the bitvector. This leads to a better
18
/// run-length encoding. S2 cell ids use the most significant three
19
/// bits to encode the cube face [0..5], and then two bits for each
20
/// level of granularity.
21
const S2_CELL_ID_SHIFT: u8 = 64 - (3 + 2 * S2_GRANULARITY_LEVEL);
22

23
pub fn is_wikidata_key(key: &str) -> bool {
2,658✔
24
    key == "wikidata" || (key.ends_with(":wikidata") && key != "species:wikidata")
2,658✔
25
}
2,658✔
26

27
pub fn parse_wikidata_ids(input: &str) -> impl Iterator<Item = u64> + '_ {
83✔
28
    input.split(';').filter_map(|part| {
85✔
29
        let trimmed = part.trim();
85✔
30
        let digits = trimmed
85✔
31
            .strip_prefix('Q')
85✔
32
            .or_else(|| trimmed.strip_prefix('q'))?;
85✔
33
        digits.parse::<u64>().ok()
85✔
34
    })
85✔
35
}
83✔
36

37
#[cfg(test)]
38
mod tests {
39
    use super::{is_wikidata_key, parse_wikidata_ids};
40

41
    #[test]
42
    fn test_is_wikidata_id() {
1✔
43
        assert_eq!(is_wikidata_key("highway"), false);
1✔
44
        assert_eq!(is_wikidata_key("wikidata"), true);
1✔
45
        assert_eq!(is_wikidata_key("brand:wikidata"), true);
1✔
46
        assert_eq!(is_wikidata_key("network:wikidata"), true);
1✔
47
        assert_eq!(is_wikidata_key("operator:wikidata"), true);
1✔
48
        assert_eq!(is_wikidata_key("species:wikidata"), false);
1✔
49
    }
1✔
50

51
    #[test]
52
    fn test_parse_wikidata_ids() {
1✔
53
        let ids: Vec<u64> = parse_wikidata_ids(" Q123;Q813 ; q21").collect();
1✔
54
        assert_eq!(ids, vec![123, 813, 21]);
1✔
55
    }
1✔
56
}
57

58
mod reader {
59
    use super::S2_CELL_ID_SHIFT;
60
    use anyhow::{Ok, Result, anyhow};
61
    use memmap2::Mmap;
62
    use s2::cellid::CellID;
63
    use std::fs::File;
64
    use std::path::Path;
65

66
    pub struct Coverage<'a> {
67
        /// Backing store for `mmap`.
68
        _file: File,
69

70
        /// Backing store for `run_starts`, `run_lengths` and `wikidata_ids`.
71
        _mmap: Mmap,
72

73
        /// Run-length encoded s2 cell ids, shifted by S2_CELL_ID_SHIFT.
74
        run_starts: &'a [u64],
75
        run_lengths: &'a [u8],
76

77
        /// Sorted array of covered wikidata ids.
78
        wikidata_ids: &'a [u64],
79
    }
80

81
    impl<'a> Coverage<'a> {
82
        pub fn contains_s2_cell(&self, cell: &CellID) -> bool {
1,807✔
83
            let val: u64 = cell.0 >> S2_CELL_ID_SHIFT;
1,807✔
84
            let index = if cfg!(target_endian = "little") {
1,807✔
85
                self.run_starts.partition_point(|&x| x <= val)
12,581✔
86
            } else {
87
                self.run_starts.partition_point(|&x| x.swap_bytes() <= val)
×
88
            };
89
            if index > 0 {
1,807✔
90
                let start = self.run_starts[index - 1];
1,804✔
91
                let limit = start + self.run_lengths[index - 1] as u64;
1,804✔
92
                start <= val && val <= limit
1,804✔
93
            } else {
94
                false
3✔
95
            }
96
        }
1,807✔
97

98
        pub fn contains_wikidata_item(&self, id: u64) -> bool {
74✔
99
            if cfg!(target_endian = "little") {
74✔
100
                self.wikidata_ids.binary_search(&id).is_ok()
74✔
101
            } else {
UNCOV
102
                self.wikidata_ids
×
103
                    .binary_search_by(|x| x.swap_bytes().cmp(&id))
×
104
                    .is_ok()
×
105
            }
106
        }
74✔
107

108
        pub fn load(path: &Path) -> Result<Coverage<'_>> {
4✔
109
            let file = File::open(path)?;
4✔
110

111
            // SAFETY: We don’t truncate the file while it’s mapped into memory.
112
            let mmap = unsafe { Mmap::map(&file)? };
4✔
113

114
            Self::check_signature(&mmap)?;
4✔
115

116
            let run_starts = {
4✔
117
                let (offset, size) = Self::get_offset_size(b"runstart", &mmap, 8)
4✔
118
                    .ok_or(anyhow!("no run starts in coverage file {:?}", path))?;
4✔
119
                // SAFETY: Alignment and bounds checked by get_offset_size().
120
                unsafe {
121
                    let ptr = mmap.as_ptr().add(offset) as *const u64;
4✔
122
                    std::slice::from_raw_parts(ptr, size / 8)
4✔
123
                }
124
            };
125

126
            let run_lengths = {
4✔
127
                let (offset, size) = Self::get_offset_size(b"runlengt", &mmap, 1)
4✔
128
                    .ok_or(anyhow!("no run lengths in coverage file {:?}", path))?;
4✔
129
                // SAFETY: Bounds checked by get_offset_size().
130
                unsafe { std::slice::from_raw_parts(mmap.as_ptr().add(offset), size) }
4✔
131
            };
132

133
            if run_starts.len() != run_lengths.len() {
4✔
UNCOV
134
                return Err(anyhow!("inconsistent number of runs in {:?}", path));
×
135
            }
4✔
136

137
            let wikidata_ids = {
4✔
138
                let (offset, size) = Self::get_offset_size(b"wikidata", &mmap, 8)
4✔
139
                    .ok_or(anyhow!("no wikidata ids in coverage file {:?}", path))?;
4✔
140
                // SAFETY: Alignment and bounds checked by get_offset_size().
141
                unsafe {
142
                    let ptr = mmap.as_ptr().add(offset) as *const u64;
4✔
143
                    std::slice::from_raw_parts(ptr, size / 8)
4✔
144
                }
145
            };
146

147
            Ok(Coverage {
4✔
148
                _file: file,
4✔
149
                _mmap: mmap,
4✔
150
                run_starts,
4✔
151
                run_lengths,
4✔
152
                wikidata_ids,
4✔
153
            })
4✔
154
        }
4✔
155

156
        fn check_signature(data: &[u8]) -> Result<()> {
8✔
157
            if data.len() < 24 || &data[0..24] != b"diffed-places coverage\0\0" {
8✔
158
                return Err(anyhow!("malformed coverage file"));
3✔
159
            }
5✔
160
            Ok(())
5✔
161
        }
8✔
162

163
        // This implementation only works on 64-bit processors. Theoretically,
164
        // we could write a special implementation for 32-bit platforms, which
165
        // would have to restrict its input to files smaller than 4 GiB.
166
        // However, we’re not doing a retro-computing project, so we’ll never
167
        // execute our code on such machines.
168
        #[cfg(target_pointer_width = "64")]
169
        fn get_offset_size(id: &[u8; 8], bytes: &[u8], alignment: usize) -> Option<(usize, usize)> {
21✔
170
            if bytes.len() < 32 {
21✔
UNCOV
171
                return None;
×
172
            }
21✔
173
            let num_headers = u64::from_le_bytes(bytes[24..32].try_into().ok()?) as usize;
21✔
174
            if bytes.len() < 32 + num_headers * 24 {
21✔
175
                return None;
1✔
176
            }
20✔
177
            for i in 0..num_headers {
37✔
178
                let pos = 32 + i * 24;
37✔
179
                if *id == bytes[pos..pos + 8] {
37✔
180
                    let offset =
19✔
181
                        u64::from_le_bytes(bytes[pos + 8..pos + 16].try_into().ok()?) as usize;
19✔
182
                    let size =
19✔
183
                        u64::from_le_bytes(bytes[pos + 16..pos + 24].try_into().ok()?) as usize;
19✔
184
                    if bytes.len() >= offset + size
19✔
185
                        && offset.is_multiple_of(alignment)
19✔
186
                        && size.is_multiple_of(alignment)
16✔
187
                    {
188
                        return Some((offset, size));
16✔
189
                    }
3✔
190
                }
18✔
191
            }
192
            None
4✔
193
        }
21✔
194
    }
195

196
    #[cfg(test)]
197
    mod tests {
198
        use super::Coverage;
199

200
        #[test]
201
        fn test_check_signature() {
1✔
202
            assert!(Coverage::check_signature(b"").is_err());
1✔
203
            assert!(Coverage::check_signature(b"foo").is_err());
1✔
204
            assert!(Coverage::check_signature(b"diffed-places coverage\0\x01").is_err());
1✔
205
            assert!(Coverage::check_signature(b"diffed-places coverage\0\0\0\0\0\0").is_ok());
1✔
206
        }
1✔
207

208
        #[test]
209
        fn test_get_offset_size() {
1✔
210
            // Construct a coverage file with two keys in its header.
211
            let mut bytes = Vec::new();
1✔
212
            bytes.extend_from_slice(b"diffed-places coverage\0\0");
1✔
213
            bytes.extend_from_slice(&2_u64.to_le_bytes());
1✔
214
            assert!(Coverage::get_offset_size(b"some_key", &bytes, 1) == None);
1✔
215
            for (key, offset, size) in [(b"some_key", 80, 16), (b"otherkey", 83, 2)] {
2✔
216
                bytes.extend_from_slice(key as &[u8; 8]);
2✔
217
                bytes.extend_from_slice(&(offset as u64).to_le_bytes());
2✔
218
                bytes.extend_from_slice(&(size as u64).to_le_bytes());
2✔
219
            }
2✔
220
            bytes.extend_from_slice(&0xdeadbeefcafefeed_u64.to_le_bytes());
1✔
221
            bytes.extend_from_slice(&42_u64.to_le_bytes());
1✔
222
            let bytes = bytes.as_ref();
1✔
223

224
            // The file header has no entry for key "in-exist".
225
            assert_eq!(Coverage::get_offset_size(b"in-exist", bytes, 1), None);
1✔
226

227
            // The data for "some_key" starts at offset 80 and is 16 bytes, which is
228
            // correctly aligned for single-byte, 8-byte and 16-byte access.
229
            // However, it would be unsafe (at least on RISC CPUs) to perform 32-byte
230
            // access, eg. loading a SIMD register, because 80 is not divisible by 32.
231
            // Also, the data size is too small to read 32-byte entities anyway.
232
            assert_eq!(
1✔
233
                Coverage::get_offset_size(b"some_key", bytes, 1),
1✔
234
                Some((80, 16))
235
            );
236
            assert_eq!(
1✔
237
                Coverage::get_offset_size(b"some_key", bytes, 2),
1✔
238
                Some((80, 16))
239
            );
240
            assert_eq!(
1✔
241
                Coverage::get_offset_size(b"some_key", bytes, 8),
1✔
242
                Some((80, 16))
243
            );
244
            assert_eq!(Coverage::get_offset_size(b"some_key", bytes, 32), None);
1✔
245

246
            // The data for "otherkey" starts at offset 83 and is 2 bytes long.
247
            // Access is only safe with single-byte alignment.
248
            assert_eq!(
1✔
249
                Coverage::get_offset_size(b"otherkey", bytes, 1),
1✔
250
                Some((83, 2))
251
            );
252
            assert_eq!(Coverage::get_offset_size(b"otherkey", bytes, 2), None);
1✔
253
            assert_eq!(Coverage::get_offset_size(b"otherkey", bytes, 8), None);
1✔
254
        }
1✔
255
    }
256
}
257

258
mod writer {
259
    use super::{
260
        Coverage, S2_CELL_ID_SHIFT, S2_GRANULARITY_LEVEL, is_wikidata_key, parse_wikidata_ids,
261
    };
262
    use crate::PROGRESS_BAR_STYLE;
263
    use anyhow::{Ok, Result, anyhow};
264
    use ext_sort::{ExternalSorter, ExternalSorterBuilder, buffer::LimitedBufferBuilder};
265
    use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
266
    use parquet::file::reader::{FileReader, SerializedFileReader};
267
    use parquet::record::RowAccessor;
268
    use parquet::schema::types::Type;
269
    use rayon::prelude::*;
270
    use s2::{
271
        cap::Cap,
272
        cell::Cell,
273
        cellid::CellID,
274
        region::RegionCoverer,
275
        s1::{Angle, ChordAngle},
276
    };
277
    use std::collections::HashSet;
278
    use std::fs::{File, remove_file, rename};
279
    use std::hash::{BuildHasherDefault, Hasher};
280
    use std::io::{BufReader, BufWriter, Seek, SeekFrom, Write};
281
    use std::path::{Path, PathBuf};
282
    use std::sync::atomic::{AtomicU64, Ordering};
283
    use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
284

285
    /// Computes the spatial coverage of a set of places.
286
    pub fn build_coverage(atp: &Path, progress: &MultiProgress, workdir: &Path) -> Result<PathBuf> {
2✔
287
        assert!(atp.exists());
2✔
288

289
        let out = workdir.join("coverage");
2✔
290
        if out.exists() {
2✔
UNCOV
291
            log::info!("skipping build_coverage, already found {:?}", out);
×
292
            return Ok(out);
×
293
        } else {
294
            log::info!("build_coverage is starting");
2✔
295
        }
296

297
        let mut tmp = PathBuf::from(&out);
2✔
298
        tmp.add_extension("tmp");
2✔
299
        let mut writer = CoverageWriter::try_new(&tmp)?;
2✔
300

301
        // To avoid deadlock, we must not use Rayon threads here.
302
        // https://dev.to/sgchris/scoped-threads-with-stdthreadscope-in-rust-163-48f9
303
        let (cell_tx, cell_rx) = sync_channel::<CellID>(50_000);
2✔
304
        let (wikidata_tx, wikidata_rx) = sync_channel::<u64>(1000);
2✔
305

306
        // The AllThePlaces dump of 2026-01-03 references 3791 unique wikidata ids.
307
        let mut wikidata_ids = Vec::<u64>::new();
2✔
308
        std::thread::scope(|s| {
2✔
309
            let producer = s.spawn(|| read_places(atp, progress, cell_tx, wikidata_tx));
2✔
310
            let cell_consumer = s.spawn(|| build_spatial_coverage(cell_rx, progress, &mut writer));
2✔
311
            let wikidata_consumer = s.spawn(|| {
2✔
312
                wikidata_ids = collect_wikidata_ids(wikidata_rx);
2✔
313
                Ok(())
2✔
314
            });
2✔
315
            producer
2✔
316
                .join()
2✔
317
                .expect("producer panic")
2✔
318
                .and(cell_consumer.join().expect("cell consumer panic"))
2✔
319
                .and(wikidata_consumer.join().expect("wikidata consumer panic"))
2✔
320
        })?;
2✔
321

322
        writer.set_wikidata_ids(wikidata_ids);
2✔
323
        writer.close()?;
2✔
324
        rename(&tmp, &out)?;
2✔
325

326
        // As a sanity check, let’s try to open the freshly generated file.
327
        // This verifies the presence of the correct file signature, and that
328
        // the run_starts and run_lengths arrays are correctly aligned and
329
        // within allowable bounds.
330
        _ = Coverage::load(&out)?;
2✔
331

332
        log::info!("build_spatial_coverage finished, built {:?}", out);
2✔
333
        Ok(out)
2✔
334
    }
2✔
335

336
    fn read_places(
2✔
337
        places: &Path,
2✔
338
        progress: &MultiProgress,
2✔
339
        out_cells: SyncSender<CellID>,
2✔
340
        out_wikidata_ids: SyncSender<u64>,
2✔
341
    ) -> Result<()> {
2✔
342
        use anyhow::Context;
343
        let file =
2✔
344
            File::open(places).with_context(|| format!("could not open file `{:?}`", places))?;
2✔
345
        let reader = SerializedFileReader::new(file)?;
2✔
346
        let metadata = reader.metadata();
2✔
347
        let file_metadata = metadata.file_metadata();
2✔
348
        let schema = file_metadata.schema();
2✔
349
        let s2_cell_id_column = column_index("s2_cell_id", schema)?;
2✔
350
        // let source_column = column_index("source", schema)?;
351
        let tags_column = column_index("tags", schema)?;
2✔
352
        let num_row_groups = reader.num_row_groups();
2✔
353
        let num_rows: i64 = file_metadata.num_rows();
2✔
354
        let num_rows: u64 = if num_rows < 0 { 0 } else { num_rows as u64 };
2✔
355

356
        let large_radius = meters_to_chord_angle(100.0);
2✔
357
        let small_radius = meters_to_chord_angle(10.0);
2✔
358
        let coverer = RegionCoverer {
2✔
359
            max_cells: 8,
2✔
360
            min_level: S2_GRANULARITY_LEVEL,
2✔
361
            max_level: S2_GRANULARITY_LEVEL,
2✔
362
            level_mod: 1,
2✔
363
        };
2✔
364

365
        let bar = progress.add(ProgressBar::new(num_rows));
2✔
366
        bar.set_style(ProgressStyle::with_template(PROGRESS_BAR_STYLE)?);
2✔
367
        bar.set_prefix("cov.read ");
2✔
368
        bar.set_message("features");
2✔
369

370
        (0..num_row_groups)
2✔
371
            .into_par_iter()
2✔
372
            .try_for_each(|row_group_index| {
2✔
373
                // Because Apache’s implementation of Parquet is not
374
                // thread-safe, but the alternative implementation in
375
                // arrow2 is deprecated, we let each worker thread have
376
                // its own SerializedFileReader, each reading one row
377
                // group in the same Parquet file. This is a little
378
                // wasteful, but it’s actually not too bad.  In our
379
                // Parquet file for the full AllThePlaces dump of
380
                // 2026-01-03, there were 24 row groups in total.
381
                // An earlier version of this code was using the
382
                // alternative implementation of the parquet2 crate,
383
                // but our application code got awfully complicated
384
                // when using that low-level library.
385
                let reader = SerializedFileReader::new(File::open(places)?)?;
2✔
386
                let row_group = reader.get_row_group(row_group_index)?;
2✔
387
                for row in row_group.get_row_iter(None)? {
14✔
388
                    let row = row?;
14✔
389
                    let s2_cell = Cell::from(CellID(row.get_ulong(s2_cell_id_column)?));
14✔
390
                    // let source = row.get_string(source_column)?;
391
                    let tags = row.get_map(tags_column)?.entries();
14✔
392
                    let mut radius = small_radius;
14✔
393
                    for (key, value) in tags.iter() {
136✔
394
                        use parquet::record::Field::Str;
395
                        if let (Str(key), Str(value)) = (key, value) {
136✔
396
                            radius = radius.max(match (key.as_ref(), value.as_ref()) {
136✔
397
                                ("shop", _) => large_radius,
136✔
398
                                ("tourism", _) => large_radius,
130✔
399
                                ("public_transport", "platform") => large_radius,
130✔
400
                                ("railway", "platform") => large_radius,
130✔
401
                                (_, _) => small_radius,
130✔
402
                            });
403
                            if is_wikidata_key(key) {
136✔
404
                                for id in parse_wikidata_ids(value) {
14✔
405
                                    out_wikidata_ids.send(id)?;
14✔
406
                                }
407
                            }
122✔
UNCOV
408
                        }
×
409
                    }
410
                    let cap = Cap::from_center_chordangle(&s2_cell.center(), &radius);
14✔
411
                    for cell_id in coverer.covering(&cap).0.into_iter() {
976✔
412
                        out_cells.send(cell_id)?;
976✔
413
                    }
414
                    bar.inc(1);
14✔
415
                }
416

417
                Ok(())
2✔
418
            })?;
2✔
419

420
        bar.finish();
2✔
421
        Ok(())
2✔
422
    }
2✔
423

424
    fn column_index(name: &str, schema: &Type) -> Result<usize> {
4✔
425
        for (i, field) in schema.get_fields().iter().enumerate() {
8✔
426
            if field.name() == name {
8✔
427
                return Ok(i);
4✔
428
            }
4✔
429
        }
UNCOV
430
        Err(anyhow!("column \"{}\" not found", name))
×
431
    }
4✔
432

433
    fn meters_to_chord_angle(radius_meters: f64) -> ChordAngle {
4✔
434
        use s2::s1::angle::Rad;
435
        const EARTH_RADIUS_METERS: f64 = 6_371_000.0;
436
        ChordAngle::from(Angle::from(Rad(radius_meters / EARTH_RADIUS_METERS)))
4✔
437
    }
4✔
438

439
    /// Builds a spatial coverage file from a stream of s2::CellIDs.
440
    fn build_spatial_coverage(
2✔
441
        cells: Receiver<CellID>,
2✔
442
        progress: &MultiProgress,
2✔
443
        writer: &mut CoverageWriter,
2✔
444
    ) -> Result<()> {
2✔
445
        let num_cells = AtomicU64::new(0);
2✔
446
        let sorter: ExternalSorter<CellID, std::io::Error, LimitedBufferBuilder> =
2✔
447
            ExternalSorterBuilder::new()
2✔
448
                .with_tmp_dir(Path::new("./"))
2✔
449
                .with_buffer(LimitedBufferBuilder::new(
2✔
450
                    1_000_000, /* preallocate */ true,
451
                ))
452
                .build()?;
2✔
453
        let sorted = sorter.sort(cells.iter().map(|x| {
976✔
454
            num_cells.fetch_add(1, Ordering::SeqCst);
976✔
455
            std::io::Result::Ok(x)
976✔
456
        }))?;
976✔
457

458
        let bar = progress.add(ProgressBar::new(num_cells.load(Ordering::SeqCst)));
2✔
459
        bar.set_style(ProgressStyle::with_template(PROGRESS_BAR_STYLE)?);
2✔
460
        bar.set_prefix("cov.write");
2✔
461
        bar.set_message("s2 cells");
2✔
462

463
        for cur in sorted {
976✔
464
            writer.write(cur?.0 >> S2_CELL_ID_SHIFT)?;
976✔
465
            bar.inc(1);
976✔
466
        }
467

468
        bar.finish();
2✔
469

470
        Ok(())
2✔
471
    }
2✔
472

473
    fn collect_wikidata_ids(stream: Receiver<u64>) -> Vec<u64> {
2✔
474
        // The AllThePlaces dump of 2026-01-03 references 3791 unique wikidata ids.
475
        let mut ids = HashSet::with_capacity_and_hasher(8192, NoOpBuildHasher::default());
2✔
476
        let mut last: u64 = 0; // not a valid wikidata id
2✔
477
        for id in stream {
14✔
478
            if id != last {
14✔
479
                ids.insert(id);
6✔
480
                last = id;
6✔
481
            }
8✔
482
        }
483

484
        let mut sorted_ids: Vec<u64> = ids.into_iter().collect();
2✔
485
        sorted_ids.sort();
2✔
486
        sorted_ids
2✔
487
    }
2✔
488

489
    struct CoverageWriter {
490
        writer: BufWriter<File>,
491
        run_starts_pos: u64, // offset of run_starts array relative to start of file
492

493
        run_lengths_path: PathBuf,
494
        run_lengths_writer: BufWriter<File>,
495

496
        num_values: u64,
497
        num_runs: u64,
498
        run_start: Option<u64>,
499
        run_length_minus_1: u8,
500

501
        wikidata_ids: Vec<u64>,
502
    }
503

504
    impl CoverageWriter {
505
        const NUM_HEADERS: usize = 3;
506

507
        fn try_new(path: &Path) -> Result<CoverageWriter> {
3✔
508
            let file = File::create(path)?;
3✔
509
            let mut writer = BufWriter::with_capacity(32768, file);
3✔
510

511
            // Write file header.
512
            writer.write_all(b"diffed-places coverage\0\0")?;
3✔
513
            writer.write_all(&(Self::NUM_HEADERS as u64).to_le_bytes())?;
3✔
514
            writer.write_all(&[0; 24 * Self::NUM_HEADERS])?; // leave space for headers
3✔
515

516
            let run_starts_pos = writer.stream_position()?;
3✔
517
            let run_lengths_path = path.with_extension("tmp_run_lengths");
3✔
518
            let run_lengths_file = File::create(&run_lengths_path)?;
3✔
519
            let run_lengths_writer = BufWriter::with_capacity(32768, run_lengths_file);
3✔
520

521
            Ok(CoverageWriter {
3✔
522
                writer,
3✔
523
                run_starts_pos,
3✔
524
                run_lengths_path,
3✔
525
                run_lengths_writer,
3✔
526
                num_values: 0,
3✔
527
                num_runs: 0,
3✔
528
                run_start: None,
3✔
529
                run_length_minus_1: 0,
3✔
530
                wikidata_ids: Vec::default(),
3✔
531
            })
3✔
532
        }
3✔
533

534
        fn write(&mut self, value: u64) -> Result<()> {
1,236✔
535
            let Some(run_start) = self.run_start else {
1,236✔
536
                self.num_values = 1;
3✔
537
                self.num_runs = 1;
3✔
538
                self.run_start = Some(value);
3✔
539
                self.run_length_minus_1 = 0;
3✔
540
                return Ok(());
3✔
541
            };
542

543
            let run_end: u64 = run_start + (self.run_length_minus_1 as u64);
1,233✔
544
            assert!(
1,233✔
545
                value >= run_end,
1,233✔
546
                "values not written in sort order: {} after {}",
547
                value,
548
                run_end
549
            );
550

551
            if value == run_end {
1,233✔
552
                // If we write the same value twice, we don’t need to do anything.
UNCOV
553
                return Ok(());
×
554
            } else if value == run_end + 1 && self.run_length_minus_1 < 0xff {
1,233✔
555
                // Extending the length of the current run, if there’s still
556
                // enough space to hold the new length in the available 8 bits.
557
                self.run_length_minus_1 += 1;
1,123✔
558
                self.num_values += 1;
1,123✔
559
                return Ok(());
1,123✔
560
            }
110✔
561

562
            // Start a new run with the current value.
563
            self.finish_run()?;
110✔
564
            self.run_start = Some(value);
110✔
565
            self.run_length_minus_1 = 0;
110✔
566
            self.num_values += 1;
110✔
567
            self.num_runs += 1;
110✔
568
            Ok(())
110✔
569
        }
1,236✔
570

571
        fn set_wikidata_ids(&mut self, ids: Vec<u64>) {
3✔
572
            assert!(ids.is_sorted());
3✔
573
            self.wikidata_ids = ids;
3✔
574
        }
3✔
575

576
        fn write_wikidata_ids(&mut self) -> Result<()> {
3✔
577
            for id in &self.wikidata_ids {
7✔
578
                self.writer.write_all(&id.to_le_bytes())?;
7✔
579
            }
580
            Ok(())
3✔
581
        }
3✔
582

583
        fn close(mut self) -> Result<()> {
3✔
584
            self.finish_run()?;
3✔
585
            self.writer.flush()?;
3✔
586

587
            // Append the run lengths, which are at this point in time in a temporary file,
588
            // to the end of the main file.
589
            let run_lengths_pos = self.writer.stream_position()?;
3✔
590
            self.run_lengths_writer.flush()?;
3✔
591
            assert_eq!(self.run_lengths_writer.stream_position()?, self.num_runs);
3✔
592
            self.run_lengths_writer.seek(SeekFrom::Start(0))?;
3✔
593
            let run_lengths_path: &Path = self.run_lengths_path.as_path();
3✔
594
            let mut reader = BufReader::new(File::open(run_lengths_path)?);
3✔
595
            std::io::copy(&mut reader, &mut self.writer)?;
3✔
596
            remove_file(run_lengths_path)?;
3✔
597

598
            // Append wikidata ids.
599
            self.write_padding(8)?;
3✔
600
            let wikidata_ids_pos = self.writer.stream_position()?;
3✔
601
            self.write_wikidata_ids()?;
3✔
602
            let wikidata_ids_size = self.writer.stream_position()? - wikidata_ids_pos;
3✔
603

604
            self.write_headers(&[
3✔
605
                ("runstart", self.run_starts_pos, self.num_runs * 8),
3✔
606
                ("runlengt", run_lengths_pos, self.num_runs),
3✔
607
                ("wikidata", wikidata_ids_pos, wikidata_ids_size),
3✔
608
            ])?;
3✔
609
            Ok(())
3✔
610
        }
3✔
611

612
        fn write_headers(&mut self, headers: &[(&str, u64, u64)]) -> Result<()> {
3✔
613
            self.writer.seek(SeekFrom::Start(32))?;
3✔
614
            assert_eq!(headers.len(), Self::NUM_HEADERS);
3✔
615
            for (id, pos, len) in headers {
9✔
616
                assert_eq!(
9✔
617
                    id.len(),
9✔
618
                    8,
619
                    "header id must be 8 chars long but \"{:?}\" is not",
620
                    id
621
                );
622
                self.writer.write_all(id.as_bytes())?;
9✔
623
                self.writer.write_all(&pos.to_le_bytes())?;
9✔
624
                self.writer.write_all(&len.to_le_bytes())?;
9✔
625
            }
626
            self.writer.flush()?;
3✔
627
            Ok(())
3✔
628
        }
3✔
629

630
        fn finish_run(&mut self) -> Result<()> {
113✔
631
            let Some(run_start) = self.run_start else {
113✔
UNCOV
632
                return Ok(());
×
633
            };
634
            self.writer.write_all(&run_start.to_le_bytes())?;
113✔
635
            self.run_lengths_writer
113✔
636
                .write_all(&[self.run_length_minus_1])?;
113✔
637
            Ok(())
113✔
638
        }
113✔
639

640
        fn write_padding(&mut self, alignment: usize) -> Result<()> {
3✔
641
            let pos = self.writer.stream_position()?;
3✔
642
            let alignment = alignment as u64;
3✔
643
            let num_bytes = ((alignment - (pos % alignment)) % alignment) as usize;
3✔
644
            if num_bytes > 0 {
3✔
645
                let padding = vec![0; num_bytes];
3✔
646
                self.writer.write_all(&padding)?;
3✔
UNCOV
647
            }
×
648
            Ok(())
3✔
649
        }
3✔
650
    }
651

652
    #[derive(Default)]
653
    struct NoOpHasher(u64);
654

655
    impl Hasher for NoOpHasher {
656
        fn write(&mut self, bytes: &[u8]) {
1✔
657
            if let Some(value) = bytes.last() {
1✔
658
                self.0 = *value as u64;
1✔
659
            }
1✔
660
        }
1✔
661

662
        fn write_u64(&mut self, value: u64) {
7✔
663
            self.0 = value;
7✔
664
        }
7✔
665

666
        fn finish(&self) -> u64 {
8✔
667
            self.0
8✔
668
        }
8✔
669
    }
670

671
    type NoOpBuildHasher = BuildHasherDefault<NoOpHasher>;
672

673
    #[cfg(test)]
674
    mod tests {
675
        use super::super::Coverage;
676
        use super::{CoverageWriter, S2_CELL_ID_SHIFT, S2_GRANULARITY_LEVEL, build_coverage};
677
        use anyhow::{Ok, Result};
678
        use indicatif::{MultiProgress, ProgressDrawTarget};
679
        use s2::cellid::CellID;
680
        use std::path::PathBuf;
681
        use tempfile::{NamedTempFile, TempDir};
682

683
        #[test]
684
        fn test_cell_id_shift() {
1✔
685
            let id = CellID::from_face_pos_level(3, 0x12345678, S2_GRANULARITY_LEVEL as u64);
1✔
686
            let range_len = id.range_max().0 - id.range_min().0 + 2;
1✔
687
            assert_eq!(id.level() as u8, S2_GRANULARITY_LEVEL);
1✔
688
            assert_eq!(range_len.ilog2() as u8, S2_CELL_ID_SHIFT);
1✔
689
        }
1✔
690

691
        #[test]
692
        fn test_build_coverage() -> Result<()> {
1✔
693
            use std::os::unix::fs::symlink;
694
            let mut atp = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1✔
695
            atp.push("tests/test_data/alltheplaces.parquet");
1✔
696
            let workdir = TempDir::new()?;
1✔
697
            symlink(&atp, workdir.path().join("alltheplaces.parquet"))?;
1✔
698
            let progress = MultiProgress::with_draw_target(ProgressDrawTarget::hidden());
1✔
699
            let coverage = build_coverage(&atp, &progress, workdir.path())?;
1✔
700
            assert!(coverage.exists());
1✔
701
            Ok(())
1✔
702
        }
1✔
703

704
        #[test]
705
        fn test_coverage_writer() -> Result<()> {
1✔
706
            let temp_file = NamedTempFile::new()?;
1✔
707
            let mut writer = CoverageWriter::try_new(temp_file.path())?;
1✔
708
            writer.write(7)?;
1✔
709
            for i in 1000..=1258 {
259✔
710
                writer.write(i)?;
259✔
711
            }
712
            writer.set_wikidata_ids(vec![23, 77, 88]);
1✔
713
            writer.close()?;
1✔
714

715
            let cov = Coverage::load(temp_file.path())?;
1✔
716
            for i in &[0, 5, 6, 8, 9, 999, 1259] {
7✔
717
                let cell = CellID(i << S2_CELL_ID_SHIFT);
7✔
718
                assert!(
7✔
719
                    !cov.contains_s2_cell(&cell),
7✔
720
                    "cell {:?} for i={} should not be covered, but is",
721
                    cell,
722
                    i,
723
                );
724
            }
725
            for i in &[7, 1000, 1001, 1002, 1111, 1254, 1255, 1256, 1257, 1258] {
10✔
726
                let cell = CellID(i << S2_CELL_ID_SHIFT);
10✔
727
                assert!(
10✔
728
                    cov.contains_s2_cell(&cell),
10✔
729
                    "cell {:?} for i={} should be covered, but is not",
730
                    cell,
731
                    i,
732
                );
733
            }
734

735
            assert_eq!(cov.contains_wikidata_item(1), false);
1✔
736
            assert_eq!(cov.contains_wikidata_item(23), true);
1✔
737
            assert_eq!(cov.contains_wikidata_item(51), false);
1✔
738
            assert_eq!(cov.contains_wikidata_item(77), true);
1✔
739
            assert_eq!(cov.contains_wikidata_item(88), true);
1✔
740
            assert_eq!(cov.contains_wikidata_item(89), false);
1✔
741

742
            Ok(())
1✔
743
        }
1✔
744
    }
745

746
    #[test]
747
    fn test_no_op_hasher() {
1✔
748
        let mut h = NoOpHasher(7);
1✔
749

750
        h.write_u64(2600);
1✔
751
        assert_eq!(h.finish(), 2600);
1✔
752

753
        h.write(&[31, 33, 7]);
1✔
754
        assert_eq!(h.finish(), 7);
1✔
755
    }
1✔
756
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc