• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jzombie / rust-triplets / 23724991092

30 Mar 2026 02:13AM UTC coverage: 95.295% (+0.003%) from 95.292%
23724991092

Pull #45

github

web-flow
Merge 0de5c3072 into add1496a9
Pull Request #45: Parallel sampling

62 of 62 new or added lines in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

16425 of 17236 relevant lines covered (95.29%)

3502.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.38
/src/source/mod.rs
1
//! Data source interfaces and paging helpers.
2
//!
3
//! Ownership model:
4
//! - `DataSource` is the sampler-facing interface that produces batches.
5
//! - `IndexableSource` exposes stable, index-based access into a corpus.
6
//! - `IndexablePager` owns the deterministic pseudo-random paging logic and
7
//!   can page any indexable source without retaining per-record state.
8

9
use chrono::{DateTime, Utc};
10
use std::hash::Hash;
11
use std::sync::Arc;
12
use std::time::Instant;
13

14
use crate::config::{SamplerConfig, TripletRecipe};
15
use crate::data::DataRecord;
16
use crate::errors::SamplerError;
17
use crate::hash::stable_hash_with;
18
use crate::types::SourceId;
19

20
/// Source implementation modules.
21
pub mod backends;
22
/// Utility helpers used by source implementations.
23
pub mod indexing;
24
pub use backends::file_source::{
25
    FileSource, FileSourceConfig, SectionBuilder, TaxonomyBuilder, anchor_context_sections,
26
    taxonomy_from_path,
27
};
28
#[cfg(feature = "huggingface")]
29
pub use backends::huggingface_source::{
30
    HuggingFaceRowSource, HuggingFaceRowsConfig, managed_hf_list_snapshot_dir,
31
    managed_hf_snapshot_dir,
32
};
33

34
/// Source-owned incremental refresh position.
35
///
36
/// The sampler stores and returns this value between refresh calls.
37
/// `revision` is opaque to the sampler and interpreted only by the source.
38
#[derive(Clone, Debug)]
39
pub struct SourceCursor {
40
    /// Most recent observation timestamp produced by the source.
41
    pub last_seen: DateTime<Utc>,
42
    /// Opaque paging position token used to continue incremental refresh.
43
    pub revision: u64,
44
}
45

46
/// Result of a single source refresh call.
47
///
48
/// Pass the returned `cursor` back into the next refresh to continue paging.
49
#[derive(Clone, Debug)]
50
pub struct SourceSnapshot {
51
    /// Records returned by the refresh operation.
52
    pub records: Vec<DataRecord>,
53
    /// Next cursor to pass into a future refresh call.
54
    pub cursor: SourceCursor,
55
}
56

57
/// Sampler-facing data source interface.
58
///
59
/// Implementations may be streaming or index-backed. For a fixed dataset state
60
/// and cursor, refresh output should be deterministic.
61
pub trait DataSource: Send + Sync {
62
    /// Stable source identifier used in records, metrics, and persistence state.
63
    fn id(&self) -> &str;
64
    /// Fetch up to `limit` records starting from `cursor` state.
65
    ///
66
    /// Return the next cursor position in `SourceSnapshot.cursor`.
67
    fn refresh(
68
        &self,
69
        config: &SamplerConfig,
70
        cursor: Option<&SourceCursor>,
71
        limit: Option<usize>,
72
    ) -> Result<SourceSnapshot, SamplerError>;
73

74
    /// Exact metadata record count reported by the source.
75
    ///
76
    /// This is intended for estimators that must avoid iterating records.
77
    /// Implementations should return `Ok(count)` only when the count is
78
    /// exact for the source scope. Return `Err` when exact counting is not
79
    /// possible or the source is unavailable.
80
    ///
81
    /// Keep this consistent with `refresh` by using the same backend scope,
82
    /// filtering, and logical corpus definition.
83
    fn reported_record_count(&self, config: &SamplerConfig) -> Result<u128, SamplerError>;
84

85
    /// Optional source-provided default triplet recipes.
86
    ///
87
    /// Used when sampler config does not provide explicit recipes.
88
    fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
41✔
89
        Vec::new()
41✔
90
    }
41✔
91
}
92

93
/// Index-addressable source interface used by deterministic pagers.
94
///
95
/// `len_hint` must be stable within an epoch, and `record_at` must return the
96
/// record corresponding to the same index across runs.
97
///
98
/// Dense indexing is strongly recommended: implement indices as `0..len_hint`
99
/// with minimal gaps. Sparse indexes (returning `None` for many positions)
100
/// still work but waste paging capacity and reduce batch fill rates.
101
pub trait IndexableSource: Send + Sync {
102
    /// Stable source identifier.
103
    fn id(&self) -> &str;
104
    /// Current index domain size, typically `Some(total_records)`.
105
    fn len_hint(&self) -> Option<usize>;
106
    /// Return the record at index `idx`, or `None` for sparse/missing positions.
107
    fn record_at(&self, idx: usize) -> Result<Option<DataRecord>, SamplerError>;
108
}
109

110
/// Deterministic pager for `IndexableSource`.
111
///
112
/// Encapsulates shuffle seed and cursor math so callers can reuse a stable
113
/// paging algorithm without implementing permutation logic themselves.
114
pub struct IndexablePager {
115
    source_id: SourceId,
116
}
117

118
impl IndexablePager {
119
    /// Create a new deterministic pager for `source_id`.
120
    pub fn new(source_id: impl Into<SourceId>) -> Self {
11✔
121
        Self {
11✔
122
            source_id: source_id.into(),
11✔
123
        }
11✔
124
    }
11✔
125

126
    /// Page records from an `IndexableSource` using the provided cursor.
127
    pub fn refresh(
6✔
128
        &self,
6✔
129
        source: &dyn IndexableSource,
6✔
130
        cursor: Option<&SourceCursor>,
6✔
131
        limit: Option<usize>,
6✔
132
    ) -> Result<SourceSnapshot, SamplerError> {
6✔
133
        let total = source
6✔
134
            .len_hint()
6✔
135
            .ok_or_else(|| SamplerError::SourceInconsistent {
6✔
136
                source_id: source.id().to_string(),
1✔
137
                details: "indexable source did not provide len_hint".into(),
1✔
138
            })?;
1✔
139
        self.refresh_with(total, cursor, limit, |idx| source.record_at(idx))
76✔
140
    }
6✔
141

142
    /// Page records using a custom index fetcher.
143
    ///
144
    /// Useful when records are indexable but not exposed through `IndexableSource`
145
    /// (for example, temporary index stores or precomputed path lists).
146
    ///
147
    /// The fetcher is called concurrently using rayon. It must be `Fn + Send + Sync`
148
    /// (not merely `FnMut`). All callers that pass a closure over a shared
149
    /// `&IndexableSource` satisfy this because `record_at` takes `&self`.
150
    pub fn refresh_with<F>(
10✔
151
        &self,
10✔
152
        total: usize,
10✔
153
        cursor: Option<&SourceCursor>,
10✔
154
        limit: Option<usize>,
10✔
155
        fetch: F,
10✔
156
    ) -> Result<SourceSnapshot, SamplerError>
10✔
157
    where
10✔
158
        F: Fn(usize) -> Result<Option<DataRecord>, SamplerError> + Send + Sync,
10✔
159
    {
160
        if total == 0 {
10✔
161
            return Ok(SourceSnapshot {
1✔
162
                records: Vec::new(),
1✔
163
                cursor: SourceCursor {
1✔
164
                    last_seen: Utc::now(),
1✔
165
                    revision: 0,
1✔
166
                },
1✔
167
            });
1✔
168
        }
9✔
169
        let mut start = cursor.map(|cursor| cursor.revision as usize).unwrap_or(0);
9✔
170
        if start >= total {
9✔
171
            start = 0;
1✔
172
        }
8✔
173
        let max = limit.unwrap_or(total);
9✔
174
        let seed = Self::seed_for(&self.source_id, total);
9✔
175

176
        // Pre-generate the full permuted index sequence with per-position cursor
177
        // values. Pure integer arithmetic — negligible cost vs. record fetch.
178
        let mut permutation = IndexPermutation::new(total, seed, start as u64);
9✔
179
        let seq: Vec<(usize, usize)> = (0..total)
9✔
180
            .map(|_| {
12,296✔
181
                let idx = permutation.next();
12,296✔
182
                (idx, permutation.cursor())
12,296✔
183
            })
12,296✔
184
            .collect();
9✔
185

186
        let should_report = total >= 10_000 || max >= 1_024;
9✔
187
        let refresh_start = Instant::now();
9✔
188
        if should_report {
9✔
189
            eprintln!(
2✔
190
                "[triplets:source] refresh start source='{}' total={} target={}",
2✔
191
                self.source_id, total, max
2✔
192
            );
2✔
193
        }
7✔
194

195
        use rayon::prelude::*;
196
        // Only process the first `max` entries in parallel. Since almost
197
        // all index positions return Some, this fills the quota in a single
198
        // parallel pass. Any residual shortage (from None returns) is
199
        // handled by a short sequential sweep of the remaining entries.
200
        let par_end = max.min(total);
9✔
201
        let results: Vec<Result<Option<DataRecord>, SamplerError>> = seq[..par_end]
9✔
202
            .par_iter()
9✔
203
            .map(|&(idx, _)| fetch(idx))
1,110✔
204
            .collect();
9✔
205
        let mut records = Vec::with_capacity(max.min(total));
9✔
206
        let mut final_cursor = start;
9✔
207
        for (result, &(_, cursor_after)) in results.into_iter().zip(seq[..par_end].iter()) {
1,109✔
208
            if records.len() >= max {
1,109✔
UNCOV
209
                break;
×
210
            }
1,109✔
211
            if let Some(r) = result? {
1,109✔
212
                records.push(r)
80✔
213
            }
1,028✔
214
            final_cursor = cursor_after;
1,108✔
215
        }
216
        // Sequential fallback for any shortage caused by None returns.
217
        for &(idx, cursor_after) in &seq[par_end..] {
985✔
218
            if records.len() >= max {
985✔
219
                break;
5✔
220
            }
980✔
221
            if let Some(r) = fetch(idx)? {
980✔
222
                records.push(r);
4✔
223
            }
976✔
224
            final_cursor = cursor_after;
980✔
225
        }
226

227
        if should_report {
8✔
228
            eprintln!(
2✔
229
                "[triplets:source] refresh done source='{}' attempted={} fetched={} elapsed={:.2}s",
2✔
230
                self.source_id,
2✔
231
                total,
2✔
232
                records.len(),
2✔
233
                refresh_start.elapsed().as_secs_f64()
2✔
234
            );
2✔
235
        }
6✔
236
        let last_seen = records
8✔
237
            .iter()
8✔
238
            .map(|record| record.updated_at)
8✔
239
            .max()
8✔
240
            .unwrap_or_else(Utc::now);
8✔
241
        Ok(SourceSnapshot {
8✔
242
            records,
8✔
243
            cursor: SourceCursor {
8✔
244
                last_seen,
8✔
245
                revision: final_cursor as u64,
8✔
246
            },
8✔
247
        })
8✔
248
    }
10✔
249

250
    /// Build a deterministic seed for a source and total size.
251
    pub(crate) fn seed_for(source_id: &SourceId, total: usize) -> u64 {
55✔
252
        Self::stable_index_shuffle_key(source_id, 0)
55✔
253
            ^ Self::stable_index_shuffle_key(source_id, total)
55✔
254
    }
55✔
255

256
    /// Build a deterministic seed for a source/total pair with explicit sampler seed.
257
    #[cfg(any(test, feature = "huggingface"))]
258
    pub(crate) fn seed_for_sampler(source_id: &SourceId, total: usize, sampler_seed: u64) -> u64 {
43✔
259
        Self::seed_for(source_id, total)
43✔
260
            ^ stable_hash_with(|hasher| {
43✔
261
                "triplets_sampler_seed".hash(hasher);
43✔
262
                source_id.hash(hasher);
43✔
263
                total.hash(hasher);
43✔
264
                sampler_seed.hash(hasher);
43✔
265
            })
43✔
266
    }
43✔
267

268
    fn stable_index_shuffle_key(source_id: &SourceId, idx: usize) -> u64 {
110✔
269
        stable_hash_with(|hasher| {
110✔
270
            source_id.hash(hasher);
110✔
271
            idx.hash(hasher);
110✔
272
        })
110✔
273
    }
110✔
274
}
275

276
/// DataSource adapter that pages an `IndexableSource` via `IndexablePager`.
277
pub struct IndexableAdapter<T: IndexableSource> {
278
    inner: T,
279
}
280

281
impl<T: IndexableSource> IndexableAdapter<T> {
282
    /// Wrap an `IndexableSource` so it can be registered as a `DataSource`.
283
    pub fn new(inner: T) -> Self {
4✔
284
        Self { inner }
4✔
285
    }
4✔
286
}
287

288
impl<T: IndexableSource> DataSource for IndexableAdapter<T> {
289
    fn id(&self) -> &str {
1✔
290
        self.inner.id()
1✔
291
    }
1✔
292

293
    fn refresh(
5✔
294
        &self,
5✔
295
        _config: &SamplerConfig,
5✔
296
        cursor: Option<&SourceCursor>,
5✔
297
        limit: Option<usize>,
5✔
298
    ) -> Result<SourceSnapshot, SamplerError> {
5✔
299
        let pager = IndexablePager::new(self.inner.id());
5✔
300
        pager.refresh(&self.inner, cursor, limit)
5✔
301
    }
5✔
302

303
    fn reported_record_count(&self, _config: &SamplerConfig) -> Result<u128, SamplerError> {
2✔
304
        self.inner
2✔
305
            .len_hint()
2✔
306
            .map(|value| value as u128)
2✔
307
            .ok_or_else(|| SamplerError::SourceInconsistent {
2✔
308
                source_id: self.inner.id().to_string(),
1✔
309
                details: "indexable source did not provide len_hint".into(),
1✔
310
            })
1✔
311
    }
2✔
312
}
313

314
/// Internal permutation used by `IndexablePager`.
///
/// Walks `counter` through a power-of-two domain that covers `0..total`,
/// maps every counter value through `permute_bits`, and discards mapped
/// values that fall outside `0..total`.
pub(crate) struct IndexPermutation {
    /// Number of valid indices; `next` only yields values below this.
    total: u64,
    /// Bit width of the permutation domain.
    domain_bits: u32,
    /// `1 << domain_bits`; the counter is reduced modulo this before mapping.
    domain_size: u64,
    /// Seed from which the multiplier and offset of the mapping are derived.
    seed: u64,
    /// Position in the walk; advanced by one on every probe.
    counter: u64,
}

impl IndexPermutation {
    /// Create a permutation over `0..total` that starts walking at `counter`.
    ///
    /// The domain is the smallest power of two that is at least `total`, with
    /// a minimum width of one bit.
    ///
    /// # Panics
    ///
    /// Panics when `total` is zero: there is no index to yield, so `next`
    /// could never return. Totals above `2^63` are not supported because the
    /// domain size would not fit in a `u64`.
    fn new(total: usize, seed: u64, counter: u64) -> Self {
        assert!(total > 0, "IndexPermutation requires a non-zero total");
        let total = total as u64;
        let domain_bits = (u64::BITS - (total - 1).leading_zeros()).max(1);
        Self {
            total,
            domain_bits,
            domain_size: 1u64 << domain_bits,
            seed,
            counter,
        }
    }

    /// Return the next index in `0..total`, advancing the counter.
    ///
    /// Counter values whose mapped result lands at or above `total` are
    /// skipped. Because the mapping is a bijection on the domain, `total`
    /// consecutive calls yield every index exactly once.
    fn next(&mut self) -> usize {
        loop {
            let slot = self.counter % self.domain_size;
            self.counter = self.counter.wrapping_add(1);
            let candidate = Self::permute_bits(slot, self.domain_bits, self.seed);
            if candidate < self.total {
                return candidate as usize;
            }
        }
    }

    /// Current counter reduced modulo `total`, suitable for a cursor revision.
    ///
    /// The reduction is done in `u64` before narrowing, so the result does not
    /// depend on the width of `usize`.
    ///
    /// NOTE(review): `next` walks the counter modulo `domain_size`, while this
    /// value is reduced modulo `total`. When `domain_size > total`, counters in
    /// `total..domain_size` fold onto smaller values; confirm that resuming
    /// from such a cursor visits the positions callers expect.
    fn cursor(&self) -> usize {
        (self.counter % self.total) as usize
    }

    /// Map `value` to another value inside the `bits`-wide domain.
    ///
    /// Computes `(a * value + b) mod 2^bits`, where `a` is the low bits of
    /// `seed` with bit 0 forced on and `b` is `seed >> 1`. An odd multiplier
    /// is invertible modulo a power of two, so the mapping is a bijection.
    /// Returns 0 when `bits` is 0.
    fn permute_bits(value: u64, bits: u32, seed: u64) -> u64 {
        if bits == 0 {
            return 0;
        }
        let mask = if bits >= u64::BITS {
            u64::MAX
        } else {
            (1u64 << bits) - 1
        };
        // Bit 0 is always set, so the multiplier can never be zero.
        let multiplier = (seed | 1) & mask;
        let offset = (seed >> 1) & mask;
        multiplier.wrapping_mul(value).wrapping_add(offset) & mask
    }
}
368

369
/// In-memory data source for tests and small datasets.
370
pub struct InMemorySource {
371
    id: SourceId,
372
    records: Arc<Vec<DataRecord>>,
373
}
374

375
impl InMemorySource {
376
    /// Create an in-memory source from prebuilt records.
377
    pub fn new(id: impl Into<SourceId>, records: Vec<DataRecord>) -> Self {
269✔
378
        Self {
269✔
379
            id: id.into(),
269✔
380
            records: Arc::new(records),
269✔
381
        }
269✔
382
    }
269✔
383
}
384

385
impl DataSource for InMemorySource {
386
    fn id(&self) -> &str {
7,378✔
387
        &self.id
7,378✔
388
    }
7,378✔
389

390
    fn refresh(
870✔
391
        &self,
870✔
392
        _config: &SamplerConfig,
870✔
393
        cursor: Option<&SourceCursor>,
870✔
394
        limit: Option<usize>,
870✔
395
    ) -> Result<SourceSnapshot, SamplerError> {
870✔
396
        let records = &*self.records;
870✔
397
        let total = records.len();
870✔
398
        let mut start = cursor.map(|cursor| cursor.revision as usize).unwrap_or(0);
870✔
399
        if total > 0 && start >= total {
870✔
400
            start = 0;
1✔
401
        }
869✔
402
        let max = limit.unwrap_or(total);
870✔
403
        let mut filtered = Vec::new();
870✔
404
        for idx in 0..total {
11,891✔
405
            if filtered.len() >= max {
11,891✔
406
                break;
133✔
407
            }
11,758✔
408
            let pos = (start + idx) % total;
11,758✔
409
            filtered.push(records[pos].clone());
11,758✔
410
        }
411
        let last_seen = filtered
870✔
412
            .iter()
870✔
413
            .map(|record| record.updated_at)
870✔
414
            .max()
870✔
415
            .unwrap_or_else(Utc::now);
870✔
416
        let next_start = if total == 0 {
870✔
417
            0
×
418
        } else {
419
            (start + filtered.len()) % total
870✔
420
        };
421
        Ok(SourceSnapshot {
870✔
422
            records: filtered,
870✔
423
            cursor: SourceCursor {
870✔
424
                last_seen,
870✔
425
                revision: next_start as u64,
870✔
426
            },
870✔
427
        })
870✔
428
    }
870✔
429

430
    fn reported_record_count(&self, _config: &SamplerConfig) -> Result<u128, SamplerError> {
1✔
431
        Ok(self.records.len() as u128)
1✔
432
    }
1✔
433
}
434

435
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::{QualityScore, RecordSection, SectionRole};
    use crate::types::RecordId;
    use chrono::Duration;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::thread;
    use std::time::Duration as StdDuration;

    /// Build a record with a single anchor section whose text and only
    /// sentence are `text`, created and updated at `timestamp`.
    fn anchor_record(
        id: String,
        source: &str,
        text: &str,
        timestamp: chrono::DateTime<Utc>,
    ) -> DataRecord {
        DataRecord {
            id,
            source: source.to_string(),
            created_at: timestamp,
            updated_at: timestamp,
            quality: QualityScore { trust: 1.0 },
            taxonomy: Vec::new(),
            sections: vec![RecordSection {
                role: SectionRole::Anchor,
                heading: None,
                text: text.to_string(),
                sentences: vec![text.to_string()],
            }],
            meta_prefix: None,
        }
    }

    /// Minimal `IndexableSource` test fixture with `count` dense records.
    struct IndexableStub {
        id: SourceId,
        count: usize,
    }

    /// `IndexableSource` fixture that never reports a length.
    struct NoLenHintStub {
        id: SourceId,
    }

    impl IndexableStub {
        fn new(id: &str, count: usize) -> Self {
            Self {
                id: id.to_string(),
                count,
            }
        }
    }

    impl NoLenHintStub {
        fn new(id: &str) -> Self {
            Self { id: id.to_string() }
        }
    }

    impl IndexableSource for IndexableStub {
        fn id(&self) -> &str {
            &self.id
        }

        fn len_hint(&self) -> Option<usize> {
            Some(self.count)
        }

        fn record_at(&self, idx: usize) -> Result<Option<DataRecord>, SamplerError> {
            if idx >= self.count {
                return Ok(None);
            }
            Ok(Some(anchor_record(
                format!("record_{idx}"),
                &self.id,
                "stub",
                Utc::now(),
            )))
        }
    }

    impl IndexableSource for NoLenHintStub {
        fn id(&self) -> &str {
            &self.id
        }

        fn len_hint(&self) -> Option<usize> {
            None
        }

        fn record_at(&self, _idx: usize) -> Result<Option<DataRecord>, SamplerError> {
            Ok(None)
        }
    }

    #[test]
    fn indexable_adapter_pages_in_stable_order() {
        let adapter = IndexableAdapter::new(IndexableStub::new("stub", 6));
        let config = SamplerConfig::default();
        let full = adapter.refresh(&config, None, None).unwrap();
        let full_ids: Vec<RecordId> = full.records.into_iter().map(|r| r.id).collect();

        // Three pages of two must reproduce the single full pass.
        let mut cursor = None;
        let mut paged = Vec::new();
        for _ in 0..3 {
            let snapshot = adapter.refresh(&config, cursor.as_ref(), Some(2)).unwrap();
            cursor = Some(snapshot.cursor);
            paged.extend(snapshot.records.into_iter().map(|r| r.id));
        }
        assert_eq!(paged, full_ids);
    }

    #[test]
    fn indexable_paging_spans_multiple_regimes() {
        // Use a source id whose permutation step is not 1 or -1 mod 2^k,
        // otherwise the sequence would be a simple rotation/reversal.
        let total = 256usize;
        let mask = (1u64 << (64 - (total as u64 - 1).leading_zeros())) - 1;
        let source_id = (0..512)
            .map(|idx| format!("regime_test_{idx}"))
            .find(|id| {
                let seed = IndexablePager::seed_for(id, total);
                let a = (seed | 1) & mask;
                a != 1 && a != mask
            })
            .unwrap();

        // Pull a single page and ensure the indices are spread across the space,
        // which indicates the permutation isn't stuck in a narrow regime.
        let adapter = IndexableAdapter::new(IndexableStub::new(&source_id, total));
        let snapshot = adapter
            .refresh(&SamplerConfig::default(), None, Some(64))
            .unwrap();
        let indices: Vec<usize> = snapshot
            .records
            .into_iter()
            .map(|r| {
                r.id.strip_prefix("record_")
                    .unwrap()
                    .parse::<usize>()
                    .unwrap()
            })
            .collect();
        let min_idx = *indices.iter().min().unwrap();
        let max_idx = *indices.iter().max().unwrap();
        assert!(
            max_idx - min_idx >= total / 2,
            "expected spread across the index space, got min={min_idx} max={max_idx}"
        );
    }

    #[test]
    fn indexable_pager_errors_when_len_hint_missing() {
        let pager = IndexablePager::new("no_len_hint");
        let source = NoLenHintStub::new("no_len_hint");
        let result = pager.refresh(&source, None, Some(3));
        assert!(result.is_err());
    }

    #[test]
    fn indexable_adapter_reported_count_errors_when_len_hint_missing() {
        let adapter = IndexableAdapter::new(NoLenHintStub::new("no_len_hint"));
        let result = adapter.reported_record_count(&SamplerConfig::default());
        assert!(result.is_err());
    }

    #[test]
    fn indexable_pager_refresh_with_zero_total_returns_empty_snapshot() {
        let pager = IndexablePager::new("empty");
        let snapshot = pager
            .refresh_with(0, None, Some(4), |_idx| Ok(None))
            .unwrap();
        assert!(snapshot.records.is_empty());
        assert_eq!(snapshot.cursor.revision, 0);
    }

    #[test]
    fn in_memory_source_refresh_wraps_cursor_and_uses_latest_timestamp() {
        let now = Utc::now();
        let older = now - Duration::seconds(5);
        let newer = now + Duration::seconds(5);

        let source = InMemorySource::new(
            "mem",
            vec![
                anchor_record("a".to_string(), "mem", "a", older),
                anchor_record("b".to_string(), "mem", "b", newer),
            ],
        );
        // Revision 7 lies outside the two-record corpus and must reset to 0.
        let cursor = SourceCursor {
            last_seen: now,
            revision: 7,
        };

        let snapshot = source
            .refresh(&SamplerConfig::default(), Some(&cursor), Some(1))
            .unwrap();
        assert_eq!(snapshot.records.len(), 1);
        assert_eq!(snapshot.records[0].id, "a");
        assert_eq!(snapshot.cursor.revision, 1);
        assert_eq!(snapshot.cursor.last_seen, older);
    }

    #[test]
    fn index_permutation_permute_bits_handles_zero_bits_and_zero_seed_path() {
        assert_eq!(IndexPermutation::permute_bits(123, 0, 99), 0);

        let bits = 1;
        let value = 1;
        let out = IndexPermutation::permute_bits(value, bits, 0);
        assert!(out <= 1);
    }

    #[test]
    fn index_permutation_next_stays_within_total_and_cursor_advances() {
        let mut perm = IndexPermutation::new(3, 7, 0);
        let mut seen = Vec::new();
        for _ in 0..8 {
            seen.push(perm.next());
        }
        assert!(seen.iter().all(|idx| *idx < 3));
        assert!(perm.cursor() < 3);
    }

    #[test]
    fn indexable_pager_large_refresh_triggers_reporting_branch_and_wraps_cursor() {
        let pager = IndexablePager::new("reporting");
        let cursor = SourceCursor {
            last_seen: Utc::now(),
            revision: 20_000,
        };
        let snapshot = pager
            .refresh_with(10_000, Some(&cursor), Some(4), |idx| {
                Ok(Some(anchor_record(
                    format!("record_{idx}"),
                    "reporting",
                    "t",
                    Utc::now(),
                )))
            })
            .unwrap();

        assert_eq!(snapshot.records.len(), 4);
        assert!(snapshot.cursor.revision < 10_000);
    }

    #[test]
    fn indexable_pager_reporting_branch_emits_progress_when_refresh_is_slow() {
        // NOTE(review): the pager shown in this module only logs at start and
        // end of a refresh; confirm whether the 800 ms delay is still needed.
        let pager = IndexablePager::new("slow_reporting");
        let slept = AtomicBool::new(false);
        let snapshot = pager
            .refresh_with(2_000, None, Some(1_024), |_idx| {
                if !slept.swap(true, Ordering::Relaxed) {
                    thread::sleep(StdDuration::from_millis(800));
                }
                Ok(None)
            })
            .unwrap();

        assert!(snapshot.records.is_empty());
        assert!(snapshot.cursor.revision < 2_000);
    }

    #[test]
    fn source_ids_and_reported_counts_are_exposed() {
        let adapter = IndexableAdapter::new(IndexableStub::new("stub_id", 3));
        assert_eq!(adapter.id(), "stub_id");
        assert_eq!(
            adapter
                .reported_record_count(&SamplerConfig::default())
                .unwrap(),
            3
        );

        let memory = InMemorySource::new("mem_id", Vec::new());
        assert_eq!(memory.id(), "mem_id");
        assert_eq!(
            memory
                .reported_record_count(&SamplerConfig::default())
                .unwrap(),
            0
        );
    }

    #[test]
    fn indexable_pager_sequential_fallback_fills_quota_when_parallel_pass_yields_none() {
        // Exercise the sequential fallback: parallel pass entries all return
        // None, so the sequential sweep has to supply the records.
        // total=8, limit=4 -> par_end=4. First 4 calls (parallel) get None;
        // next 4 calls (sequential fallback) get Some, filling the quota.
        let pager = IndexablePager::new("fallback_fill");
        let call_count = AtomicUsize::new(0);
        let par_end = 4usize;
        let snapshot = pager
            .refresh_with(8, None, Some(par_end), |idx| {
                let n = call_count.fetch_add(1, Ordering::Relaxed);
                if n < par_end {
                    Ok(None)
                } else {
                    Ok(Some(anchor_record(
                        format!("r_{idx}"),
                        "fallback_fill",
                        "t",
                        Utc::now(),
                    )))
                }
            })
            .unwrap();
        assert_eq!(snapshot.records.len(), par_end);
    }

    #[test]
    fn indexable_pager_refresh_with_propagates_fetch_error() {
        let pager = IndexablePager::new("err");
        let err = pager
            .refresh_with(8, None, Some(2), |_idx| {
                Err(SamplerError::SourceUnavailable {
                    source_id: "err".to_string(),
                    reason: "fetch failed".to_string(),
                })
            })
            .unwrap_err();
        assert!(matches!(
            err,
            SamplerError::SourceUnavailable { ref reason, .. } if reason.contains("fetch failed")
        ));
    }

    #[test]
    fn seed_for_sampler_depends_on_sampler_seed() {
        let source_id = "seeded".to_string();
        let base = IndexablePager::seed_for(&source_id, 17);
        let with_a = IndexablePager::seed_for_sampler(&source_id, 17, 1);
        let with_b = IndexablePager::seed_for_sampler(&source_id, 17, 2);
        assert_ne!(with_a, with_b);
        assert_ne!(with_a, base);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc