• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jzombie / rust-triplets / 22360649277

24 Feb 2026 04:43PM UTC coverage: 93.359% (+0.7%) from 92.675%
22360649277

Pull #7

github

web-flow
Merge b67d90e2c into 980559192
Pull Request #7: Add HF source

5318 of 5790 new or added lines in 8 files covered. (91.85%)

1 existing line in 1 file now uncovered.

14761 of 15811 relevant lines covered (93.36%)

2502.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.95
/src/source/backends/huggingface_source.rs
1
use hf_hub::Repo;
2
use hf_hub::RepoType;
3
use hf_hub::api::sync::ApiBuilder;
4
use parquet::file::reader::{FileReader, SerializedFileReader};
5
use parquet::record::reader::RowIter;
6
use rayon::prelude::*;
7
use serde::{Deserialize, Serialize};
8
use serde_json::Value;
9
use std::cmp::Ordering;
10
use std::collections::hash_map::DefaultHasher;
11
use std::collections::{BTreeMap, HashMap, VecDeque};
12
use std::fs;
13
use std::fs::File;
14
use std::hash::{Hash, Hasher};
15
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
16
use std::path::Path;
17
use std::path::PathBuf;
18
use std::sync::{Arc, Mutex};
19
use std::thread;
20
use std::time::Duration;
21
use std::time::Instant;
22
use tracing::{info, warn};
23
use walkdir::WalkDir;
24

25
use crate::SamplerError;
26
use crate::config::{NegativeStrategy, SamplerConfig, Selector, TripletRecipe};
27
use crate::data::{DataRecord, QualityScore, SectionRole};
28
use crate::utils::make_section;
29
use chrono::{DateTime, Utc};
30

31
use crate::source::{DataSource, SourceCursor, SourceSnapshot};
32

33
const REMOTE_URL_PREFIX: &str = "url::";
34
/// Extra row-index headroom above currently materialized rows exposed via `len_hint`.
35
///
36
/// This is not a file count. It lets sampling look slightly past the local row
37
/// frontier so lazy remote expansion can continue without jumping to the full
38
/// global row domain at once.
39
/// Multiplies the sampler ingestion base (`SamplerConfig.ingestion_max_records`)
40
/// to compute `len_hint` expansion headroom rows.
41
const REMOTE_EXPANSION_HEADROOM_MULTIPLIER: usize = 4;
42
/// Number of initial remote shards to materialize when bootstrapping an empty
43
/// local snapshot before regular lazy expansion.
44
const REMOTE_BOOTSTRAP_SHARDS: usize = 4;
45
/// Multiplies the source `refresh` limit passed by `IngestionManager`
46
/// (`step.unwrap_or(max_records)`) to set this source's internal row-read
47
/// batch target for each refresh pass.
48
const HUGGINGFACE_REFRESH_BATCH_MULTIPLIER: usize = 32;
49
const SHARD_SEQUENCE_STATE_VERSION: u32 = 1;
50
const SHARD_SEQUENCE_STATE_FILE: &str = "_sequence_state.json";
51

52
#[derive(Clone, Debug)]
53
struct RowTextField {
54
    name: String,
55
    text: String,
56
}
57

58
#[derive(Clone, Debug)]
59
struct RowView {
60
    row_id: Option<String>,
61
    timestamp: Option<DateTime<Utc>>,
62
    text_fields: Vec<RowTextField>,
63
}
64

65
/// Configuration for a bulk Hugging Face row source backed by local snapshot files.
66
#[derive(Clone, Debug)]
67
pub struct HuggingFaceRowsConfig {
68
    /// Stable sampler source id used in record ids and metrics.
69
    pub source_id: String,
70
    /// Hugging Face dataset id, e.g. `HuggingFaceFW/fineweb`.
71
    pub dataset: String,
72
    /// Dataset config name, e.g. `default`.
73
    pub config: String,
74
    /// Split name, e.g. `train`.
75
    pub split: String,
76
    /// Local path to a snapshot directory for this split.
77
    pub snapshot_dir: PathBuf,
78
    /// File extensions accepted as shard files.
79
    pub shard_extensions: Vec<String>,
80
    /// Number of rows between seek checkpoints while indexing a shard.
81
    pub checkpoint_stride: usize,
82
    /// Maximum number of rows cached in-memory.
83
    pub cache_capacity: usize,
84
    /// Maximum number of decoded parquet row groups cached in-memory.
85
    pub parquet_row_group_cache_capacity: usize,
86
    /// Multiplier applied to current refresh `limit` when building a read batch target.
87
    ///
88
    /// Effective target is `limit * refresh_batch_multiplier`.
89
    pub refresh_batch_multiplier: usize,
90
    /// Multiplier applied to ingestion-sized base records for `len_hint` headroom.
91
    ///
92
    /// Effective headroom is `cache_capacity * remote_expansion_headroom_multiplier`.
93
    pub remote_expansion_headroom_multiplier: usize,
94
    /// Optional maximum row cap exposed by the source.
95
    pub max_rows: Option<usize>,
96
    /// Hard cap for local manifest-shard cache bytes.
97
    ///
98
    /// When exceeded, oldest cached manifest shards are evicted.
99
    pub local_disk_cap_bytes: Option<u64>,
100
    /// Minimum number of manifest shards to keep resident during eviction.
101
    pub min_resident_shards: usize,
102
    /// Optional row id column name. Falls back to synthetic id when missing.
103
    pub id_column: Option<String>,
104
    /// Text columns to extract. Empty means auto-detect textual scalar columns.
105
    pub text_columns: Vec<String>,
106
    /// Optional column used for anchor text.
107
    ///
108
    /// When set (or when `positive_column`/`context_columns` are set), role-based
109
    /// extraction is used instead of `text_columns`/auto-detect mode.
110
    pub anchor_column: Option<String>,
111
    /// Optional column used for positive text.
112
    ///
113
    /// Positive text is emitted as a `SectionRole::Context` section.
114
    pub positive_column: Option<String>,
115
    /// Optional ordered context columns.
116
    ///
117
    /// Used only in role-based extraction mode.
118
    pub context_columns: Vec<String>,
119
}
120

121
impl HuggingFaceRowsConfig {
122
    /// Create a config with required dataset identity values and local snapshot path.
123
    pub fn new(
131✔
124
        source_id: impl Into<String>,
131✔
125
        dataset: impl Into<String>,
131✔
126
        config: impl Into<String>,
131✔
127
        split: impl Into<String>,
131✔
128
        snapshot_dir: impl Into<PathBuf>,
131✔
129
    ) -> Self {
131✔
130
        Self {
131✔
131
            source_id: source_id.into(),
131✔
132
            dataset: dataset.into(),
131✔
133
            config: config.into(),
131✔
134
            split: split.into(),
131✔
135
            snapshot_dir: snapshot_dir.into(),
131✔
136
            shard_extensions: vec![
131✔
137
                "parquet".to_string(),
131✔
138
                "jsonl".to_string(),
131✔
139
                "ndjson".to_string(),
131✔
140
            ],
131✔
141
            checkpoint_stride: 4096,
131✔
142
            cache_capacity: SamplerConfig::default().ingestion_max_records,
131✔
143
            parquet_row_group_cache_capacity: 8,
131✔
144
            refresh_batch_multiplier: HUGGINGFACE_REFRESH_BATCH_MULTIPLIER,
131✔
145
            remote_expansion_headroom_multiplier: REMOTE_EXPANSION_HEADROOM_MULTIPLIER,
131✔
146
            max_rows: None,
131✔
147
            local_disk_cap_bytes: Some(32 * 1024 * 1024 * 1024),
131✔
148
            min_resident_shards: REMOTE_BOOTSTRAP_SHARDS,
131✔
149
            id_column: Some("id".to_string()),
131✔
150
            text_columns: Vec::new(),
131✔
151
            anchor_column: None,
131✔
152
            positive_column: None,
131✔
153
            context_columns: Vec::new(),
131✔
154
        }
131✔
155
    }
131✔
156
}
157

158
#[derive(Default)]
159
struct ParquetCache {
160
    readers: HashMap<PathBuf, Arc<SerializedFileReader<File>>>,
161
}
162

163
impl ParquetCache {
164
    /// Return a cached parquet reader for `path`, opening and caching it when missing.
165
    fn reader_for(
6✔
166
        &mut self,
6✔
167
        source_id: &str,
6✔
168
        path: &Path,
6✔
169
    ) -> Result<Arc<SerializedFileReader<File>>, SamplerError> {
6✔
170
        if let Some(reader) = self.readers.get(path) {
6✔
NEW
171
            return Ok(reader.clone());
×
172
        }
6✔
173

174
        let file = File::open(path).map_err(|err| SamplerError::SourceUnavailable {
6✔
175
            source_id: source_id.to_string(),
2✔
176
            reason: format!("failed opening parquet shard {}: {err}", path.display()),
2✔
177
        })?;
2✔
178
        let reader =
3✔
179
            SerializedFileReader::new(file).map_err(|err| SamplerError::SourceUnavailable {
4✔
180
                source_id: source_id.to_string(),
1✔
181
                reason: format!("failed reading parquet shard {}: {err}", path.display()),
1✔
182
            })?;
1✔
183
        let reader = Arc::new(reader);
3✔
184
        self.readers.insert(path.to_path_buf(), reader.clone());
3✔
185
        Ok(reader)
3✔
186
    }
6✔
187
}
188

189
#[derive(Clone, Debug)]
190
struct ShardIndex {
191
    path: PathBuf,
192
    global_start: usize,
193
    row_count: usize,
194
    is_parquet: bool,
195
    parquet_row_groups: Vec<(usize, usize)>,
196
    checkpoints: Vec<u64>,
197
}
198

199
#[derive(Default)]
200
struct RowCache {
201
    rows: HashMap<usize, RowView>,
202
    order: VecDeque<usize>,
203
}
204

205
impl RowCache {
206
    /// Return a cloned cached row by absolute index.
207
    fn get(&self, idx: usize) -> Option<RowView> {
61✔
208
        self.rows.get(&idx).cloned()
61✔
209
    }
61✔
210

211
    /// Insert or refresh a cached row and evict oldest entries over `capacity`.
212
    fn insert(&mut self, idx: usize, row: RowView, capacity: usize) {
51✔
213
        if capacity == 0 {
51✔
214
            return;
1✔
215
        }
50✔
216
        if !self.rows.contains_key(&idx) {
50✔
217
            self.order.push_back(idx);
50✔
218
        }
50✔
219
        self.rows.insert(idx, row);
50✔
220
        while self.rows.len() > capacity {
51✔
221
            if let Some(old) = self.order.pop_front() {
1✔
222
                self.rows.remove(&old);
1✔
223
            } else {
1✔
NEW
224
                break;
×
225
            }
226
        }
227
    }
51✔
228
}
229

230
/// Bulk-oriented Hugging Face source backed by local shard files.
231
pub struct HuggingFaceRowSource {
232
    config: HuggingFaceRowsConfig,
233
    sampler_config: Mutex<Option<SamplerConfig>>,
234
    state: Mutex<SourceState>,
235
    cache: Mutex<RowCache>,
236
    parquet_cache: Mutex<ParquetCache>,
237
}
238

239
#[derive(Debug)]
240
struct SourceState {
241
    materialized_rows: usize,
242
    total_rows: Option<usize>,
243
    shards: Vec<ShardIndex>,
244
    remote_candidates: Option<Vec<String>>,
245
    remote_candidate_sizes: HashMap<String, u64>,
246
    next_remote_idx: usize,
247
}
248

249
type ParquetGroupKey = (PathBuf, usize);
250
type ParquetGroupRequest = (usize, usize, ShardIndex);
251

252
#[derive(Clone, Debug, Serialize, Deserialize)]
253
struct PersistedShardSequence {
254
    version: u32,
255
    source_id: String,
256
    dataset: String,
257
    config: String,
258
    split: String,
259
    sampler_seed: u64,
260
    candidates: Vec<String>,
261
    candidate_sizes: HashMap<String, u64>,
262
    next_remote_idx: usize,
263
}
264

265
impl HuggingFaceRowSource {
266
    /// Build a new source by indexing local shard files.
267
    pub fn new(config: HuggingFaceRowsConfig) -> Result<Self, SamplerError> {
9✔
268
        let start_new = Instant::now();
9✔
269
        if config.checkpoint_stride == 0 {
9✔
270
            return Err(SamplerError::Configuration(
1✔
271
                "huggingface source checkpoint_stride must be > 0".to_string(),
1✔
272
            ));
1✔
273
        }
8✔
274

275
        fs::create_dir_all(&config.snapshot_dir).map_err(|err| {
8✔
NEW
276
            SamplerError::SourceUnavailable {
×
NEW
277
                source_id: config.source_id.clone(),
×
NEW
278
                reason: format!(
×
NEW
279
                    "failed creating snapshot_dir {}: {err}",
×
NEW
280
                    config.snapshot_dir.display()
×
NEW
281
                ),
×
NEW
282
            }
×
NEW
283
        })?;
×
284

285
        info!(
8✔
286
            "[triplets:hf] indexing local shards in {}",
NEW
287
            config.snapshot_dir.display()
×
288
        );
289
        let (shards, discovered) = Self::build_shard_index(&config).unwrap_or_default();
8✔
290
        if discovered == 0 {
8✔
NEW
291
            info!(
×
292
                "[triplets:hf] no local shards found in {} — lazy remote download enabled",
NEW
293
                config.snapshot_dir.display()
×
294
            );
295
        }
8✔
296

297
        let materialized_rows = config
8✔
298
            .max_rows
8✔
299
            .map(|cap| cap.min(discovered))
8✔
300
            .unwrap_or(discovered);
8✔
301
        let total_rows = match Self::fetch_global_row_count(&config) {
8✔
NEW
302
            Ok(value) => value,
×
303
            Err(err) => {
8✔
304
                warn!(
8✔
305
                    "[triplets:hf] global row count request failed; continuing with discovered rows only: {}",
306
                    err
307
                );
308
                None
8✔
309
            }
310
        };
311

312
        if let Some(global_total) = total_rows {
8✔
NEW
313
            info!(
×
314
                "[triplets:hf] global split row count reported: {} (known_local_rows={})",
315
                global_total, materialized_rows
316
            );
317
        }
8✔
318

319
        info!(
8✔
320
            "[triplets:hf] source ready in {:.2}s (rows={}, shards={})",
NEW
321
            start_new.elapsed().as_secs_f64(),
×
322
            materialized_rows,
NEW
323
            shards.len()
×
324
        );
325

326
        Ok(Self {
8✔
327
            config,
8✔
328
            sampler_config: Mutex::new(None),
8✔
329
            state: Mutex::new(SourceState {
8✔
330
                materialized_rows,
8✔
331
                total_rows,
8✔
332
                shards,
8✔
333
                remote_candidates: None,
8✔
334
                remote_candidate_sizes: HashMap::new(),
8✔
335
                next_remote_idx: 0,
8✔
336
            }),
8✔
337
            cache: Mutex::new(RowCache::default()),
8✔
338
            parquet_cache: Mutex::new(ParquetCache::default()),
8✔
339
        })
8✔
340
    }
9✔
341

342
    fn set_active_sampler_config(&self, config: &SamplerConfig) {
94✔
343
        if let Ok(mut slot) = self.sampler_config.lock() {
94✔
344
            *slot = Some(config.clone());
94✔
345
        }
94✔
346
    }
94✔
347

348
    #[cfg(test)]
349
    fn active_or_default_sampler_config(&self) -> SamplerConfig {
12✔
350
        self.sampler_config
12✔
351
            .lock()
12✔
352
            .ok()
12✔
353
            .and_then(|slot| slot.clone())
12✔
354
            .unwrap_or_default()
12✔
355
    }
12✔
356

357
    #[cfg(test)]
358
    fn configure_sampler(&self, config: &SamplerConfig) {
6✔
359
        self.set_active_sampler_config(config);
6✔
360
    }
6✔
361

362
    #[cfg(test)]
363
    fn refresh(
9✔
364
        &self,
9✔
365
        cursor: Option<&SourceCursor>,
9✔
366
        limit: Option<usize>,
9✔
367
    ) -> Result<SourceSnapshot, SamplerError> {
9✔
368
        let config = self.active_or_default_sampler_config();
9✔
369
        <Self as DataSource>::refresh(self, &config, cursor, limit)
9✔
370
    }
9✔
371

372
    #[cfg(test)]
373
    fn reported_record_count(&self) -> Result<u128, SamplerError> {
3✔
374
        let config = self.active_or_default_sampler_config();
3✔
375
        <Self as DataSource>::reported_record_count(self, &config)
3✔
376
    }
3✔
377

378
    /// Compute the effective internal row read target from refresh `limit`.
379
    fn effective_refresh_batch_target(&self, limit: usize) -> usize {
18✔
380
        let multiplier = self.config.refresh_batch_multiplier.max(1);
18✔
381
        limit.saturating_mul(multiplier)
18✔
382
    }
18✔
383

384
    /// Compute dynamic `len_hint` headroom rows based on sampler and source config.
385
    fn effective_expansion_headroom_rows(&self) -> usize {
8✔
386
        let multiplier = self.config.remote_expansion_headroom_multiplier.max(1);
8✔
387
        let base = self
8✔
388
            .sampler_config
8✔
389
            .lock()
8✔
390
            .ok()
8✔
391
            .and_then(|config| config.as_ref().map(|value| value.ingestion_max_records))
8✔
392
            .unwrap_or(self.config.cache_capacity)
8✔
393
            .max(1);
8✔
394
        base.saturating_mul(multiplier)
8✔
395
    }
8✔
396

397
    fn configured_sampler_seed(&self) -> Result<u64, SamplerError> {
34✔
398
        self.sampler_config
34✔
399
            .lock()
34✔
400
            .map_err(|_| SamplerError::SourceUnavailable {
34✔
NEW
401
                source_id: self.config.source_id.clone(),
×
NEW
402
                reason: "huggingface sampler-config lock poisoned".to_string(),
×
NEW
403
            })?
×
404
            .as_ref()
34✔
405
            .map(|config| config.seed)
34✔
406
            .ok_or_else(|| SamplerError::SourceInconsistent {
34✔
407
                source_id: self.config.source_id.clone(),
2✔
408
                details: "huggingface source sampler configuration not provided".to_string(),
2✔
409
            })
2✔
410
    }
34✔
411

412
    fn paging_seed(&self, total: usize) -> Result<u64, SamplerError> {
17✔
413
        let sampler_seed = self.configured_sampler_seed()?;
17✔
414
        Ok(crate::source::IndexablePager::seed_for_sampler(
16✔
415
            &self.config.source_id,
16✔
416
            total,
16✔
417
            sampler_seed,
16✔
418
        ))
16✔
419
    }
17✔
420

421
    fn normalized_shard_extensions(config: &HuggingFaceRowsConfig) -> Vec<String> {
14✔
422
        config
14✔
423
            .shard_extensions
14✔
424
            .iter()
14✔
425
            .map(|value| value.trim().trim_start_matches('.').to_ascii_lowercase())
39✔
426
            .collect::<Vec<_>>()
14✔
427
    }
14✔
428

429
    fn collect_candidates_from_siblings(
9✔
430
        config: &HuggingFaceRowsConfig,
9✔
431
        siblings: &[String],
9✔
432
        accepted: &[String],
9✔
433
        respect_split: bool,
9✔
434
    ) -> (Vec<String>, bool) {
9✔
435
        let mut saw_parquet = false;
9✔
436
        let mut candidates = Vec::new();
9✔
437
        for remote_path in siblings {
16✔
438
            if respect_split && !config.split.is_empty() {
16✔
439
                let split_tag = format!("{}/", config.split);
12✔
440
                let split_token = format!("-{}-", config.split);
12✔
441
                let split_prefix = format!("{}-", config.split);
12✔
442
                if !remote_path.contains(&split_tag)
12✔
443
                    && !remote_path.contains(&split_token)
7✔
444
                    && !Path::new(remote_path)
7✔
445
                        .file_name()
7✔
446
                        .and_then(|name| name.to_str())
7✔
447
                        .is_some_and(|name| name.starts_with(&split_prefix))
7✔
448
                {
449
                    continue;
4✔
450
                }
8✔
451
            }
4✔
452

453
            let ext = Path::new(remote_path)
12✔
454
                .extension()
12✔
455
                .and_then(|v| v.to_str())
12✔
456
                .map(|v| v.to_ascii_lowercase());
12✔
457
            if ext.as_deref() == Some("parquet") {
12✔
458
                saw_parquet = true;
3✔
459
            }
9✔
460
            if ext
12✔
461
                .as_deref()
12✔
462
                .is_some_and(|ext| accepted.iter().any(|allowed| allowed == ext))
30✔
463
            {
464
                let target = Self::candidate_target_path(config, remote_path);
7✔
465
                if target.exists() {
7✔
466
                    continue;
1✔
467
                }
6✔
468
                candidates.push(remote_path.clone());
6✔
469
            }
5✔
470
        }
471
        (candidates, saw_parquet)
9✔
472
    }
9✔
473

474
    fn resolve_remote_candidates_from_siblings(
4✔
475
        config: &HuggingFaceRowsConfig,
4✔
476
        siblings: &[String],
4✔
477
        accepted: &[String],
4✔
478
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
4✔
479
        let (mut candidates, mut saw_parquet) =
4✔
480
            Self::collect_candidates_from_siblings(config, siblings, accepted, true);
4✔
481
        if candidates.is_empty() && !config.split.is_empty() {
4✔
482
            let (fallback_candidates, fallback_saw_parquet) =
3✔
483
                Self::collect_candidates_from_siblings(config, siblings, accepted, false);
3✔
484
            if !fallback_candidates.is_empty() {
3✔
485
                warn!(
1✔
486
                    "[triplets:hf] split filter '{}' matched no remote files; falling back to extension-only remote candidate scan",
487
                    config.split
488
                );
489
                candidates = fallback_candidates;
1✔
490
                saw_parquet = fallback_saw_parquet;
1✔
491
            }
2✔
492
        }
1✔
493

494
        candidates.sort();
4✔
495
        info!(
4✔
496
            "[triplets:hf] remote candidates matching {:?}: {}",
497
            config.shard_extensions,
498
            candidates.len()
4✔
499
        );
500
        if candidates.is_empty() {
4✔
501
            if saw_parquet {
2✔
502
                return Err(SamplerError::SourceUnavailable {
1✔
503
                    source_id: config.source_id.clone(),
1✔
504
                    reason: format!(
1✔
505
                        "dataset '{}' appears to be parquet-only, but shard_extensions does not include parquet ({:?}).",
1✔
506
                        config.dataset, config.shard_extensions
1✔
507
                    ),
1✔
508
                });
1✔
509
            }
1✔
510
            warn!(
1✔
511
                "[triplets:hf] no remote candidates found for dataset='{}' split='{}' extensions={:?}; source will be treated as exhausted",
512
                config.dataset, config.split, config.shard_extensions
513
            );
514
            return Ok((Vec::new(), HashMap::new()));
1✔
515
        }
2✔
516

517
        Ok((candidates, HashMap::new()))
2✔
518
    }
4✔
519

520
    fn candidates_from_parquet_manifest_json(
7✔
521
        config: &HuggingFaceRowsConfig,
7✔
522
        json: &Value,
7✔
523
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
7✔
524
        let accepted = Self::normalized_shard_extensions(config);
7✔
525

526
        let mut candidates = Vec::new();
7✔
527
        let mut candidate_sizes = HashMap::new();
7✔
528
        if let Some(entries) = json.get("parquet_files").and_then(Value::as_array) {
7✔
529
            for entry in entries {
10✔
530
                let Some(url) = entry.get("url").and_then(Value::as_str) else {
10✔
531
                    continue;
1✔
532
                };
533

534
                let ext = Path::new(url)
9✔
535
                    .extension()
9✔
536
                    .and_then(|value| value.to_str())
9✔
537
                    .map(|value| value.to_ascii_lowercase());
9✔
538
                if !ext
9✔
539
                    .as_deref()
9✔
540
                    .is_some_and(|value| accepted.iter().any(|allowed| allowed == value))
15✔
541
                {
542
                    continue;
1✔
543
                }
8✔
544

545
                let candidate = format!("{REMOTE_URL_PREFIX}{url}");
8✔
546
                let expected_size = entry.get("size").and_then(Value::as_u64);
8✔
547
                let target = Self::candidate_target_path(config, &candidate);
8✔
548
                if target.exists() {
8✔
549
                    if Self::target_matches_expected_size(&target, expected_size) {
3✔
550
                        continue;
1✔
551
                    }
2✔
552
                    warn!(
2✔
553
                        "[triplets:hf] incomplete cached shard detected (will redownload): {}",
554
                        target.display()
2✔
555
                    );
556
                    if let Err(err) = fs::remove_file(&target)
2✔
557
                        && err.kind() != std::io::ErrorKind::NotFound
1✔
558
                    {
559
                        return Err(SamplerError::SourceUnavailable {
1✔
560
                            source_id: config.source_id.clone(),
1✔
561
                            reason: format!(
1✔
562
                                "failed removing incomplete shard {}: {err}",
1✔
563
                                target.display()
1✔
564
                            ),
1✔
565
                        });
1✔
566
                    }
1✔
567
                }
5✔
568
                if let Some(size) = expected_size {
6✔
569
                    candidate_sizes.insert(candidate.clone(), size);
6✔
570
                }
6✔
571
                candidates.push(candidate);
6✔
572
            }
573
        }
1✔
574

575
        candidates.sort();
6✔
576
        Ok((candidates, candidate_sizes))
6✔
577
    }
7✔
578

579
    /// Resolve and filter remote shard candidates from manifest or repository listing.
580
    fn list_remote_candidates(
1✔
581
        config: &HuggingFaceRowsConfig,
1✔
582
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
1✔
583
        if let Ok((candidates, candidate_sizes)) =
1✔
584
            Self::list_remote_candidates_from_parquet_manifest(config)
1✔
585
            && !candidates.is_empty()
1✔
586
        {
587
            info!(
1✔
588
                "[triplets:hf] remote parquet manifest candidates matching {:?}: {}",
589
                config.shard_extensions,
590
                candidates.len()
1✔
591
            );
592
            return Ok((candidates, candidate_sizes));
1✔
NEW
593
        }
×
594

NEW
595
        let api = ApiBuilder::new()
×
NEW
596
            .with_progress(true)
×
NEW
597
            .with_retries(5)
×
NEW
598
            .with_token(None)
×
NEW
599
            .build()
×
NEW
600
            .map_err(|err| SamplerError::SourceUnavailable {
×
NEW
601
                source_id: config.source_id.clone(),
×
NEW
602
                reason: format!("failed building hf-hub client: {err}"),
×
NEW
603
            })?;
×
604

NEW
605
        let repo = Repo::new(config.dataset.clone(), RepoType::Dataset);
×
NEW
606
        let repo_api = api.repo(repo);
×
NEW
607
        info!(
×
608
            "[triplets:hf] reading remote file list for dataset {}",
609
            config.dataset
610
        );
NEW
611
        let info = repo_api
×
NEW
612
            .info()
×
NEW
613
            .map_err(|err| SamplerError::SourceUnavailable {
×
NEW
614
                source_id: config.source_id.clone(),
×
NEW
615
                reason: format!("failed reading hf-hub repository info: {err}"),
×
NEW
616
            })?;
×
617

NEW
618
        let accepted = Self::normalized_shard_extensions(config);
×
619

NEW
620
        let siblings = info
×
NEW
621
            .siblings
×
NEW
622
            .into_iter()
×
NEW
623
            .map(|entry| entry.rfilename)
×
NEW
624
            .collect::<Vec<_>>();
×
625

NEW
626
        Self::resolve_remote_candidates_from_siblings(config, &siblings, &accepted)
×
627
    }
1✔
628

629
    /// Return the persistence file path for shard sequence state.
630
    fn shard_sequence_state_path(config: &HuggingFaceRowsConfig) -> PathBuf {
22✔
631
        config
22✔
632
            .snapshot_dir
22✔
633
            .join("_parquet_manifest")
22✔
634
            .join(SHARD_SEQUENCE_STATE_FILE)
22✔
635
    }
22✔
636

637
    /// Load persisted shard candidate sequence when metadata and sampler seed match.
638
    #[cfg(test)]
639
    fn load_persisted_shard_sequence(
7✔
640
        config: &HuggingFaceRowsConfig,
7✔
641
        current_sampler_seed: u64,
7✔
642
    ) -> Result<Option<PersistedShardSequence>, SamplerError> {
7✔
643
        let path = Self::shard_sequence_state_path(config);
7✔
644
        if !path.exists() {
7✔
645
            return Ok(None);
1✔
646
        }
6✔
647

648
        let raw = fs::read_to_string(&path).map_err(|err| SamplerError::SourceUnavailable {
6✔
NEW
649
            source_id: config.source_id.clone(),
×
NEW
650
            reason: format!(
×
651
                "failed reading shard-sequence state {}: {err}",
NEW
652
                path.display()
×
653
            ),
NEW
654
        })?;
×
655

656
        let mut persisted: PersistedShardSequence =
5✔
657
            serde_json::from_str(&raw).map_err(|err| SamplerError::SourceUnavailable {
6✔
658
                source_id: config.source_id.clone(),
1✔
659
                reason: format!(
1✔
660
                    "failed parsing shard-sequence state {}: {err}",
661
                    path.display()
1✔
662
                ),
663
            })?;
1✔
664

665
        if persisted.version != SHARD_SEQUENCE_STATE_VERSION
5✔
666
            || persisted.source_id != config.source_id
5✔
667
            || persisted.dataset != config.dataset
4✔
668
            || persisted.config != config.config
4✔
669
            || persisted.split != config.split
4✔
670
            || persisted.sampler_seed != current_sampler_seed
4✔
671
        {
672
            warn!(
2✔
673
                "[triplets:hf] shard-sequence state mismatch for {}; rebuilding candidate order",
674
                path.display()
2✔
675
            );
676
            return Ok(None);
2✔
677
        }
3✔
678

679
        if persisted.next_remote_idx > persisted.candidates.len() {
3✔
680
            persisted.next_remote_idx = persisted.candidates.len();
2✔
681
        }
2✔
682

683
        Ok(Some(persisted))
3✔
684
    }
7✔
685

686
    /// Persist current shard candidate sequence and position atomically.
687
    fn persist_shard_sequence_locked(&self, state: &SourceState) -> Result<(), SamplerError> {
9✔
688
        let Some(candidates) = state.remote_candidates.as_ref() else {
9✔
689
            return Ok(());
1✔
690
        };
691

692
        let path = Self::shard_sequence_state_path(&self.config);
8✔
693
        if let Some(parent) = path.parent() {
8✔
694
            fs::create_dir_all(parent).map_err(|err| SamplerError::SourceUnavailable {
8✔
NEW
695
                source_id: self.config.source_id.clone(),
×
NEW
696
                reason: format!(
×
697
                    "failed creating shard-sequence state dir {}: {err}",
NEW
698
                    parent.display()
×
699
                ),
NEW
700
            })?;
×
NEW
701
        }
×
702

703
        let persisted = PersistedShardSequence {
8✔
704
            version: SHARD_SEQUENCE_STATE_VERSION,
705
            source_id: self.config.source_id.clone(),
8✔
706
            dataset: self.config.dataset.clone(),
8✔
707
            config: self.config.config.clone(),
8✔
708
            split: self.config.split.clone(),
8✔
709
            sampler_seed: self.configured_sampler_seed()?,
8✔
710
            candidates: candidates.clone(),
8✔
711
            candidate_sizes: state.remote_candidate_sizes.clone(),
8✔
712
            next_remote_idx: state.next_remote_idx.min(candidates.len()),
8✔
713
        };
714

715
        let raw = serde_json::to_vec_pretty(&persisted).map_err(|err| {
8✔
NEW
716
            SamplerError::SourceUnavailable {
×
NEW
717
                source_id: self.config.source_id.clone(),
×
NEW
718
                reason: format!(
×
NEW
719
                    "failed encoding shard-sequence state {}: {err}",
×
NEW
720
                    path.display()
×
NEW
721
                ),
×
NEW
722
            }
×
NEW
723
        })?;
×
724

725
        let tmp_path = path.with_extension("tmp");
8✔
726
        fs::write(&tmp_path, raw).map_err(|err| SamplerError::SourceUnavailable {
8✔
NEW
727
            source_id: self.config.source_id.clone(),
×
NEW
728
            reason: format!(
×
729
                "failed writing shard-sequence state temp {}: {err}",
NEW
730
                tmp_path.display()
×
731
            ),
NEW
732
        })?;
×
733
        fs::rename(&tmp_path, &path).map_err(|err| SamplerError::SourceUnavailable {
8✔
NEW
734
            source_id: self.config.source_id.clone(),
×
NEW
735
            reason: format!(
×
736
                "failed replacing shard-sequence state {}: {err}",
NEW
737
                path.display()
×
738
            ),
NEW
739
        })?;
×
740

741
        Ok(())
8✔
742
    }
9✔
743

744
    /// Rotate candidate ordering deterministically using source identity.
745
    fn rotate_candidates_deterministically(
4✔
746
        config: &HuggingFaceRowsConfig,
4✔
747
        candidates: &mut [String],
4✔
748
    ) {
4✔
749
        if candidates.len() <= 1 {
4✔
750
            return;
1✔
751
        }
3✔
752
        let mut hasher = DefaultHasher::new();
3✔
753
        config.source_id.hash(&mut hasher);
3✔
754
        config.dataset.hash(&mut hasher);
3✔
755
        config.config.hash(&mut hasher);
3✔
756
        config.split.hash(&mut hasher);
3✔
757
        let offset = (hasher.finish() as usize) % candidates.len();
3✔
758
        candidates.rotate_left(offset);
3✔
759
    }
4✔
760

761
    /// Build deterministic seed used to permute remote shard candidate order.
762
    fn shard_candidate_seed(
17✔
763
        config: &HuggingFaceRowsConfig,
17✔
764
        total_candidates: usize,
17✔
765
        sampler_seed: u64,
17✔
766
    ) -> u64 {
17✔
767
        let mut hasher = DefaultHasher::new();
17✔
768
        "hf_shard_candidate_sequence_v1".hash(&mut hasher);
17✔
769
        sampler_seed.hash(&mut hasher);
17✔
770
        config.source_id.hash(&mut hasher);
17✔
771
        config.dataset.hash(&mut hasher);
17✔
772
        config.config.hash(&mut hasher);
17✔
773
        config.split.hash(&mut hasher);
17✔
774
        total_candidates.hash(&mut hasher);
17✔
775
        hasher.finish()
17✔
776
    }
17✔
777

778
    fn parquet_manifest_endpoint() -> String {
4✔
779
        #[cfg(test)]
780
        if let Ok(value) = std::env::var("TRIPLETS_HF_PARQUET_ENDPOINT")
4✔
781
            && !value.trim().is_empty()
4✔
782
        {
783
            return value;
3✔
784
        }
1✔
785
        "https://datasets-server.huggingface.co/parquet".to_string()
1✔
786
    }
4✔
787

788
    fn size_endpoint() -> String {
12✔
789
        #[cfg(test)]
790
        if let Ok(value) = std::env::var("TRIPLETS_HF_SIZE_ENDPOINT")
4✔
791
            && !value.trim().is_empty()
4✔
792
        {
793
            return value;
3✔
794
        }
1✔
795
        "https://datasets-server.huggingface.co/size".to_string()
9✔
796
    }
12✔
797

798
    /// Query datasets-server parquet manifest and derive shard candidates.
799
    fn list_remote_candidates_from_parquet_manifest(
3✔
800
        config: &HuggingFaceRowsConfig,
3✔
801
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
3✔
802
        let endpoint = Self::parquet_manifest_endpoint();
3✔
803
        info!(
3✔
804
            "[triplets:hf] reading datasets-server parquet manifest for dataset {}",
805
            config.dataset
806
        );
807
        let response = ureq::get(&endpoint)
3✔
808
            .query("dataset", &config.dataset)
3✔
809
            .query("config", &config.config)
3✔
810
            .query("split", &config.split)
3✔
811
            .call()
3✔
812
            .map_err(|err| SamplerError::SourceUnavailable {
3✔
813
                source_id: config.source_id.clone(),
1✔
814
                reason: format!("failed querying datasets-server parquet endpoint: {err}"),
1✔
815
            })?;
1✔
816

817
        let body = response.into_body().read_to_string().map_err(|err| {
2✔
NEW
818
            SamplerError::SourceUnavailable {
×
NEW
819
                source_id: config.source_id.clone(),
×
NEW
820
                reason: format!("failed reading datasets-server parquet response body: {err}"),
×
NEW
821
            }
×
NEW
822
        })?;
×
823

824
        Self::parse_parquet_manifest_response(config, &body)
2✔
825
    }
3✔
826

827
    fn parse_parquet_manifest_response(
4✔
828
        config: &HuggingFaceRowsConfig,
4✔
829
        body: &str,
4✔
830
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
4✔
831
        let json: Value =
3✔
832
            serde_json::from_str(body).map_err(|err| SamplerError::SourceUnavailable {
4✔
833
                source_id: config.source_id.clone(),
1✔
834
                reason: format!("failed parsing datasets-server parquet response: {err}"),
1✔
835
            })?;
1✔
836

837
        Self::candidates_from_parquet_manifest_json(config, &json)
3✔
838
    }
4✔
839

840
    /// Map a candidate identifier to the local snapshot target path.
841
    fn candidate_target_path(config: &HuggingFaceRowsConfig, candidate: &str) -> PathBuf {
37✔
842
        if let Some(url) = candidate.strip_prefix(REMOTE_URL_PREFIX) {
37✔
843
            let suffix = url
28✔
844
                .split("/resolve/")
28✔
845
                .nth(1)
28✔
846
                .map(|value| value.trim_start_matches('/'))
28✔
847
                .filter(|value| !value.is_empty())
28✔
848
                .unwrap_or("parquet/unknown.parquet");
28✔
849
            return config.snapshot_dir.join("_parquet_manifest").join(suffix);
28✔
850
        }
9✔
851
        config.snapshot_dir.join(candidate)
9✔
852
    }
37✔
853

854
    /// Validate target file size against expected bytes when available.
855
    fn target_matches_expected_size(path: &Path, expected_bytes: Option<u64>) -> bool {
9✔
856
        if !path.exists() {
9✔
857
            return false;
1✔
858
        }
8✔
859
        if let Some(expected) = expected_bytes
8✔
860
            && expected > 0
7✔
861
        {
862
            return fs::metadata(path)
7✔
863
                .map(|meta| meta.len() == expected)
7✔
864
                .unwrap_or(false);
7✔
865
        }
1✔
866
        true
1✔
867
    }
9✔
868

869
    /// Return root directory used for manifest-cached remote shards.
870
    fn manifest_cache_root(&self) -> PathBuf {
25✔
871
        self.config.snapshot_dir.join("_parquet_manifest")
25✔
872
    }
25✔
873

874
    /// Return on-disk size for a shard path, or 0 if metadata lookup fails.
875
    fn shard_size_bytes(path: &Path) -> u64 {
25✔
876
        fs::metadata(path).map(|meta| meta.len()).unwrap_or(0)
25✔
877
    }
25✔
878

879
    /// Recompute shard `global_start` offsets and total materialized row count.
880
    fn recompute_shard_offsets(state: &mut SourceState) {
4✔
881
        let mut running = 0usize;
4✔
882
        for shard in &mut state.shards {
5✔
883
            shard.global_start = running;
5✔
884
            running = running.saturating_add(shard.row_count);
5✔
885
        }
5✔
886
        state.materialized_rows = running;
4✔
887
    }
4✔
888

889
    /// Enforce local disk cap by evicting oldest manifest shards when possible.
890
    fn enforce_disk_cap_locked(
12✔
891
        &self,
12✔
892
        state: &mut SourceState,
12✔
893
        protected_path: &Path,
12✔
894
    ) -> Result<bool, SamplerError> {
12✔
895
        let Some(cap_bytes) = self.config.local_disk_cap_bytes else {
12✔
896
            return Ok(false);
1✔
897
        };
898

899
        let manifest_root = self.manifest_cache_root();
11✔
900
        let mut usage_bytes = state
11✔
901
            .shards
11✔
902
            .iter()
11✔
903
            .filter(|shard| shard.path.starts_with(&manifest_root))
14✔
904
            .map(|shard| Self::shard_size_bytes(&shard.path))
14✔
905
            .sum::<u64>();
11✔
906

907
        if usage_bytes <= cap_bytes {
11✔
908
            return Ok(false);
6✔
909
        }
5✔
910

911
        let mut evicted_any = false;
5✔
912
        loop {
913
            if usage_bytes <= cap_bytes {
8✔
914
                break;
3✔
915
            }
5✔
916

917
            let resident_manifest_count = state
5✔
918
                .shards
5✔
919
                .iter()
5✔
920
                .filter(|shard| shard.path.starts_with(&manifest_root))
8✔
921
                .count();
5✔
922
            if resident_manifest_count <= self.config.min_resident_shards {
5✔
923
                break;
2✔
924
            }
3✔
925

926
            let evict_pos = state.shards.iter().position(|shard| {
3✔
927
                shard.path.starts_with(&manifest_root) && shard.path != protected_path
3✔
928
            });
3✔
929
            let Some(pos) = evict_pos else {
3✔
NEW
930
                break;
×
931
            };
932

933
            let shard = state.shards.remove(pos);
3✔
934
            let shard_size = Self::shard_size_bytes(&shard.path);
3✔
935
            if let Err(err) = fs::remove_file(&shard.path)
3✔
NEW
936
                && err.kind() != std::io::ErrorKind::NotFound
×
937
            {
NEW
938
                return Err(SamplerError::SourceUnavailable {
×
NEW
939
                    source_id: self.config.source_id.clone(),
×
NEW
940
                    reason: format!(
×
NEW
941
                        "failed evicting shard {} under disk cap: {err}",
×
NEW
942
                        shard.path.display()
×
NEW
943
                    ),
×
NEW
944
                });
×
945
            }
3✔
946

947
            usage_bytes = usage_bytes.saturating_sub(shard_size);
3✔
948
            evicted_any = true;
3✔
949
            warn!(
3✔
950
                "[triplets:hf] evicted shard for disk cap: {} (usage={:.2} GiB cap={:.2} GiB)",
951
                shard.path.display(),
3✔
952
                usage_bytes as f64 / (1024.0 * 1024.0 * 1024.0),
3✔
953
                cap_bytes as f64 / (1024.0 * 1024.0 * 1024.0)
3✔
954
            );
955
        }
956

957
        if usage_bytes > cap_bytes {
5✔
958
            if protected_path.exists() {
2✔
959
                let _ = fs::remove_file(protected_path);
2✔
960
            }
2✔
961
            return Err(SamplerError::SourceUnavailable {
2✔
962
                source_id: self.config.source_id.clone(),
2✔
963
                reason: format!(
2✔
964
                    "local disk cap exceeded and cannot evict further (usage={} bytes cap={} bytes)",
2✔
965
                    usage_bytes, cap_bytes
2✔
966
                ),
2✔
967
            });
2✔
968
        }
3✔
969

970
        if evicted_any {
3✔
971
            Self::recompute_shard_offsets(state);
3✔
972
        }
3✔
973
        Ok(evicted_any)
3✔
974
    }
12✔
975

976
    /// Return total on-disk bytes used by manifest-backed shards.
977
    fn manifest_usage_bytes_locked(&self, state: &SourceState) -> u64 {
7✔
978
        let manifest_root = self.manifest_cache_root();
7✔
979
        state
7✔
980
            .shards
7✔
981
            .iter()
7✔
982
            .filter(|shard| shard.path.starts_with(&manifest_root))
8✔
983
            .map(|shard| Self::shard_size_bytes(&shard.path))
7✔
984
            .sum::<u64>()
7✔
985
    }
7✔
986

987
    /// Fetch exact split row count metadata from datasets-server size endpoint.
988
    fn fetch_global_row_count(
11✔
989
        config: &HuggingFaceRowsConfig,
11✔
990
    ) -> Result<Option<usize>, SamplerError> {
11✔
991
        let endpoint = Self::size_endpoint();
11✔
992
        info!(
11✔
993
            "[triplets:hf] requesting global row count dataset='{}' config='{}' split='{}'",
994
            config.dataset, config.config, config.split
995
        );
996

997
        let response = ureq::get(&endpoint)
11✔
998
            .query("dataset", &config.dataset)
11✔
999
            .query("config", &config.config)
11✔
1000
            .query("split", &config.split)
11✔
1001
            .call()
11✔
1002
            .map_err(|err| SamplerError::SourceUnavailable {
11✔
1003
                source_id: config.source_id.clone(),
9✔
1004
                reason: format!("failed querying datasets-server size endpoint: {err}"),
9✔
1005
            })?;
9✔
1006

1007
        let body = response.into_body().read_to_string().map_err(|err| {
2✔
NEW
1008
            SamplerError::SourceUnavailable {
×
NEW
1009
                source_id: config.source_id.clone(),
×
NEW
1010
                reason: format!("failed reading datasets-server size response body: {err}"),
×
NEW
1011
            }
×
NEW
1012
        })?;
×
1013

1014
        Self::parse_global_row_count_response(config, &body)
2✔
1015
    }
11✔
1016

1017
    fn parse_global_row_count_response(
6✔
1018
        config: &HuggingFaceRowsConfig,
6✔
1019
        body: &str,
6✔
1020
    ) -> Result<Option<usize>, SamplerError> {
6✔
1021
        let json: Value =
5✔
1022
            serde_json::from_str(body).map_err(|err| SamplerError::SourceUnavailable {
6✔
1023
                source_id: config.source_id.clone(),
1✔
1024
                reason: format!("failed parsing datasets-server size response: {err}"),
1✔
1025
            })?;
1✔
1026

1027
        let mut count =
5✔
1028
            Self::extract_split_row_count_from_size_response(&json, &config.config, &config.split);
5✔
1029
        if let (Some(max_rows), Some(rows)) = (config.max_rows, count) {
5✔
1030
            count = Some(rows.min(max_rows));
1✔
1031
        }
4✔
1032
        Ok(count)
5✔
1033
    }
6✔
1034

1035
    /// Extract split row count from datasets-server size payload variants.
1036
    fn extract_split_row_count_from_size_response(
13✔
1037
        json: &Value,
13✔
1038
        config_name: &str,
13✔
1039
        split_name: &str,
13✔
1040
    ) -> Option<usize> {
13✔
1041
        let to_usize = |value: &Value| value.as_u64().and_then(|raw| usize::try_from(raw).ok());
13✔
1042

1043
        let size = json.get("size")?;
13✔
1044

1045
        if let Some(splits) = size.get("splits").and_then(Value::as_array) {
13✔
1046
            for entry in splits {
6✔
1047
                let entry_config = entry
6✔
1048
                    .get("config")
6✔
1049
                    .or_else(|| entry.get("config_name"))
6✔
1050
                    .and_then(Value::as_str)
6✔
1051
                    .unwrap_or_default();
6✔
1052
                let entry_split = entry
6✔
1053
                    .get("split")
6✔
1054
                    .or_else(|| entry.get("name"))
6✔
1055
                    .and_then(Value::as_str)
6✔
1056
                    .unwrap_or_default();
6✔
1057
                if entry_config == config_name
6✔
1058
                    && entry_split == split_name
5✔
1059
                    && let Some(rows) = entry.get("num_rows").and_then(to_usize)
3✔
1060
                {
1061
                    return Some(rows);
3✔
1062
                }
3✔
1063
            }
1064
        }
8✔
1065

1066
        if let Some(configs) = size.get("configs").and_then(Value::as_array) {
10✔
1067
            for config_entry in configs {
6✔
1068
                let entry_config = config_entry
6✔
1069
                    .get("config")
6✔
1070
                    .or_else(|| config_entry.get("config_name"))
6✔
1071
                    .and_then(Value::as_str)
6✔
1072
                    .unwrap_or_default();
6✔
1073
                if entry_config != config_name {
6✔
1074
                    continue;
1✔
1075
                }
5✔
1076

1077
                if let Some(splits) = config_entry.get("splits").and_then(Value::as_array) {
5✔
1078
                    for split_entry in splits {
4✔
1079
                        let entry_split = split_entry
4✔
1080
                            .get("split")
4✔
1081
                            .or_else(|| split_entry.get("name"))
4✔
1082
                            .and_then(Value::as_str)
4✔
1083
                            .unwrap_or_default();
4✔
1084
                        if entry_split == split_name
4✔
1085
                            && let Some(rows) = split_entry.get("num_rows").and_then(to_usize)
2✔
1086
                        {
1087
                            return Some(rows);
2✔
1088
                        }
2✔
1089
                    }
1090
                }
1✔
1091

1092
                if split_name.is_empty()
3✔
1093
                    && let Some(rows) = config_entry.get("num_rows").and_then(to_usize)
3✔
1094
                {
1095
                    return Some(rows);
3✔
NEW
1096
                }
×
1097
            }
1098
        }
4✔
1099

1100
        if split_name.is_empty() {
5✔
1101
            return size
2✔
1102
                .get("dataset")
2✔
1103
                .and_then(|dataset| dataset.get("num_rows"))
2✔
1104
                .and_then(to_usize);
2✔
1105
        }
3✔
1106

1107
        None
3✔
1108
    }
13✔
1109

1110
    /// Download a shard (URL or hf-hub path) and materialize it under snapshot dir.
1111
    fn download_and_materialize_shard(
13✔
1112
        config: &HuggingFaceRowsConfig,
13✔
1113
        remote_path: &str,
13✔
1114
        expected_bytes: Option<u64>,
13✔
1115
    ) -> Result<PathBuf, SamplerError> {
13✔
1116
        if let Some(remote_url) = remote_path.strip_prefix(REMOTE_URL_PREFIX) {
13✔
1117
            let target = Self::candidate_target_path(config, remote_path);
12✔
1118
            if target.exists() {
12✔
1119
                if Self::target_matches_expected_size(&target, expected_bytes) {
2✔
1120
                    return Ok(target);
1✔
1121
                }
1✔
1122
                warn!(
1✔
1123
                    "[triplets:hf] replacing incomplete shard before retry: {}",
1124
                    target.display()
1✔
1125
                );
1126
                fs::remove_file(&target).map_err(|err| SamplerError::SourceUnavailable {
1✔
NEW
1127
                    source_id: config.source_id.clone(),
×
NEW
1128
                    reason: format!(
×
1129
                        "failed removing incomplete shard {}: {err}",
NEW
1130
                        target.display()
×
1131
                    ),
NEW
1132
                })?;
×
1133
            }
10✔
1134

1135
            if let Some(parent) = target.parent() {
11✔
1136
                fs::create_dir_all(parent).map_err(|err| SamplerError::SourceUnavailable {
11✔
NEW
1137
                    source_id: config.source_id.clone(),
×
NEW
1138
                    reason: format!(
×
1139
                        "failed creating snapshot subdir {}: {err}",
NEW
1140
                        parent.display()
×
1141
                    ),
NEW
1142
                })?;
×
NEW
1143
            }
×
1144

1145
            let temp_target = target.with_extension("part");
11✔
1146
            if temp_target.exists() {
11✔
1147
                let _ = fs::remove_file(&temp_target);
1✔
1148
            }
10✔
1149

1150
            let response =
11✔
1151
                ureq::get(remote_url)
11✔
1152
                    .call()
11✔
1153
                    .map_err(|err| SamplerError::SourceUnavailable {
11✔
NEW
1154
                        source_id: config.source_id.clone(),
×
NEW
1155
                        reason: format!("failed downloading shard URL '{}': {err}", remote_url),
×
NEW
1156
                    })?;
×
1157
            let mut reader = response.into_body().into_reader();
11✔
1158
            let mut file =
11✔
1159
                File::create(&temp_target).map_err(|err| SamplerError::SourceUnavailable {
11✔
NEW
1160
                    source_id: config.source_id.clone(),
×
NEW
1161
                    reason: format!(
×
1162
                        "failed creating target shard {}: {err}",
NEW
1163
                        temp_target.display()
×
1164
                    ),
NEW
1165
                })?;
×
1166
            info!(
11✔
1167
                "[triplets:hf] downloading shard payload -> {}",
1168
                target.display()
11✔
1169
            );
1170
            let started = Instant::now();
11✔
1171
            let mut total_bytes = 0u64;
11✔
1172
            let mut buffer = vec![0u8; 8 * 1024 * 1024];
11✔
1173
            let mut last_report = Instant::now();
11✔
1174
            loop {
1175
                let read =
21✔
1176
                    reader
21✔
1177
                        .read(&mut buffer)
21✔
1178
                        .map_err(|err| SamplerError::SourceUnavailable {
21✔
NEW
1179
                            source_id: config.source_id.clone(),
×
NEW
1180
                            reason: format!("failed reading shard stream '{}': {err}", remote_url),
×
NEW
1181
                        })?;
×
1182
                if read == 0 {
21✔
1183
                    break;
11✔
1184
                }
10✔
1185
                file.write_all(&buffer[..read])
10✔
1186
                    .map_err(|err| SamplerError::SourceUnavailable {
10✔
NEW
1187
                        source_id: config.source_id.clone(),
×
NEW
1188
                        reason: format!(
×
1189
                            "failed writing target shard {}: {err}",
NEW
1190
                            temp_target.display()
×
1191
                        ),
NEW
1192
                    })?;
×
1193
                total_bytes = total_bytes.saturating_add(read as u64);
10✔
1194
                if last_report.elapsed() >= Duration::from_secs(2) {
10✔
NEW
1195
                    let elapsed = started.elapsed().as_secs_f64();
×
NEW
1196
                    if let Some(expected) = expected_bytes
×
NEW
1197
                        && expected > 0
×
1198
                    {
NEW
1199
                        let pct =
×
NEW
1200
                            ((total_bytes as f64 / expected as f64) * 100.0).clamp(0.0, 100.0);
×
NEW
1201
                        let rate = if elapsed > 0.0 {
×
NEW
1202
                            total_bytes as f64 / elapsed
×
1203
                        } else {
NEW
1204
                            0.0
×
1205
                        };
NEW
1206
                        let eta_secs = if rate > 0.0 && total_bytes < expected {
×
NEW
1207
                            (expected.saturating_sub(total_bytes) as f64) / rate
×
1208
                        } else {
NEW
1209
                            0.0
×
1210
                        };
NEW
1211
                        info!(
×
1212
                            "[triplets:hf] download progress {}: {:.1}/{:.1} MiB ({:.1}%, {:.1}s elapsed, ETA {:.1}s)",
NEW
1213
                            target.display(),
×
NEW
1214
                            total_bytes as f64 / (1024.0 * 1024.0),
×
NEW
1215
                            expected as f64 / (1024.0 * 1024.0),
×
1216
                            pct,
1217
                            elapsed,
NEW
1218
                            eta_secs.max(0.0)
×
1219
                        );
1220
                    } else {
NEW
1221
                        info!(
×
1222
                            "[triplets:hf] download progress {}: {:.1} MiB ({:.1}s)",
NEW
1223
                            target.display(),
×
NEW
1224
                            total_bytes as f64 / (1024.0 * 1024.0),
×
1225
                            elapsed
1226
                        );
1227
                    }
NEW
1228
                    last_report = Instant::now();
×
1229
                }
10✔
1230
            }
1231
            let elapsed = started.elapsed().as_secs_f64();
11✔
1232
            if let Some(expected) = expected_bytes
11✔
1233
                && expected > 0
3✔
1234
            {
1235
                let pct = ((total_bytes as f64 / expected as f64) * 100.0).clamp(0.0, 100.0);
3✔
1236
                info!(
3✔
1237
                    "[triplets:hf] download complete {}: {:.1}/{:.1} MiB ({:.1}%) in {:.1}s",
1238
                    target.display(),
3✔
1239
                    total_bytes as f64 / (1024.0 * 1024.0),
3✔
1240
                    expected as f64 / (1024.0 * 1024.0),
3✔
1241
                    pct,
1242
                    elapsed
1243
                );
1244
            } else {
1245
                info!(
8✔
1246
                    "[triplets:hf] download complete {}: {:.1} MiB in {:.1}s",
1247
                    target.display(),
8✔
1248
                    total_bytes as f64 / (1024.0 * 1024.0),
8✔
1249
                    elapsed
1250
                );
1251
            }
1252

1253
            fs::rename(&temp_target, &target).map_err(|err| SamplerError::SourceUnavailable {
11✔
NEW
1254
                source_id: config.source_id.clone(),
×
NEW
1255
                reason: format!(
×
1256
                    "failed moving downloaded shard {} -> {}: {err}",
NEW
1257
                    temp_target.display(),
×
NEW
1258
                    target.display()
×
1259
                ),
NEW
1260
            })?;
×
1261
            return Ok(target);
11✔
1262
        }
1✔
1263

1264
        let api = ApiBuilder::new()
1✔
1265
            .with_progress(true)
1✔
1266
            .with_retries(5)
1✔
1267
            .with_token(None)
1✔
1268
            .build()
1✔
1269
            .map_err(|err| SamplerError::SourceUnavailable {
1✔
NEW
1270
                source_id: config.source_id.clone(),
×
NEW
1271
                reason: format!("failed building hf-hub client: {err}"),
×
NEW
1272
            })?;
×
1273

1274
        let repo = Repo::new(config.dataset.clone(), RepoType::Dataset);
1✔
1275
        let repo_api = api.repo(repo);
1✔
1276

NEW
1277
        let mut local_cached =
×
1278
            repo_api
1✔
1279
                .get(remote_path)
1✔
1280
                .map_err(|err| SamplerError::SourceUnavailable {
1✔
1281
                    source_id: config.source_id.clone(),
1✔
1282
                    reason: format!("failed downloading '{}' from hf-hub: {err}", remote_path),
1✔
1283
                })?;
1✔
NEW
1284
        if !local_cached.exists() {
×
NEW
1285
            for _ in 0..5 {
×
NEW
1286
                local_cached = repo_api.download(remote_path).map_err(|err| {
×
NEW
1287
                    SamplerError::SourceUnavailable {
×
NEW
1288
                        source_id: config.source_id.clone(),
×
NEW
1289
                        reason: format!(
×
NEW
1290
                            "hf-hub returned missing cache path for '{}', and forced download failed: {err}",
×
NEW
1291
                            remote_path
×
NEW
1292
                        ),
×
NEW
1293
                    }
×
NEW
1294
                })?;
×
NEW
1295
                if local_cached.exists() {
×
NEW
1296
                    break;
×
NEW
1297
                }
×
NEW
1298
                thread::sleep(Duration::from_millis(400));
×
1299
            }
NEW
1300
        }
×
NEW
1301
        if !local_cached.exists() {
×
NEW
1302
            return Err(SamplerError::SourceUnavailable {
×
NEW
1303
                source_id: config.source_id.clone(),
×
NEW
1304
                reason: format!(
×
NEW
1305
                    "hf-hub returned non-existent cache file for '{}' at {}",
×
NEW
1306
                    remote_path,
×
NEW
1307
                    local_cached.display()
×
NEW
1308
                ),
×
NEW
1309
            });
×
NEW
1310
        }
×
1311

NEW
1312
        let target = Self::candidate_target_path(config, remote_path);
×
NEW
1313
        Self::materialize_local_file(config, &local_cached, &target)?;
×
NEW
1314
        Ok(target)
×
1315
    }
13✔
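    /// Illustrative sketch (not part of the original source): the ETA arithmetic used by the
    /// download progress logging above, factored out for clarity. `rate` is assumed to be the
    /// observed throughput in bytes per second.
    #[allow(dead_code)]
    fn download_eta_secs_sketch(total_bytes: u64, expected: u64, rate: f64) -> f64 {
        if rate > 0.0 && total_bytes < expected {
            // Remaining bytes divided by the observed rate.
            (expected.saturating_sub(total_bytes) as f64) / rate
        } else {
            0.0
        }
    }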
1316

1317
    /// Build shard metadata for a single local file.
1318
    fn index_single_shard(
38✔
1319
        config: &HuggingFaceRowsConfig,
38✔
1320
        path: &Path,
38✔
1321
        global_start: usize,
38✔
1322
    ) -> Result<Option<ShardIndex>, SamplerError> {
38✔
1323
        let is_parquet = path
38✔
1324
            .extension()
38✔
1325
            .and_then(|v| v.to_str())
38✔
1326
            .is_some_and(|ext| ext.eq_ignore_ascii_case("parquet"));
38✔
1327

1328
        let (rows, parquet_row_groups, checkpoints) = if is_parquet {
38✔
1329
            let (rows, parquet_row_groups) = Self::parquet_row_group_map(config, path)?;
4✔
1330
            (rows, parquet_row_groups, Vec::new())
4✔
1331
        } else {
1332
            let file = File::open(path).map_err(|err| SamplerError::SourceUnavailable {
34✔
1333
                source_id: config.source_id.clone(),
1✔
1334
                reason: format!("failed opening shard {}: {err}", path.display()),
1✔
1335
            })?;
1✔
1336
            let mut reader = BufReader::new(file);
33✔
1337
            let mut checkpoints = Vec::new();
33✔
1338
            let mut line = String::new();
33✔
1339
            let mut offset = 0u64;
33✔
1340
            let mut rows = 0usize;
33✔
1341

1342
            loop {
1343
                if rows.is_multiple_of(config.checkpoint_stride) {
10,094✔
1344
                    checkpoints.push(offset);
99✔
1345
                }
9,995✔
1346
                line.clear();
10,094✔
1347
                let bytes =
10,094✔
1348
                    reader
10,094✔
1349
                        .read_line(&mut line)
10,094✔
1350
                        .map_err(|err| SamplerError::SourceUnavailable {
10,094✔
NEW
1351
                            source_id: config.source_id.clone(),
×
NEW
1352
                            reason: format!("failed reading shard {}: {err}", path.display()),
×
NEW
1353
                        })?;
×
1354
                if bytes == 0 {
10,094✔
1355
                    break;
33✔
1356
                }
10,061✔
1357
                rows += 1;
10,061✔
1358
                offset = offset.saturating_add(bytes as u64);
10,061✔
1359
            }
1360

1361
            (rows, Vec::new(), checkpoints)
33✔
1362
        };
1363

1364
        if rows == 0 {
37✔
1365
            return Ok(None);
3✔
1366
        }
34✔
1367

1368
        Ok(Some(ShardIndex {
34✔
1369
            path: path.to_path_buf(),
34✔
1370
            global_start,
34✔
1371
            row_count: rows,
34✔
1372
            is_parquet,
34✔
1373
            parquet_row_groups,
34✔
1374
            checkpoints,
34✔
1375
        }))
34✔
1376
    }
38✔
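    /// Illustrative sketch (not part of the original source): how the line checkpoints recorded
    /// above are consumed later. A byte offset is stored every `checkpoint_stride` rows, so a
    /// lookup seeks to `checkpoints[local_idx / stride]` and then scans `local_idx % stride`
    /// lines forward (`stride` is assumed non-zero, as in `read_line_at`).
    #[allow(dead_code)]
    fn checkpoint_lookup_sketch(
        checkpoints: &[u64],
        stride: usize,
        local_idx: usize,
    ) -> Option<(u64, usize)> {
        let seek_offset = *checkpoints.get(local_idx / stride)?;
        Some((seek_offset, local_idx % stride))
    }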
1377

1378
    /// Build parquet row-group map for random-access row reads.
1379
    fn parquet_row_group_map(
6✔
1380
        config: &HuggingFaceRowsConfig,
6✔
1381
        path: &Path,
6✔
1382
    ) -> Result<(usize, Vec<(usize, usize)>), SamplerError> {
6✔
1383
        let file = File::open(path).map_err(|err| SamplerError::SourceUnavailable {
6✔
NEW
1384
            source_id: config.source_id.clone(),
×
NEW
1385
            reason: format!("failed opening parquet shard {}: {err}", path.display()),
×
NEW
1386
        })?;
×
1387
        let reader =
6✔
1388
            SerializedFileReader::new(file).map_err(|err| SamplerError::SourceUnavailable {
6✔
NEW
1389
                source_id: config.source_id.clone(),
×
NEW
1390
                reason: format!("failed reading parquet metadata {}: {err}", path.display()),
×
NEW
1391
            })?;
×
1392

1393
        let mut row_groups = Vec::new();
6✔
1394
        let mut running = 0usize;
6✔
1395
        for meta in reader.metadata().row_groups() {
6✔
1396
            let group_rows =
6✔
1397
                usize::try_from(meta.num_rows()).map_err(|_| SamplerError::SourceUnavailable {
6✔
NEW
1398
                    source_id: config.source_id.clone(),
×
NEW
1399
                    reason: format!("parquet row group size overflow in {}", path.display()),
×
NEW
1400
                })?;
×
1401
            if group_rows == 0 {
6✔
1402
                continue;
1✔
1403
            }
5✔
1404
            row_groups.push((running, group_rows));
5✔
1405
            running = running.saturating_add(group_rows);
5✔
1406
        }
1407
        if running > 0 {
6✔
1408
            return Ok((running, row_groups));
5✔
1409
        }
1✔
1410

1411
        let total_rows =
1✔
1412
            usize::try_from(reader.metadata().file_metadata().num_rows()).map_err(|_| {
1✔
NEW
1413
                SamplerError::SourceUnavailable {
×
NEW
1414
                    source_id: config.source_id.clone(),
×
NEW
1415
                    reason: format!("parquet row count overflow in {}", path.display()),
×
NEW
1416
                }
×
NEW
1417
            })?;
×
1418
        if total_rows == 0 {
1✔
1419
            return Ok((0, Vec::new()));
1✔
NEW
1420
        }
×
NEW
1421
        Ok((total_rows, vec![(0, total_rows)]))
×
1422
    }
6✔
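    /// Illustrative sketch (not part of the original source): the cumulative layout produced by
    /// `parquet_row_group_map`. Row groups of 100, 50, and 25 rows become
    /// `[(0, 100), (100, 50), (150, 25)]`, the `(start, count)` pairs that
    /// `locate_parquet_group` later binary-searches.
    #[allow(dead_code)]
    fn row_group_layout_sketch(group_sizes: &[usize]) -> Vec<(usize, usize)> {
        let mut running = 0usize;
        let mut layout = Vec::new();
        for rows in group_sizes.iter().copied().filter(|rows| *rows > 0) {
            layout.push((running, rows));
            running = running.saturating_add(rows);
        }
        layout
    }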
1423

1424
    /// Ensure the row at global index `idx` is available, expanding the remote shard set lazily if needed.
1425
    fn ensure_row_available(&self, idx: usize) -> Result<bool, SamplerError> {
67✔
1426
        loop {
1427
            {
1428
                let state = self
70✔
1429
                    .state
70✔
1430
                    .lock()
70✔
1431
                    .map_err(|_| SamplerError::SourceUnavailable {
70✔
NEW
1432
                        source_id: self.config.source_id.clone(),
×
NEW
1433
                        reason: "huggingface source state lock poisoned".to_string(),
×
NEW
1434
                    })?;
×
1435

1436
                if idx < state.materialized_rows {
70✔
1437
                    return Ok(true);
61✔
1438
                }
9✔
1439

1440
                if self.config.max_rows.is_some_and(|max_rows| idx >= max_rows) {
9✔
1441
                    return Ok(false);
2✔
1442
                }
7✔
1443

1444
                if let Some(candidates) = &state.remote_candidates
7✔
1445
                    && state.next_remote_idx >= candidates.len()
7✔
1446
                {
1447
                    return Ok(false);
4✔
1448
                }
3✔
1449
            }
1450

1451
            let need_candidates = {
3✔
1452
                let state = self
3✔
1453
                    .state
3✔
1454
                    .lock()
3✔
1455
                    .map_err(|_| SamplerError::SourceUnavailable {
3✔
NEW
1456
                        source_id: self.config.source_id.clone(),
×
NEW
1457
                        reason: "huggingface source state lock poisoned".to_string(),
×
NEW
1458
                    })?;
×
1459
                state.remote_candidates.is_none()
3✔
1460
            };
1461

1462
            if need_candidates {
3✔
NEW
1463
                let mut state = self
×
NEW
1464
                    .state
×
NEW
1465
                    .lock()
×
NEW
1466
                    .map_err(|_| SamplerError::SourceUnavailable {
×
NEW
1467
                        source_id: self.config.source_id.clone(),
×
NEW
1468
                        reason: "huggingface source state lock poisoned".to_string(),
×
NEW
1469
                    })?;
×
NEW
1470
                if state.remote_candidates.is_none() {
×
NEW
1471
                    let (mut candidates, candidate_sizes) =
×
NEW
1472
                        Self::list_remote_candidates(&self.config)?;
×
NEW
1473
                    Self::rotate_candidates_deterministically(&self.config, &mut candidates);
×
NEW
1474
                    state.remote_candidates = Some(candidates);
×
NEW
1475
                    state.remote_candidate_sizes = candidate_sizes;
×
NEW
1476
                    state.next_remote_idx = 0;
×
1477

NEW
1478
                    self.persist_shard_sequence_locked(&state)?;
×
1479

NEW
1480
                    let candidate_count = state
×
NEW
1481
                        .remote_candidates
×
NEW
1482
                        .as_ref()
×
NEW
1483
                        .map(|values| values.len())
×
NEW
1484
                        .unwrap_or(0);
×
NEW
1485
                    let bootstrap_needed = state.materialized_rows == 0
×
NEW
1486
                        && candidate_count > 0
×
NEW
1487
                        && state.next_remote_idx == 0;
×
NEW
1488
                    let known_rows = state.materialized_rows;
×
NEW
1489
                    let shard_count = state.shards.len();
×
NEW
1490
                    info!(
×
1491
                        "[triplets:hf] state: candidates={} known_rows={} active_shards={} disk_cap={} min_resident_shards={}",
1492
                        candidate_count,
1493
                        known_rows,
1494
                        shard_count,
NEW
1495
                        self.config
×
NEW
1496
                            .local_disk_cap_bytes
×
NEW
1497
                            .map(|bytes| format!(
×
1498
                                "{:.2} GiB",
NEW
1499
                                bytes as f64 / (1024.0 * 1024.0 * 1024.0)
×
1500
                            ))
NEW
1501
                            .unwrap_or_else(|| "disabled".to_string()),
×
1502
                        self.config.min_resident_shards,
1503
                    );
NEW
1504
                    drop(state);
×
1505

NEW
1506
                    if bootstrap_needed {
×
NEW
1507
                        let bootstrap_target = REMOTE_BOOTSTRAP_SHARDS.min(candidate_count);
×
NEW
1508
                        info!(
×
1509
                            "[triplets:hf] bootstrapping remote shard diversity: target={} shard(s)",
1510
                            bootstrap_target
1511
                        );
NEW
1512
                        for step in 0..bootstrap_target {
×
NEW
1513
                            info!(
×
1514
                                "[triplets:hf] bootstrap progress: {}/{}",
NEW
1515
                                step + 1,
×
1516
                                bootstrap_target
1517
                            );
NEW
1518
                            if !self.download_next_remote_shard()? {
×
NEW
1519
                                break;
×
NEW
1520
                            }
×
1521
                        }
NEW
1522
                        info!("[triplets:hf] bootstrap complete");
×
NEW
1523
                    }
×
NEW
1524
                } else {
×
NEW
1525
                    drop(state);
×
NEW
1526
                }
×
NEW
1527
                continue;
×
1528
            }
3✔
1529
            if !self.download_next_remote_shard()? {
3✔
NEW
1530
                return Ok(false);
×
1531
            }
3✔
1532
        }
1533
    }
67✔
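    /// Illustrative sketch (not part of the original source): the control flow of
    /// `ensure_row_available`, reduced to its essentials. `materialized_rows` and
    /// `download_one_more_shard` are hypothetical stand-ins for the locked state check and
    /// `download_next_remote_shard`.
    #[allow(dead_code)]
    fn lazy_expansion_sketch(
        idx: usize,
        mut materialized_rows: impl FnMut() -> usize,
        mut download_one_more_shard: impl FnMut() -> bool,
    ) -> bool {
        loop {
            if idx < materialized_rows() {
                return true; // the requested row is already on disk
            }
            if !download_one_more_shard() {
                return false; // no further remote candidates (or caps reached)
            }
        }
    }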
1534

1535
    /// Download and register the next remote shard candidate.
1536
    fn download_next_remote_shard(&self) -> Result<bool, SamplerError> {
8✔
1537
        let (remote_ordinal, remote_total, remote_path, expected_bytes) = {
8✔
1538
            let mut state = self
8✔
1539
                .state
8✔
1540
                .lock()
8✔
1541
                .map_err(|_| SamplerError::SourceUnavailable {
8✔
NEW
1542
                    source_id: self.config.source_id.clone(),
×
NEW
1543
                    reason: "huggingface source state lock poisoned".to_string(),
×
NEW
1544
                })?;
×
1545
            let Some(candidates) = &state.remote_candidates else {
8✔
NEW
1546
                return Ok(false);
×
1547
            };
1548
            if state.next_remote_idx >= candidates.len() {
8✔
NEW
1549
                return Ok(false);
×
1550
            }
8✔
1551
            let sequence_pos = state.next_remote_idx;
8✔
1552
            let remote_ordinal = sequence_pos + 1;
8✔
1553
            let remote_total = candidates.len();
8✔
1554
            let sampler_seed = self.configured_sampler_seed()?;
8✔
1555
            let seed = Self::shard_candidate_seed(&self.config, remote_total, sampler_seed);
8✔
1556
            let mut permutation =
8✔
1557
                crate::source::IndexPermutation::new(remote_total, seed, sequence_pos as u64);
8✔
1558
            let candidate_idx = permutation.next();
8✔
1559
            let remote_path = candidates[candidate_idx].clone();
8✔
1560
            let expected_bytes = state.remote_candidate_sizes.get(&remote_path).copied();
8✔
1561
            state.next_remote_idx += 1;
8✔
1562
            (remote_ordinal, remote_total, remote_path, expected_bytes)
8✔
1563
        };
1564

1565
        info!(
8✔
1566
            "[triplets:hf] lazy downloading shard {}/{}: {}",
1567
            remote_ordinal,
1568
            remote_total,
1569
            remote_path.as_str()
8✔
1570
        );
1571
        let local_path =
8✔
1572
            Self::download_and_materialize_shard(&self.config, &remote_path, expected_bytes)?;
8✔
1573

1574
        let global_start = {
8✔
1575
            let state = self
8✔
1576
                .state
8✔
1577
                .lock()
8✔
1578
                .map_err(|_| SamplerError::SourceUnavailable {
8✔
NEW
1579
                    source_id: self.config.source_id.clone(),
×
NEW
1580
                    reason: "huggingface source state lock poisoned".to_string(),
×
NEW
1581
                })?;
×
1582
            state.materialized_rows
8✔
1583
        };
1584

1585
        let Some(shard) = Self::index_single_shard(&self.config, &local_path, global_start)? else {
8✔
1586
            warn!(
1✔
1587
                "[triplets:hf] downloaded shard had zero rows and was skipped: {}",
1588
                local_path.display()
1✔
1589
            );
1590
            return Ok(true);
1✔
1591
        };
1592

1593
        let mut state = self
7✔
1594
            .state
7✔
1595
            .lock()
7✔
1596
            .map_err(|_| SamplerError::SourceUnavailable {
7✔
NEW
1597
                source_id: self.config.source_id.clone(),
×
NEW
1598
                reason: "huggingface source state lock poisoned".to_string(),
×
NEW
1599
            })?;
×
1600

1601
        if self
7✔
1602
            .config
7✔
1603
            .max_rows
7✔
1604
            .is_some_and(|max_rows| state.materialized_rows >= max_rows)
7✔
1605
        {
1606
            return Ok(true);
1✔
1607
        }
6✔
1608

1609
        let mut rows_to_add = shard.row_count;
6✔
1610
        if let Some(max_rows) = self.config.max_rows {
6✔
1611
            rows_to_add = rows_to_add.min(max_rows.saturating_sub(state.materialized_rows));
1✔
1612
        }
5✔
1613
        if rows_to_add == 0 {
6✔
NEW
1614
            return Ok(true);
×
1615
        }
6✔
1616

1617
        let mut shard = shard;
6✔
1618
        shard.global_start = state.materialized_rows;
6✔
1619
        shard.row_count = rows_to_add;
6✔
1620
        if shard.is_parquet {
6✔
NEW
1621
            shard
×
NEW
1622
                .parquet_row_groups
×
NEW
1623
                .retain(|(start, _)| *start < rows_to_add);
×
NEW
1624
            if let Some((start, count)) = shard.parquet_row_groups.last_mut() {
×
NEW
1625
                let allowed = rows_to_add.saturating_sub(*start);
×
NEW
1626
                *count = (*count).min(allowed);
×
NEW
1627
            }
×
1628
        }
6✔
1629
        state.materialized_rows += rows_to_add;
6✔
1630
        state.shards.push(shard);
6✔
1631

1632
        let evicted_any = self.enforce_disk_cap_locked(&mut state, &local_path)?;
6✔
1633
        self.persist_shard_sequence_locked(&state)?;
6✔
1634
        let materialized_rows = state.materialized_rows;
6✔
1635
        let shard_count = state.shards.len();
6✔
1636
        let remaining_candidates = state
6✔
1637
            .remote_candidates
6✔
1638
            .as_ref()
6✔
1639
            .map(|candidates| candidates.len().saturating_sub(state.next_remote_idx))
6✔
1640
            .unwrap_or(0);
6✔
1641
        let usage_bytes = self.manifest_usage_bytes_locked(&state);
6✔
1642
        let usage_gib = usage_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
6✔
1643
        let cap_str = self
6✔
1644
            .config
6✔
1645
            .local_disk_cap_bytes
6✔
1646
            .map(|bytes| format!("{:.2} GiB", bytes as f64 / (1024.0 * 1024.0 * 1024.0)))
6✔
1647
            .unwrap_or_else(|| "disabled".to_string());
6✔
1648
        drop(state);
6✔
1649

1650
        if evicted_any {
6✔
1651
            if let Ok(mut cache) = self.cache.lock() {
1✔
1652
                cache.rows.clear();
1✔
1653
                cache.order.clear();
1✔
1654
            }
1✔
1655
            if let Ok(mut parquet_cache) = self.parquet_cache.lock() {
1✔
1656
                parquet_cache.readers.clear();
1✔
1657
            }
1✔
1658
        }
5✔
1659

1660
        info!(
6✔
1661
            "[triplets:hf] state: rows={} shards={} remaining_candidates={} disk_usage={:.2} GiB cap={}",
1662
            materialized_rows, shard_count, remaining_candidates, usage_gib, cap_str,
1663
        );
1664

1665
        Ok(true)
6✔
1666
    }
8✔
1667

1668
    /// Copy cached/downloaded source file into snapshot tree.
1669
    fn materialize_local_file(
4✔
1670
        config: &HuggingFaceRowsConfig,
4✔
1671
        source_path: &Path,
4✔
1672
        target_path: &Path,
4✔
1673
    ) -> Result<(), SamplerError> {
4✔
1674
        let resolved_source =
4✔
1675
            fs::canonicalize(source_path).unwrap_or_else(|_| source_path.to_path_buf());
4✔
1676

1677
        if let Some(parent) = target_path.parent() {
4✔
1678
            fs::create_dir_all(parent).map_err(|err| SamplerError::SourceUnavailable {
4✔
NEW
1679
                source_id: config.source_id.clone(),
×
NEW
1680
                reason: format!(
×
1681
                    "failed creating snapshot subdir {}: {err}",
NEW
1682
                    parent.display()
×
1683
                ),
NEW
1684
            })?;
×
NEW
1685
        }
×
1686

1687
        if target_path.exists() {
4✔
1688
            let src_meta =
2✔
1689
                fs::metadata(&resolved_source).map_err(|err| SamplerError::SourceUnavailable {
2✔
NEW
1690
                    source_id: config.source_id.clone(),
×
NEW
1691
                    reason: format!(
×
1692
                        "failed reading source metadata {}: {err}",
NEW
1693
                        resolved_source.display()
×
1694
                    ),
NEW
1695
                })?;
×
1696
            let dst_meta =
2✔
1697
                fs::metadata(target_path).map_err(|err| SamplerError::SourceUnavailable {
2✔
NEW
1698
                    source_id: config.source_id.clone(),
×
NEW
1699
                    reason: format!(
×
1700
                        "failed reading target metadata {}: {err}",
NEW
1701
                        target_path.display()
×
1702
                    ),
NEW
1703
                })?;
×
1704
            if src_meta.len() == dst_meta.len() {
2✔
1705
                return Ok(());
1✔
1706
            }
1✔
1707
            fs::remove_file(target_path).map_err(|err| SamplerError::SourceUnavailable {
1✔
NEW
1708
                source_id: config.source_id.clone(),
×
NEW
1709
                reason: format!(
×
1710
                    "failed replacing target file {}: {err}",
NEW
1711
                    target_path.display()
×
1712
                ),
NEW
1713
            })?;
×
1714
        }
2✔
1715

1716
        fs::copy(&resolved_source, target_path).map_err(|err| SamplerError::SourceUnavailable {
3✔
1717
            source_id: config.source_id.clone(),
1✔
1718
            reason: format!(
1✔
1719
                "failed copying synced file {} -> {}: {err}",
1720
                resolved_source.display(),
1✔
1721
                target_path.display()
1✔
1722
            ),
1723
        })?;
1✔
1724
        Ok(())
2✔
1725
    }
4✔
1726

1727
    /// Build deterministic local shard index for accepted extensions.
1728
    fn build_shard_index(
16✔
1729
        config: &HuggingFaceRowsConfig,
16✔
1730
    ) -> Result<(Vec<ShardIndex>, usize), SamplerError> {
16✔
1731
        let start_index = Instant::now();
16✔
1732
        let mut shard_paths = Vec::new();
16✔
1733
        let manifest_root = config.snapshot_dir.join("_parquet_manifest");
16✔
1734
        let accepted = config
16✔
1735
            .shard_extensions
16✔
1736
            .iter()
16✔
1737
            .map(|ext| ext.trim().trim_start_matches('.').to_ascii_lowercase())
28✔
1738
            .collect::<Vec<_>>();
16✔
1739

1740
        let mut saw_parquet = false;
16✔
1741
        for entry in WalkDir::new(&config.snapshot_dir)
38✔
1742
            .follow_links(true)
16✔
1743
            .into_iter()
16✔
1744
            .filter_map(Result::ok)
16✔
1745
        {
1746
            if !entry.file_type().is_file() {
38✔
1747
                continue;
19✔
1748
            }
19✔
1749
            if entry.path().starts_with(&manifest_root) {
19✔
1750
                continue;
1✔
1751
            }
18✔
1752
            let Some(ext) = entry.path().extension().and_then(|v| v.to_str()) else {
18✔
NEW
1753
                continue;
×
1754
            };
1755
            if ext.eq_ignore_ascii_case("parquet") {
18✔
1756
                saw_parquet = true;
3✔
1757
            }
15✔
1758
            if accepted
18✔
1759
                .iter()
18✔
1760
                .any(|allowed| allowed == &ext.to_ascii_lowercase())
30✔
1761
            {
15✔
1762
                shard_paths.push(entry.path().to_path_buf());
15✔
1763
            }
15✔
1764
        }
1765

1766
        shard_paths.sort();
16✔
1767
        if shard_paths.is_empty() {
16✔
1768
            if saw_parquet && !accepted.iter().any(|value| value == "parquet") {
3✔
1769
                return Err(SamplerError::SourceUnavailable {
1✔
1770
                    source_id: config.source_id.clone(),
1✔
1771
                    reason: format!(
1✔
1772
                        "found parquet files under {}, but shard_extensions does not include parquet.",
1✔
1773
                        config.snapshot_dir.display()
1✔
1774
                    ),
1✔
1775
                });
1✔
1776
            }
2✔
1777
            return Err(SamplerError::SourceUnavailable {
2✔
1778
                source_id: config.source_id.clone(),
2✔
1779
                reason: format!(
2✔
1780
                    "no shard files found under {} with extensions {:?}",
2✔
1781
                    config.snapshot_dir.display(),
2✔
1782
                    config.shard_extensions
2✔
1783
                ),
2✔
1784
            });
2✔
1785
        }
13✔
1786

1787
        let mut indexed_shards = shard_paths
13✔
1788
            .into_par_iter()
13✔
1789
            .enumerate()
13✔
1790
            .map(|(ordinal, path)| {
15✔
1791
                info!(
15✔
1792
                    "[triplets:hf] indexing shard {}: {}",
1793
                    ordinal + 1,
7✔
1794
                    path.display()
7✔
1795
                );
1796
                let shard = Self::index_single_shard(config, &path, 0)?;
15✔
1797
                Ok::<_, SamplerError>((ordinal, shard))
15✔
1798
            })
15✔
1799
            .collect::<Result<Vec<_>, _>>()?;
13✔
1800

1801
        indexed_shards.sort_by_key(|(ordinal, _)| *ordinal);
13✔
1802

1803
        let mut shards = Vec::new();
13✔
1804
        let mut running_total = 0usize;
13✔
1805
        for (_, maybe_shard) in indexed_shards {
15✔
1806
            let Some(mut shard) = maybe_shard else {
15✔
1807
                continue;
1✔
1808
            };
1809

1810
            if let Some(max_rows) = config.max_rows {
14✔
1811
                if running_total >= max_rows {
10✔
NEW
1812
                    break;
×
1813
                }
10✔
1814
                let allowed = max_rows.saturating_sub(running_total);
10✔
1815
                if shard.row_count > allowed {
10✔
1816
                    shard.row_count = allowed;
3✔
1817
                    if shard.is_parquet {
3✔
1818
                        shard
1✔
1819
                            .parquet_row_groups
1✔
1820
                            .retain(|(start, _)| *start < shard.row_count);
1✔
1821
                        if let Some((start, count)) = shard.parquet_row_groups.last_mut() {
1✔
1822
                            let group_allowed = shard.row_count.saturating_sub(*start);
1✔
1823
                            *count = (*count).min(group_allowed);
1✔
1824
                        }
1✔
1825
                    }
2✔
1826
                }
7✔
1827
            }
4✔
1828

1829
            if shard.row_count == 0 {
14✔
NEW
1830
                continue;
×
1831
            }
14✔
1832

1833
            shard.global_start = running_total;
14✔
1834
            running_total = running_total.saturating_add(shard.row_count);
14✔
1835
            shards.push(shard);
14✔
1836
        }
1837

1838
        info!(
13✔
1839
            "[triplets:hf] indexing complete in {:.2}s (rows={}, shards={})",
1840
            start_index.elapsed().as_secs_f64(),
5✔
1841
            running_total,
1842
            shards.len()
5✔
1843
        );
1844

1845
        Ok((shards, running_total))
13✔
1846
    }
16✔
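    /// Illustrative sketch (not part of the original source): the extension normalization applied
    /// when matching shard files above. Configured values such as ".JSONL" or "parquet" are
    /// compared case-insensitively and without a leading dot.
    #[allow(dead_code)]
    fn extension_matches_sketch(configured: &[String], file_ext: &str) -> bool {
        let file_ext = file_ext.to_ascii_lowercase();
        configured
            .iter()
            .map(|ext| ext.trim().trim_start_matches('.').to_ascii_lowercase())
            .any(|allowed| allowed == file_ext)
    }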
1847

1848
    /// Locate containing shard and local offset for a global row index.
1849
    fn locate_shard(shards: &[ShardIndex], idx: usize) -> Option<(&ShardIndex, usize)> {
54✔
1850
        let pos = shards
54✔
1851
            .binary_search_by(|shard| {
54✔
1852
                if idx < shard.global_start {
54✔
1853
                    Ordering::Greater
1✔
1854
                } else if idx >= shard.global_start + shard.row_count {
53✔
1855
                    Ordering::Less
1✔
1856
                } else {
1857
                    Ordering::Equal
52✔
1858
                }
1859
            })
54✔
1860
            .ok()?;
54✔
1861
        let shard = shards.get(pos)?;
52✔
1862
        Some((shard, idx - shard.global_start))
52✔
1863
    }
54✔
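    /// Illustrative sketch (not part of the original source): a worked example of the mapping done
    /// by `locate_shard`. With spans covering rows [0, 100) and [100, 150), global row 120
    /// resolves to the second span at local offset 20; the real function does the same over
    /// `ShardIndex.global_start`/`row_count` with a binary search.
    #[allow(dead_code)]
    fn locate_in_spans_sketch(spans: &[(usize, usize)], idx: usize) -> Option<(usize, usize)> {
        let pos = spans
            .iter()
            .position(|(start, count)| idx >= *start && idx < start + count)?;
        Some((pos, idx - spans[pos].0))
    }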
1864

1865
    /// Read one JSONL/NDJSON line at a local row offset using checkpoints.
1866
    fn read_line_at(&self, shard: &ShardIndex, local_idx: usize) -> Result<String, SamplerError> {
48✔
1867
        let checkpoint_idx = local_idx / self.config.checkpoint_stride;
48✔
1868
        let checkpoint_line = checkpoint_idx * self.config.checkpoint_stride;
48✔
1869
        let seek_offset = *shard.checkpoints.get(checkpoint_idx).ok_or_else(|| {
48✔
1870
            SamplerError::SourceUnavailable {
2✔
1871
                source_id: self.config.source_id.clone(),
2✔
1872
                reason: format!(
2✔
1873
                    "missing checkpoint for shard {} line {}",
2✔
1874
                    shard.path.display(),
2✔
1875
                    local_idx
2✔
1876
                ),
2✔
1877
            }
2✔
1878
        })?;
2✔
1879

1880
        let mut file = File::open(&shard.path).map_err(|err| SamplerError::SourceUnavailable {
46✔
NEW
1881
            source_id: self.config.source_id.clone(),
×
NEW
1882
            reason: format!("failed opening shard {}: {err}", shard.path.display()),
×
NEW
1883
        })?;
×
1884
        file.seek(SeekFrom::Start(seek_offset))
46✔
1885
            .map_err(|err| SamplerError::SourceUnavailable {
46✔
NEW
1886
                source_id: self.config.source_id.clone(),
×
NEW
1887
                reason: format!("failed seeking shard {}: {err}", shard.path.display()),
×
NEW
1888
            })?;
×
1889

1890
        let mut reader = BufReader::new(file);
46✔
1891
        let mut line = String::new();
46✔
1892
        for _ in checkpoint_line..local_idx {
46✔
1893
            line.clear();
184✔
1894
            let bytes =
184✔
1895
                reader
184✔
1896
                    .read_line(&mut line)
184✔
1897
                    .map_err(|err| SamplerError::SourceUnavailable {
184✔
NEW
1898
                        source_id: self.config.source_id.clone(),
×
NEW
1899
                        reason: format!("failed scanning shard {}: {err}", shard.path.display()),
×
NEW
1900
                    })?;
×
1901
            if bytes == 0 {
184✔
NEW
1902
                return Err(SamplerError::SourceUnavailable {
×
NEW
1903
                    source_id: self.config.source_id.clone(),
×
NEW
1904
                    reason: format!(
×
NEW
1905
                        "unexpected EOF while scanning shard {} at row {}",
×
NEW
1906
                        shard.path.display(),
×
NEW
1907
                        local_idx
×
NEW
1908
                    ),
×
NEW
1909
                });
×
1910
            }
184✔
1911
        }
1912

1913
        line.clear();
46✔
1914
        let bytes = reader
46✔
1915
            .read_line(&mut line)
46✔
1916
            .map_err(|err| SamplerError::SourceUnavailable {
46✔
NEW
1917
                source_id: self.config.source_id.clone(),
×
NEW
1918
                reason: format!("failed reading shard {}: {err}", shard.path.display()),
×
NEW
1919
            })?;
×
1920
        if bytes == 0 {
46✔
1921
            return Err(SamplerError::SourceUnavailable {
1✔
1922
                source_id: self.config.source_id.clone(),
1✔
1923
                reason: format!(
1✔
1924
                    "unexpected EOF while reading shard {} row {}",
1✔
1925
                    shard.path.display(),
1✔
1926
                    local_idx
1✔
1927
                ),
1✔
1928
            });
1✔
1929
        }
45✔
1930
        Ok(line)
45✔
1931
    }
48✔
1932

1933
    /// Locate parquet row-group and in-group row offset for a local row index.
1934
    fn locate_parquet_group(
9✔
1935
        &self,
9✔
1936
        shard: &ShardIndex,
9✔
1937
        local_idx: usize,
9✔
1938
    ) -> Result<(usize, usize), SamplerError> {
9✔
1939
        let group_pos = shard
9✔
1940
            .parquet_row_groups
9✔
1941
            .binary_search_by(|(start, count)| {
13✔
1942
                if local_idx < *start {
13✔
1943
                    Ordering::Greater
1✔
1944
                } else if local_idx >= start.saturating_add(*count) {
12✔
1945
                    Ordering::Less
3✔
1946
                } else {
1947
                    Ordering::Equal
9✔
1948
                }
1949
            })
13✔
1950
            .map_err(|_| SamplerError::SourceUnavailable {
9✔
1951
                source_id: self.config.source_id.clone(),
1✔
1952
                reason: format!(
1✔
1953
                    "parquet row {} could not be mapped to a row group in {}",
1954
                    local_idx,
1955
                    shard.path.display()
1✔
1956
                ),
1957
            })?;
1✔
1958
        let (group_start, _) = shard.parquet_row_groups[group_pos];
8✔
1959
        Ok((group_pos, local_idx.saturating_sub(group_start)))
8✔
1960
    }
9✔
1961

1962
    /// Convert a serde JSON value into non-empty text when possible.
1963
    fn value_to_text(value: &Value) -> Option<String> {
121✔
1964
        match value {
121✔
1965
            Value::Null => None,
1✔
1966
            Value::String(s) => {
112✔
1967
                if s.trim().is_empty() {
112✔
1968
                    None
2✔
1969
                } else {
1970
                    Some(s.clone())
110✔
1971
                }
1972
            }
1973
            Value::Bool(b) => Some(b.to_string()),
2✔
1974
            Value::Number(n) => Some(n.to_string()),
5✔
1975
            Value::Array(_) | Value::Object(_) => Some(value.to_string()),
1✔
1976
        }
1977
    }
121✔
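    /// Illustrative sketch (not part of the original source): expected behaviour of
    /// `value_to_text` on a few representative JSON values.
    #[allow(dead_code)]
    fn value_to_text_examples_sketch() {
        use serde_json::json;
        assert_eq!(Self::value_to_text(&json!(null)), None);
        assert_eq!(Self::value_to_text(&json!("   ")), None); // whitespace-only strings are dropped
        assert_eq!(Self::value_to_text(&json!(42)), Some("42".to_string()));
        assert_eq!(Self::value_to_text(&json!(true)), Some("true".to_string()));
    }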
1978

1979
    /// Parse a raw row payload into normalized `RowView` fields.
1980
    fn parse_row(&self, absolute_idx: usize, row_value: &Value) -> Result<RowView, SamplerError> {
55✔
1981
        let row_payload = row_value.get("row").unwrap_or(row_value);
55✔
1982
        let row_obj = row_payload
55✔
1983
            .as_object()
55✔
1984
            .ok_or_else(|| SamplerError::SourceUnavailable {
55✔
1985
                source_id: self.config.source_id.clone(),
1✔
1986
                reason: "snapshot row entry missing JSON object payload".to_string(),
1✔
1987
            })?;
1✔
1988

1989
        let row_id = self
54✔
1990
            .config
54✔
1991
            .id_column
54✔
1992
            .as_ref()
54✔
1993
            .and_then(|col| row_obj.get(col))
54✔
1994
            .and_then(Self::value_to_text)
54✔
1995
            .unwrap_or_else(|| {
54✔
1996
                format!(
8✔
1997
                    "{}:{}:{}",
1998
                    self.config.dataset, self.config.split, absolute_idx
1999
                )
2000
            });
8✔
2001

2002
        let mut text_fields = Vec::new();
54✔
2003
        let use_role_columns = self.config.anchor_column.is_some()
54✔
2004
            || self.config.positive_column.is_some()
47✔
2005
            || !self.config.context_columns.is_empty();
47✔
2006

2007
        if use_role_columns {
54✔
2008
            if let Some(name) = &self.config.anchor_column {
7✔
2009
                let value = row_obj
7✔
2010
                    .get(name)
7✔
2011
                    .ok_or_else(|| SamplerError::SourceInconsistent {
7✔
NEW
2012
                        source_id: self.config.source_id.clone(),
×
NEW
2013
                        details: format!("missing configured anchor column '{name}'"),
×
NEW
2014
                    })?;
×
2015
                let text =
6✔
2016
                    Self::value_to_text(value).ok_or_else(|| SamplerError::SourceInconsistent {
7✔
2017
                        source_id: self.config.source_id.clone(),
1✔
2018
                        details: format!("configured anchor column '{name}' has null/empty value"),
1✔
2019
                    })?;
1✔
2020
                text_fields.push(RowTextField {
6✔
2021
                    name: name.clone(),
6✔
2022
                    text,
6✔
2023
                });
6✔
NEW
2024
            }
×
2025

2026
            if let Some(name) = &self.config.positive_column {
6✔
2027
                let value = row_obj
5✔
2028
                    .get(name)
5✔
2029
                    .ok_or_else(|| SamplerError::SourceInconsistent {
5✔
2030
                        source_id: self.config.source_id.clone(),
1✔
2031
                        details: format!("missing configured positive column '{name}'"),
1✔
2032
                    })?;
1✔
2033
                let text =
4✔
2034
                    Self::value_to_text(value).ok_or_else(|| SamplerError::SourceInconsistent {
4✔
NEW
2035
                        source_id: self.config.source_id.clone(),
×
NEW
2036
                        details: format!(
×
2037
                            "configured positive column '{name}' has null/empty value"
2038
                        ),
NEW
2039
                    })?;
×
2040
                text_fields.push(RowTextField {
4✔
2041
                    name: name.clone(),
4✔
2042
                    text,
4✔
2043
                });
4✔
2044
            }
1✔
2045

2046
            for name in &self.config.context_columns {
8✔
2047
                let value = row_obj
8✔
2048
                    .get(name)
8✔
2049
                    .ok_or_else(|| SamplerError::SourceInconsistent {
8✔
2050
                        source_id: self.config.source_id.clone(),
2✔
2051
                        details: format!("missing configured context column '{name}'"),
2✔
2052
                    })?;
2✔
2053
                let text =
6✔
2054
                    Self::value_to_text(value).ok_or_else(|| SamplerError::SourceInconsistent {
6✔
NEW
2055
                        source_id: self.config.source_id.clone(),
×
NEW
2056
                        details: format!("configured context column '{name}' has null/empty value"),
×
NEW
2057
                    })?;
×
2058
                text_fields.push(RowTextField {
6✔
2059
                    name: name.clone(),
6✔
2060
                    text,
6✔
2061
                });
6✔
2062
            }
2063
        } else if self.config.text_columns.is_empty() {
47✔
2064
            for (name, value) in row_obj {
71✔
2065
                if self.config.id_column.as_ref().is_some_and(|id| id == name) {
71✔
2066
                    continue;
34✔
2067
                }
37✔
2068
                if let Some(text) = Self::value_to_text(value) {
37✔
2069
                    text_fields.push(RowTextField {
37✔
2070
                        name: name.clone(),
37✔
2071
                        text,
37✔
2072
                    });
37✔
2073
                }
37✔
2074
            }
2075
        } else {
2076
            for name in &self.config.text_columns {
15✔
2077
                let value = row_obj
15✔
2078
                    .get(name)
15✔
2079
                    .ok_or_else(|| SamplerError::SourceInconsistent {
15✔
NEW
2080
                        source_id: self.config.source_id.clone(),
×
NEW
2081
                        details: format!("missing configured text column '{name}'"),
×
NEW
2082
                    })?;
×
2083
                let text =
15✔
2084
                    Self::value_to_text(value).ok_or_else(|| SamplerError::SourceInconsistent {
15✔
NEW
2085
                        source_id: self.config.source_id.clone(),
×
NEW
2086
                        details: format!("configured text column '{name}' has null/empty value"),
×
NEW
2087
                    })?;
×
2088
                text_fields.push(RowTextField {
15✔
2089
                    name: name.clone(),
15✔
2090
                    text,
15✔
2091
                });
15✔
2092
            }
2093
        }
2094

2095
        if text_fields.is_empty() {
50✔
NEW
2096
            return Err(SamplerError::SourceInconsistent {
×
NEW
2097
                source_id: self.config.source_id.clone(),
×
NEW
2098
                details: "row resolved to zero text fields".to_string(),
×
NEW
2099
            });
×
2100
        }
50✔
2101

2102
        Ok(RowView {
50✔
2103
            row_id: Some(row_id),
50✔
2104
            timestamp: None,
50✔
2105
            text_fields,
50✔
2106
        })
50✔
2107
    }
55✔
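    /// Illustrative sketch (not part of the original source): the column-selection precedence
    /// applied by `parse_row`. Explicit anchor/positive/context columns take priority, then
    /// `text_columns`, and only when neither is configured are all non-id columns with textual
    /// values used.
    #[allow(dead_code)]
    fn column_mode_sketch(has_role_columns: bool, has_text_columns: bool) -> &'static str {
        if has_role_columns {
            "role columns (anchor/positive/context)"
        } else if has_text_columns {
            "configured text_columns"
        } else {
            "all non-id columns with textual values"
        }
    }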
2108

2109
    /// Convert a `RowView` into a sampler `DataRecord`.
2110
    fn row_to_record(
55✔
2111
        &self,
55✔
2112
        row: &RowView,
55✔
2113
        row_index: u64,
55✔
2114
    ) -> Result<Option<DataRecord>, SamplerError> {
55✔
2115
        if row.text_fields.is_empty() {
55✔
2116
            return Ok(None);
1✔
2117
        }
54✔
2118

2119
        let record_id = row
54✔
2120
            .row_id
54✔
2121
            .as_ref()
54✔
2122
            .cloned()
54✔
2123
            .unwrap_or_else(|| format!("row_{row_index}"));
54✔
2124
        let id = format!("{}::{}", self.config.source_id, record_id);
54✔
2125

2126
        let mut sections = Vec::new();
54✔
2127
        let anchor = &row.text_fields[0];
54✔
2128
        sections.push(make_section(
54✔
2129
            SectionRole::Anchor,
54✔
2130
            Some(anchor.name.as_str()),
54✔
2131
            anchor.text.as_str(),
54✔
2132
        ));
2133

2134
        let positive = row.text_fields.get(1).unwrap_or(anchor);
54✔
2135
        sections.push(make_section(
54✔
2136
            SectionRole::Context,
54✔
2137
            Some(positive.name.as_str()),
54✔
2138
            positive.text.as_str(),
54✔
2139
        ));
2140

2141
        for field in row.text_fields.iter().skip(2) {
54✔
2142
            sections.push(make_section(
5✔
2143
                SectionRole::Context,
5✔
2144
                Some(field.name.as_str()),
5✔
2145
                field.text.as_str(),
5✔
2146
            ));
5✔
2147
        }
5✔
2148

2149
        let timestamp = row.timestamp.unwrap_or(DateTime::<Utc>::UNIX_EPOCH);
54✔
2150
        Ok(Some(DataRecord {
54✔
2151
            id,
54✔
2152
            source: self.config.source_id.clone(),
54✔
2153
            created_at: timestamp,
54✔
2154
            updated_at: timestamp,
54✔
2155
            quality: QualityScore::default(),
54✔
2156
            taxonomy: vec![
54✔
2157
                format!("dataset={}", self.config.dataset),
54✔
2158
                format!("config={}", self.config.config),
54✔
2159
                format!("split={}", self.config.split),
54✔
2160
            ],
54✔
2161
            sections,
54✔
2162
            meta_prefix: None,
54✔
2163
        }))
54✔
2164
    }
55✔
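    /// Illustrative sketch (not part of the original source): the section layout produced by
    /// `row_to_record` for a row with at least one text field. The first field becomes the
    /// anchor section; the second field (or the first again when only one exists) and any
    /// remaining fields become context sections.
    #[allow(dead_code)]
    fn section_roles_sketch(text_field_count: usize) -> Vec<&'static str> {
        let mut roles = vec!["anchor"];
        let context_sections = text_field_count.max(2) - 1;
        roles.extend(std::iter::repeat("context").take(context_sections));
        roles
    }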
2165

2166
    /// Materialize records for the requested indices into the output buffer.
2167
    fn read_row_batch(
25✔
2168
        &self,
25✔
2169
        indices: &[usize],
25✔
2170
        out: &mut Vec<DataRecord>,
25✔
2171
        limit: Option<usize>,
25✔
2172
    ) -> Result<(), SamplerError> {
25✔
2173
        let mut sorted = indices.to_vec();
25✔
2174
        sorted.sort_unstable();
25✔
2175

2176
        let mut fetched = HashMap::with_capacity(sorted.len());
25✔
2177
        let mut pending = Vec::new();
25✔
2178
        for idx in &sorted {
59✔
2179
            if !self.ensure_row_available(*idx)? {
59✔
2180
                fetched.insert(*idx, None);
2✔
2181
                continue;
2✔
2182
            }
57✔
2183

2184
            if let Some(row) = self
57✔
2185
                .cache
57✔
2186
                .lock()
57✔
2187
                .map_err(|_| SamplerError::SourceUnavailable {
57✔
NEW
2188
                    source_id: self.config.source_id.clone(),
×
NEW
2189
                    reason: "huggingface row cache lock poisoned".to_string(),
×
NEW
2190
                })?
×
2191
                .get(*idx)
57✔
2192
            {
2193
                let record = self.row_to_record(&row, *idx as u64)?;
5✔
2194
                fetched.insert(*idx, record);
5✔
2195
                continue;
5✔
2196
            }
52✔
2197

2198
            pending.push(*idx);
52✔
2199
        }
2200

2201
        if !pending.is_empty() {
25✔
2202
            let resolutions =
20✔
2203
                {
2204
                    let state = self
21✔
2205
                        .state
21✔
2206
                        .lock()
21✔
2207
                        .map_err(|_| SamplerError::SourceUnavailable {
21✔
NEW
2208
                            source_id: self.config.source_id.clone(),
×
NEW
2209
                            reason: "huggingface source state lock poisoned".to_string(),
×
NEW
2210
                        })?;
×
2211
                    let mut resolved = Vec::with_capacity(pending.len());
21✔
2212
                    for idx in &pending {
52✔
2213
                        let (shard, local_idx) = Self::locate_shard(&state.shards, *idx)
52✔
2214
                            .ok_or_else(|| SamplerError::SourceUnavailable {
52✔
2215
                                source_id: self.config.source_id.clone(),
1✔
2216
                                reason: format!("row index out of range: {idx}"),
1✔
2217
                            })?;
1✔
2218
                        resolved.push((*idx, shard.clone(), local_idx));
51✔
2219
                    }
2220
                    resolved
20✔
2221
                };
2222

2223
            let mut parquet_groups: HashMap<ParquetGroupKey, Vec<ParquetGroupRequest>> =
20✔
2224
                HashMap::new();
20✔
2225
            for (idx, shard, local_idx) in resolutions {
51✔
2226
                if shard.is_parquet {
51✔
2227
                    let (group_pos, local_in_group) =
7✔
2228
                        self.locate_parquet_group(&shard, local_idx)?;
7✔
2229
                    parquet_groups
7✔
2230
                        .entry((shard.path.clone(), group_pos))
7✔
2231
                        .or_default()
7✔
2232
                        .push((idx, local_in_group, shard));
7✔
2233
                    continue;
7✔
2234
                }
44✔
2235

2236
                let line = self.read_line_at(&shard, local_idx)?;
44✔
2237
                let row_value = serde_json::from_str::<Value>(line.trim()).map_err(|err| {
44✔
2238
                    SamplerError::SourceInconsistent {
3✔
2239
                        source_id: self.config.source_id.clone(),
3✔
2240
                        details: format!(
3✔
2241
                            "failed decoding JSON row from shard {} at local index {}: {err}",
3✔
2242
                            shard.path.display(),
3✔
2243
                            local_idx
3✔
2244
                        ),
3✔
2245
                    }
3✔
2246
                })?;
3✔
2247
                let row = self.parse_row(idx, &row_value)?;
41✔
2248
                let record = self.row_to_record(&row, idx as u64)?;
40✔
2249
                self.cache
40✔
2250
                    .lock()
40✔
2251
                    .map_err(|_| SamplerError::SourceUnavailable {
40✔
NEW
2252
                        source_id: self.config.source_id.clone(),
×
NEW
2253
                        reason: "huggingface row cache lock poisoned".to_string(),
×
NEW
2254
                    })?
×
2255
                    .insert(idx, row, self.config.cache_capacity);
40✔
2256
                fetched.insert(idx, record);
40✔
2257
            }
2258

2259
            for ((shard_path, group_pos), mut requested) in parquet_groups {
16✔
2260
                requested.sort_by_key(|(_, local_in_group, _)| *local_in_group);
4✔
2261
                let shard = requested
4✔
2262
                    .first()
4✔
2263
                    .map(|(_, _, shard)| shard.clone())
4✔
2264
                    .ok_or_else(|| SamplerError::SourceUnavailable {
4✔
NEW
2265
                        source_id: self.config.source_id.clone(),
×
NEW
2266
                        reason: format!(
×
2267
                            "missing parquet request metadata for shard {} row_group {}",
NEW
2268
                            shard_path.display(),
×
2269
                            group_pos
2270
                        ),
NEW
2271
                    })?;
×
2272

2273
                let mut targets: BTreeMap<usize, Vec<usize>> = BTreeMap::new();
4✔
2274
                for (idx, local_in_group, _) in requested {
7✔
2275
                    targets.entry(local_in_group).or_default().push(idx);
7✔
2276
                }
7✔
2277
                let max_target = targets.keys().next_back().copied().unwrap_or(0);
4✔
2278

2279
                let reader = self
4✔
2280
                    .parquet_cache
4✔
2281
                    .lock()
4✔
2282
                    .map_err(|_| SamplerError::SourceUnavailable {
4✔
NEW
2283
                        source_id: self.config.source_id.clone(),
×
NEW
2284
                        reason: "huggingface parquet cache lock poisoned".to_string(),
×
NEW
2285
                    })?
×
2286
                    .reader_for(&self.config.source_id, &shard.path)?;
4✔
2287

2288
                let row_group = reader.get_row_group(group_pos).map_err(|err| {
3✔
NEW
2289
                    SamplerError::SourceUnavailable {
×
NEW
2290
                        source_id: self.config.source_id.clone(),
×
NEW
2291
                        reason: format!(
×
NEW
2292
                            "failed opening parquet row group {} for {}: {err}",
×
NEW
2293
                            group_pos,
×
NEW
2294
                            shard.path.display()
×
NEW
2295
                        ),
×
NEW
2296
                    }
×
NEW
2297
                })?;
×
2298
                let iter = RowIter::from_row_group(None, row_group.as_ref()).map_err(|err| {
3✔
NEW
2299
                    SamplerError::SourceUnavailable {
×
NEW
2300
                        source_id: self.config.source_id.clone(),
×
NEW
2301
                        reason: format!(
×
NEW
2302
                            "failed iterating parquet row group {} for {}: {err}",
×
NEW
2303
                            group_pos,
×
NEW
2304
                            shard.path.display()
×
NEW
2305
                        ),
×
NEW
2306
                    }
×
NEW
2307
                })?;
×
2308

2309
                for (position, row_result) in iter.enumerate() {
6✔
2310
                    if position > max_target {
6✔
NEW
2311
                        break;
×
2312
                    }
6✔
2313
                    let Some(indices_for_position) = targets.remove(&position) else {
6✔
2314
                        continue;
1✔
2315
                    };
2316
                    let row_value = row_result.map_err(|err| SamplerError::SourceUnavailable {
5✔
NEW
2317
                        source_id: self.config.source_id.clone(),
×
NEW
2318
                        reason: format!(
×
2319
                            "failed reading parquet row {} in shard {} row_group {}: {err}",
2320
                            position,
NEW
2321
                            shard.path.display(),
×
2322
                            group_pos
2323
                        ),
NEW
2324
                    })?;
×
2325
                    let row_value = row_value.to_json_value();
5✔
2326

2327
                    for idx in indices_for_position {
5✔
2328
                        let row = self.parse_row(idx, &row_value)?;
5✔
2329
                        let record = self.row_to_record(&row, idx as u64)?;
5✔
2330
                        self.cache
5✔
2331
                            .lock()
5✔
2332
                            .map_err(|_| SamplerError::SourceUnavailable {
5✔
NEW
2333
                                source_id: self.config.source_id.clone(),
×
NEW
2334
                                reason: "huggingface row cache lock poisoned".to_string(),
×
NEW
2335
                            })?
×
2336
                            .insert(idx, row, self.config.cache_capacity);
5✔
2337
                        fetched.insert(idx, record);
5✔
2338
                    }
2339

2340
                    if targets.is_empty() {
5✔
2341
                        break;
2✔
2342
                    }
3✔
2343
                }
2344

2345
                if !targets.is_empty() {
3✔
2346
                    let missing = targets
1✔
2347
                        .into_keys()
1✔
2348
                        .map(|value| value.to_string())
1✔
2349
                        .collect::<Vec<_>>()
1✔
2350
                        .join(",");
1✔
2351
                    return Err(SamplerError::SourceUnavailable {
1✔
2352
                        source_id: self.config.source_id.clone(),
1✔
2353
                        reason: format!(
1✔
2354
                            "parquet rows missing in shard {} row_group {} at local offsets [{}]",
1✔
2355
                            shard.path.display(),
1✔
2356
                            group_pos,
1✔
2357
                            missing
1✔
2358
                        ),
1✔
2359
                    });
1✔
2360
                }
2✔
2361
            }
2362
        }
4✔
2363

2364
        for idx in indices {
52✔
2365
            if limit.is_some_and(|max| out.len() >= max) {
52✔
2366
                break;
1✔
2367
            }
51✔
2368
            if let Some(record) = fetched.remove(idx).flatten() {
51✔
2369
                out.push(record);
49✔
2370
            }
49✔
2371
        }
2372
        Ok(())
18✔
2373
    }
25✔
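    /// Illustrative sketch (not part of the original source): the batching idea behind the parquet
    /// path in `read_row_batch`. Pending reads are grouped per row group so each group is
    /// iterated at most once per batch, regardless of how many rows it must yield.
    #[allow(dead_code)]
    fn group_parquet_requests_sketch(
        requests: &[(usize, usize)], // (row_group, local_offset_in_group), hypothetical shape
    ) -> BTreeMap<usize, Vec<usize>> {
        let mut grouped: BTreeMap<usize, Vec<usize>> = BTreeMap::new();
        for (group, offset) in requests {
            grouped.entry(*group).or_default().push(*offset);
        }
        grouped
    }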
2374

2375
    /// Return the current index-domain upper bound for refresh paging.
2376
    fn len_hint(&self) -> Option<usize> {
27✔
2377
        let state = self.state.lock().ok()?;
27✔
2378
        let known = state.materialized_rows;
27✔
2379
        if known > 0 {
27✔
2380
            let mut upper = known;
22✔
2381
            if state
22✔
2382
                .total_rows
22✔
2383
                .is_some_and(|total_rows| total_rows > known)
22✔
2384
            {
2385
                let headroom = self.effective_expansion_headroom_rows();
4✔
2386
                upper = known.saturating_add(headroom);
4✔
2387
                if let Some(total_rows) = state.total_rows {
4✔
2388
                    upper = upper.min(total_rows);
4✔
2389
                }
4✔
2390
            }
18✔
2391
            if let Some(max_rows) = self.config.max_rows {
22✔
2392
                upper = upper.min(max_rows);
12✔
2393
            }
12✔
2394
            return Some(upper.max(known));
22✔
2395
        }
5✔
2396

2397
        if state.total_rows.is_some_and(|total_rows| total_rows == 0) {
5✔
2398
            return Some(0);
2✔
2399
        }
3✔
2400

2401
        if state
3✔
2402
            .remote_candidates
3✔
2403
            .as_ref()
3✔
2404
            .is_some_and(|candidates| candidates.is_empty())
3✔
2405
        {
NEW
2406
            return Some(0);
×
2407
        }
3✔
2408

2409
        if self.config.max_rows.is_some_and(|max_rows| max_rows == 0) {
3✔
2410
            return Some(0);
1✔
2411
        }
2✔
2412

2413
        Some(1)
2✔
2414
    }
27✔
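    /// Illustrative sketch (not part of the original source): the upper-bound rule applied by
    /// `len_hint` once some rows are materialized. Headroom is only added while more remote rows
    /// are known to exist, and the result is clamped to `total_rows` and `max_rows`.
    #[allow(dead_code)]
    fn len_hint_upper_sketch(
        known: usize,
        total_rows: Option<usize>,
        max_rows: Option<usize>,
        headroom: usize,
    ) -> usize {
        let mut upper = known;
        if total_rows.is_some_and(|total| total > known) {
            upper = known.saturating_add(headroom);
            if let Some(total) = total_rows {
                upper = upper.min(total);
            }
        }
        if let Some(max_rows) = max_rows {
            upper = upper.min(max_rows);
        }
        upper.max(known)
    }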
2415
}
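
// Illustrative sketch (not part of the original source): the cursor contract used by `refresh`
// in the `DataSource` impl below. The cursor's `revision` is the permutation position to resume
// from, so consecutive refreshes walk a deterministic shuffled index order without repeating
// indices until the domain wraps.
#[allow(dead_code)]
fn refresh_paging_sketch(total: usize, seed: u64, start: u64, batch: usize) -> (Vec<usize>, u64) {
    if total == 0 {
        return (Vec::new(), start);
    }
    let mut permutation = crate::source::IndexPermutation::new(total, seed, start);
    let indices = (0..batch.min(total)).map(|_| permutation.next()).collect();
    (indices, permutation.cursor() as u64)
}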
2416

2417
impl DataSource for HuggingFaceRowSource {
2418
    /// Return stable source id.
NEW
2419
    fn id(&self) -> &str {
×
NEW
2420
        &self.config.source_id
×
NEW
2421
    }
×
2422

2423
    /// Refresh source records for the requested cursor and row limit.
2424
    fn refresh(
17✔
2425
        &self,
17✔
2426
        config: &SamplerConfig,
17✔
2427
        cursor: Option<&SourceCursor>,
17✔
2428
        limit: Option<usize>,
17✔
2429
    ) -> Result<SourceSnapshot, SamplerError> {
17✔
2430
        self.set_active_sampler_config(config);
17✔
2431
        let total = self
17✔
2432
            .len_hint()
17✔
2433
            .ok_or_else(|| SamplerError::SourceInconsistent {
17✔
NEW
2434
                source_id: self.config.source_id.clone(),
×
NEW
2435
                details: "huggingface source did not provide len_hint".to_string(),
×
NEW
2436
            })?;
×
2437

2438
        if total == 0 {
17✔
2439
            return Ok(SourceSnapshot {
1✔
2440
                records: Vec::new(),
1✔
2441
                cursor: SourceCursor {
1✔
2442
                    last_seen: Utc::now(),
1✔
2443
                    revision: 0,
1✔
2444
                },
1✔
2445
            });
1✔
2446
        }
16✔
2447

2448
        let max = limit.unwrap_or(total);
16✔
2449
        let mut start = cursor.map(|state| state.revision as usize).unwrap_or(0);
16✔
2450
        if start >= total {
16✔
2451
            start = 0;
2✔
2452
        }
14✔
2453

2454
        let source_id = self.config.source_id.clone();
16✔
2455
        let seed = self.paging_seed(total)?;
16✔
2456
        let mut permutation = crate::source::IndexPermutation::new(total, seed, start as u64);
16✔
2457

2458
        let mut records = Vec::new();
16✔
2459
        let read_batch_target = self.effective_refresh_batch_target(max);
16✔
2460
        let mut pending_indices = Vec::with_capacity(read_batch_target);
16✔
2461
        let should_report = total >= 10_000 || max >= 1_024;
16✔
2462
        let report_every = Duration::from_millis(750);
16✔
2463
        let refresh_start = Instant::now();
16✔
2464
        let mut last_report = refresh_start;
16✔
2465
        let mut attempts = 0usize;
16✔
2466

2467
        if should_report {
16✔
2468
            info!(
1✔
2469
                "[triplets:source] refresh start source='{}' total={} target={}",
2470
                source_id, total, max
2471
            );
2472
        }
15✔
2473

2474
        while attempts < total && records.len() < max {
30✔
2475
            pending_indices.clear();
16✔
2476
            let remaining_attempts = total.saturating_sub(attempts);
16✔
2477
            let to_collect = read_batch_target.min(remaining_attempts);
16✔
2478
            for _ in 0..to_collect {
16✔
2479
                if records.len() + pending_indices.len() >= max {
48✔
2480
                    break;
2✔
2481
                }
46✔
2482
                pending_indices.push(permutation.next());
46✔
2483
                attempts += 1;
46✔
2484
            }
2485

2486
            if pending_indices.is_empty() {
16✔
NEW
2487
                break;
×
2488
            }
16✔
2489

2490
            if should_report {
16✔
2491
                info!(
1✔
2492
                    "[triplets:source] refresh batch source='{}' batch_size={} attempted={} fetched={} elapsed={:.1}s",
2493
                    source_id,
2494
                    pending_indices.len(),
1✔
2495
                    attempts,
2496
                    records.len(),
1✔
2497
                    refresh_start.elapsed().as_secs_f64()
1✔
2498
                );
2499
            }
15✔
2500

2501
            self.read_row_batch(&pending_indices, &mut records, Some(max))?;
16✔
2502

2503
            if should_report && last_report.elapsed() >= report_every {
14✔
NEW
2504
                info!(
×
2505
                    "[triplets:source] refresh progress source='{}' attempted={}/{} fetched={}/{} elapsed={:.1}s",
2506
                    source_id,
2507
                    attempts,
2508
                    total,
NEW
2509
                    records.len(),
×
2510
                    max,
NEW
2511
                    refresh_start.elapsed().as_secs_f64()
×
2512
                );
NEW
2513
                last_report = Instant::now();
×
2514
            }
14✔
2515
        }
2516

2517
        if should_report {
14✔
2518
            info!(
1✔
2519
                "[triplets:source] refresh done source='{}' attempted={} fetched={} elapsed={:.2}s",
2520
                source_id,
2521
                attempts,
2522
                records.len(),
1✔
2523
                refresh_start.elapsed().as_secs_f64()
1✔
2524
            );
2525
        }
13✔
2526

2527
        let next_start = permutation.cursor();
14✔
2528
        let last_seen = records
14✔
2529
            .iter()
14✔
2530
            .map(|record| record.updated_at)
14✔
2531
            .max()
14✔
2532
            .unwrap_or_else(Utc::now);
14✔
2533

2534
        Ok(SourceSnapshot {
14✔
2535
            records,
14✔
2536
            cursor: SourceCursor {
14✔
2537
                last_seen,
14✔
2538
                revision: next_start as u64,
14✔
2539
            },
14✔
2540
        })
14✔
2541
    }
17✔
2542

2543
    /// Return the exact reported record count derived from the current `len_hint`.
2544
    fn reported_record_count(&self, config: &SamplerConfig) -> Result<u128, SamplerError> {
5✔
2545
        self.set_active_sampler_config(config);
5✔
2546
        self.len_hint()
5✔
2547
            .map(|count| count as u128)
5✔
2548
            .ok_or_else(|| SamplerError::SourceInconsistent {
5✔
NEW
2549
                source_id: self.config.source_id.clone(),
×
NEW
2550
                details: "huggingface source did not provide len_hint".to_string(),
×
NEW
2551
            })
×
2552
    }
5✔
2553

2554
    /// Return the default triplet recipes used by Hugging Face row sources.
2555
    fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
1✔
2556
        vec![TripletRecipe {
1✔
2557
            name: "huggingface_anchor_context".into(),
1✔
2558
            anchor: Selector::Role(SectionRole::Anchor),
1✔
2559
            positive_selector: Selector::Role(SectionRole::Context),
1✔
2560
            negative_selector: Selector::Role(SectionRole::Context),
1✔
2561
            negative_strategy: NegativeStrategy::WrongArticle,
1✔
2562
            weight: 1.0,
1✔
2563
            instruction: None,
1✔
2564
        }]
1✔
2565
    }
1✔
2566
}
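
// A minimal usage sketch (illustrative, not part of the original file):
// `fetch_rows` is a hypothetical helper showing how a caller can page through
// this source by feeding the cursor from each `SourceSnapshot` back into the
// next `refresh` call; the pass count and per-pass limit are arbitrary.
#[allow(dead_code)]
fn fetch_rows(
    source: &HuggingFaceRowSource,
    sampler_config: &SamplerConfig,
    passes: usize,
    per_pass: usize,
) -> Result<usize, SamplerError> {
    let mut cursor: Option<SourceCursor> = None;
    let mut fetched = 0usize;
    for _ in 0..passes {
        let snapshot = source.refresh(sampler_config, cursor.as_ref(), Some(per_pass))?;
        fetched += snapshot.records.len();
        // `cursor.revision` carries the permuted row-index frontier, so the next
        // refresh resumes paging instead of restarting from the beginning.
        cursor = Some(snapshot.cursor);
    }
    Ok(fetched)
}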
2567

2568
#[cfg(test)]
2569
mod tests {
2570
    use super::*;
2571
    use parquet::data_type::{ByteArray, ByteArrayType};
2572
    use parquet::file::properties::WriterProperties;
2573
    use parquet::file::writer::SerializedFileWriter;
2574
    use parquet::schema::parser::parse_message_type;
2575
    use serde_json::json;
2576
    use std::env;
2577
    use std::io::{Read, Write};
2578
    use std::net::TcpListener;
2579
    use std::sync::OnceLock;
2580
    use std::thread;
2581
    use tempfile::tempdir;
2582

2583
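    /// Build a test `HuggingFaceRowsConfig` rooted at `snapshot_dir`, with a
    /// small cache capacity and expansion headroom multiplier.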
    fn test_config(snapshot_dir: PathBuf) -> HuggingFaceRowsConfig {
123✔
2584
        let mut config =
123✔
2585
            HuggingFaceRowsConfig::new("hf_test", "org/dataset", "default", "train", snapshot_dir);
123✔
2586
        config.cache_capacity = 10;
123✔
2587
        config.remote_expansion_headroom_multiplier = 3;
123✔
2588
        config
123✔
2589
    }
123✔
2590

2591
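    /// Build a `HuggingFaceRowSource` with empty state and caches, then
    /// activate a small sampler config (seed 1, ingestion limit = cache size).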
    fn test_source(config: HuggingFaceRowsConfig) -> HuggingFaceRowSource {
66✔
2592
        let source = HuggingFaceRowSource {
66✔
2593
            config,
66✔
2594
            sampler_config: Mutex::new(None),
66✔
2595
            state: Mutex::new(SourceState {
66✔
2596
                materialized_rows: 0,
66✔
2597
                total_rows: None,
66✔
2598
                shards: Vec::new(),
66✔
2599
                remote_candidates: None,
66✔
2600
                remote_candidate_sizes: HashMap::new(),
66✔
2601
                next_remote_idx: 0,
66✔
2602
            }),
66✔
2603
            cache: Mutex::new(RowCache::default()),
66✔
2604
            parquet_cache: Mutex::new(ParquetCache::default()),
66✔
2605
        };
66✔
2606
        source.set_active_sampler_config(&SamplerConfig {
66✔
2607
            seed: 1,
66✔
2608
            ingestion_max_records: source.config.cache_capacity,
66✔
2609
            ..SamplerConfig::default()
66✔
2610
        });
66✔
2611
        source
66✔
2612
    }
66✔
2613

2614
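    /// Serve `payload` exactly once over HTTP on an ephemeral localhost port;
    /// returns the base URL and the server thread's join handle.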
    fn spawn_one_shot_http(payload: Vec<u8>) -> (String, thread::JoinHandle<()>) {
15✔
2615
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
15✔
2616
        let addr = listener.local_addr().unwrap();
15✔
2617
        let handle = thread::spawn(move || {
15✔
2618
            let (mut stream, _) = listener.accept().unwrap();
15✔
2619
            let mut request_buf = [0u8; 1024];
15✔
2620
            let _ = stream.read(&mut request_buf);
15✔
2621
            let headers = format!(
15✔
2622
                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
2623
                payload.len()
15✔
2624
            );
2625
            stream.write_all(headers.as_bytes()).unwrap();
15✔
2626
            stream.write_all(&payload).unwrap();
15✔
2627
            let _ = stream.flush();
15✔
2628
        });
15✔
2629
        (format!("http://{addr}"), handle)
15✔
2630
    }
15✔
2631

2632
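    /// Run `run` with environment variable `key` set to `value`, restoring the
    /// previous value afterwards; serialized by a process-wide lock.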
    fn with_env_var<R>(key: &str, value: &str, run: impl FnOnce() -> R) -> R {
8✔
2633
        static ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
2634
        let guard = ENV_LOCK
8✔
2635
            .get_or_init(|| Mutex::new(()))
8✔
2636
            .lock()
8✔
2637
            .expect("env lock poisoned");
8✔
2638
        let previous = env::var(key).ok();
8✔
2639
        unsafe { env::set_var(key, value) };
8✔
2640
        let result = run();
8✔
2641
        if let Some(old) = previous {
8✔
NEW
2642
            unsafe { env::set_var(key, old) };
×
2643
        } else {
8✔
2644
            unsafe { env::remove_var(key) };
8✔
2645
        }
8✔
2646
        drop(guard);
8✔
2647
        result
8✔
2648
    }
8✔
2649

2650
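    /// Write a single-row-group parquet file at `path` with required `id` and
    /// `text` UTF8 columns populated from `rows`.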
    fn write_parquet_fixture(path: &Path, rows: &[(&str, &str)]) {
5✔
2651
        let schema = Arc::new(
5✔
2652
            parse_message_type(
5✔
2653
                "message test_schema {
5✔
2654
                    REQUIRED BINARY id (UTF8);
5✔
2655
                    REQUIRED BINARY text (UTF8);
5✔
2656
                }",
5✔
2657
            )
2658
            .unwrap(),
5✔
2659
        );
2660
        let props = Arc::new(WriterProperties::builder().build());
5✔
2661
        let file = File::create(path).unwrap();
5✔
2662
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
5✔
2663
        let mut row_group = writer.next_row_group().unwrap();
5✔
2664

2665
        if let Some(mut col_writer) = row_group.next_column().unwrap() {
5✔
2666
            let values = rows
5✔
2667
                .iter()
5✔
2668
                .map(|(id, _)| ByteArray::from(*id))
9✔
2669
                .collect::<Vec<_>>();
5✔
2670
            col_writer
5✔
2671
                .typed::<ByteArrayType>()
5✔
2672
                .write_batch(&values, None, None)
5✔
2673
                .unwrap();
5✔
2674
            col_writer.close().unwrap();
5✔
NEW
2675
        }
×
2676

2677
        if let Some(mut col_writer) = row_group.next_column().unwrap() {
5✔
2678
            let values = rows
5✔
2679
                .iter()
5✔
2680
                .map(|(_, text)| ByteArray::from(*text))
9✔
2681
                .collect::<Vec<_>>();
5✔
2682
            col_writer
5✔
2683
                .typed::<ByteArrayType>()
5✔
2684
                .write_batch(&values, None, None)
5✔
2685
                .unwrap();
5✔
2686
            col_writer.close().unwrap();
5✔
NEW
2687
        }
×
2688

2689
        assert!(row_group.next_column().unwrap().is_none());
5✔
2690
        row_group.close().unwrap();
5✔
2691
        writer.close().unwrap();
5✔
2692
    }
5✔
2693

2694
    #[test]
2695
    fn row_cache_insert_and_evicts_oldest_entry() {
1✔
2696
        let mut cache = RowCache::default();
1✔
2697
        let row_a = RowView {
1✔
2698
            row_id: Some("a".to_string()),
1✔
2699
            timestamp: None,
1✔
2700
            text_fields: vec![RowTextField {
1✔
2701
                name: "text".to_string(),
1✔
2702
                text: "alpha".to_string(),
1✔
2703
            }],
1✔
2704
        };
1✔
2705
        let row_b = RowView {
1✔
2706
            row_id: Some("b".to_string()),
1✔
2707
            timestamp: None,
1✔
2708
            text_fields: vec![RowTextField {
1✔
2709
                name: "text".to_string(),
1✔
2710
                text: "beta".to_string(),
1✔
2711
            }],
1✔
2712
        };
1✔
2713

2714
        cache.insert(0, row_a.clone(), 1);
1✔
2715
        assert!(cache.get(0).is_some());
1✔
2716

2717
        cache.insert(1, row_b, 1);
1✔
2718
        assert!(cache.get(0).is_none());
1✔
2719
        assert_eq!(cache.get(1).unwrap().row_id.as_deref(), Some("b"));
1✔
2720

2721
        let mut zero_cache = RowCache::default();
1✔
2722
        zero_cache.insert(7, row_a, 0);
1✔
2723
        assert!(zero_cache.get(7).is_none());
1✔
2724
    }
1✔
2725

2726
    #[test]
2727
    fn parquet_cache_reader_for_reports_open_and_parse_errors() {
1✔
2728
        let dir = tempdir().unwrap();
1✔
2729
        let parquet_path = dir.path().join("missing.parquet");
1✔
2730
        let mut cache = ParquetCache::default();
1✔
2731
        let missing = cache.reader_for("hf_test", &parquet_path);
1✔
2732
        assert!(missing.is_err());
1✔
2733

2734
        let invalid_parquet = dir.path().join("invalid.parquet");
1✔
2735
        fs::write(&invalid_parquet, b"not parquet").unwrap();
1✔
2736
        let invalid = cache.reader_for("hf_test", &invalid_parquet);
1✔
2737
        assert!(invalid.is_err());
1✔
2738
    }
1✔
2739

2740
    #[test]
2741
    fn effective_targets_respect_minimum_multiplier_and_sampler_override() {
1✔
2742
        let dir = tempdir().unwrap();
1✔
2743
        let mut config = test_config(dir.path().to_path_buf());
1✔
2744
        config.refresh_batch_multiplier = 0;
1✔
2745
        config.remote_expansion_headroom_multiplier = 0;
1✔
2746
        config.cache_capacity = 9;
1✔
2747
        let source = test_source(config.clone());
1✔
2748

2749
        assert_eq!(source.effective_refresh_batch_target(5), 5);
1✔
2750
        assert_eq!(source.effective_expansion_headroom_rows(), 9);
1✔
2751

2752
        let sampler = SamplerConfig {
1✔
2753
            ingestion_max_records: 4,
1✔
2754
            ..SamplerConfig::default()
1✔
2755
        };
1✔
2756
        *source.sampler_config.lock().unwrap() = Some(sampler);
1✔
2757
        assert_eq!(source.effective_expansion_headroom_rows(), 4);
1✔
2758
    }
1✔
2759

2760
    #[test]
2761
    fn collect_candidates_from_siblings_filters_split_and_tracks_parquet() {
1✔
2762
        let dir = tempdir().unwrap();
1✔
2763
        let config = test_config(dir.path().to_path_buf());
1✔
2764
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
1✔
2765
        let siblings = vec![
1✔
2766
            "train/a.ndjson".to_string(),
1✔
2767
            "dev/b.ndjson".to_string(),
1✔
2768
            "train-c.parquet".to_string(),
1✔
2769
            "train-z.txt".to_string(),
1✔
2770
        ];
2771

2772
        let (candidates, saw_parquet) = HuggingFaceRowSource::collect_candidates_from_siblings(
1✔
2773
            &config, &siblings, &accepted, true,
1✔
2774
        );
1✔
2775

2776
        assert!(saw_parquet);
1✔
2777
        assert_eq!(
1✔
2778
            candidates,
2779
            vec!["train/a.ndjson".to_string(), "train-c.parquet".to_string()]
1✔
2780
        );
2781
    }
1✔
2782

2783
    #[test]
2784
    fn collect_candidates_from_siblings_skips_existing_targets() {
1✔
2785
        let dir = tempdir().unwrap();
1✔
2786
        let config = test_config(dir.path().to_path_buf());
1✔
2787
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
1✔
2788
        let existing = "train/already.ndjson".to_string();
1✔
2789
        let existing_target = HuggingFaceRowSource::candidate_target_path(&config, &existing);
1✔
2790
        fs::create_dir_all(existing_target.parent().unwrap()).unwrap();
1✔
2791
        fs::write(&existing_target, b"x\n").unwrap();
1✔
2792

2793
        let siblings = vec![existing, "train/new.ndjson".to_string()];
1✔
2794
        let (candidates, _) = HuggingFaceRowSource::collect_candidates_from_siblings(
1✔
2795
            &config, &siblings, &accepted, true,
1✔
2796
        );
1✔
2797
        assert_eq!(candidates, vec!["train/new.ndjson".to_string()]);
1✔
2798
    }
1✔
2799

2800
    #[test]
2801
    fn candidates_from_parquet_manifest_json_filters_and_records_sizes() {
1✔
2802
        let dir = tempdir().unwrap();
1✔
2803
        let config = test_config(dir.path().to_path_buf());
1✔
2804
        let payload = json!({
1✔
2805
            "parquet_files": [
1✔
2806
                {"url": "https://host/x/train/000.parquet", "size": 11},
1✔
2807
                {"url": "https://host/x/train/001.ndjson", "size": 13},
1✔
2808
                {"url": "https://host/x/train/002.txt", "size": 5},
1✔
2809
                {"foo": "missing-url"}
1✔
2810
            ]
2811
        });
2812

2813
        let (candidates, sizes) =
1✔
2814
            HuggingFaceRowSource::candidates_from_parquet_manifest_json(&config, &payload).unwrap();
1✔
2815
        assert_eq!(candidates.len(), 2);
1✔
2816
        assert!(
1✔
2817
            candidates
1✔
2818
                .iter()
1✔
2819
                .any(|c| c.ends_with("https://host/x/train/000.parquet"))
1✔
2820
        );
2821
        assert!(
1✔
2822
            candidates
1✔
2823
                .iter()
1✔
2824
                .any(|c| c.ends_with("https://host/x/train/001.ndjson"))
2✔
2825
        );
2826
        assert_eq!(sizes.len(), 2);
1✔
2827
    }
1✔
2828

2829
    #[test]
2830
    fn candidates_from_parquet_manifest_skips_complete_cached_and_replaces_incomplete() {
1✔
2831
        let dir = tempdir().unwrap();
1✔
2832
        let config = test_config(dir.path().to_path_buf());
1✔
2833

2834
        let complete_url = "https://host/datasets/org/ds/resolve/main/train/000.parquet";
1✔
2835
        let complete_candidate = format!("{REMOTE_URL_PREFIX}{complete_url}");
1✔
2836
        let complete_target =
1✔
2837
            HuggingFaceRowSource::candidate_target_path(&config, &complete_candidate);
1✔
2838
        fs::create_dir_all(complete_target.parent().unwrap()).unwrap();
1✔
2839
        fs::write(&complete_target, vec![1u8; 7]).unwrap();
1✔
2840

2841
        let stale_url = "https://host/datasets/org/ds/resolve/main/train/001.parquet";
1✔
2842
        let stale_candidate = format!("{REMOTE_URL_PREFIX}{stale_url}");
1✔
2843
        let stale_target = HuggingFaceRowSource::candidate_target_path(&config, &stale_candidate);
1✔
2844
        fs::create_dir_all(stale_target.parent().unwrap()).unwrap();
1✔
2845
        fs::write(&stale_target, vec![2u8; 3]).unwrap();
1✔
2846

2847
        let payload = json!({
1✔
2848
            "parquet_files": [
1✔
2849
                {"url": complete_url, "size": 7},
1✔
2850
                {"url": stale_url, "size": 9}
1✔
2851
            ]
2852
        });
2853

2854
        let (candidates, sizes) =
1✔
2855
            HuggingFaceRowSource::candidates_from_parquet_manifest_json(&config, &payload).unwrap();
1✔
2856
        assert_eq!(candidates.len(), 1);
1✔
2857
        assert!(candidates[0].ends_with(stale_url));
1✔
2858
        assert!(!stale_target.exists());
1✔
2859
        assert_eq!(sizes[&candidates[0]], 9);
1✔
2860
        assert!(complete_target.exists());
1✔
2861
    }
1✔
2862

2863
    #[test]
2864
    fn candidates_from_parquet_manifest_errors_when_removing_incomplete_target_fails() {
1✔
2865
        let dir = tempdir().unwrap();
1✔
2866
        let config = test_config(dir.path().to_path_buf());
1✔
2867
        let url = "https://host/datasets/org/ds/resolve/main/train/blocked.parquet";
1✔
2868
        let candidate = format!("{REMOTE_URL_PREFIX}{url}");
1✔
2869
        let target = HuggingFaceRowSource::candidate_target_path(&config, &candidate);
1✔
2870
        fs::create_dir_all(&target).unwrap();
1✔
2871

2872
        let payload = json!({
1✔
2873
            "parquet_files": [
1✔
2874
                {"url": url, "size": 1}
1✔
2875
            ]
2876
        });
2877

2878
        let err = HuggingFaceRowSource::candidates_from_parquet_manifest_json(&config, &payload);
1✔
2879
        assert!(err.is_err());
1✔
2880
    }
1✔
2881

2882
    #[test]
2883
    fn normalized_shard_extensions_trims_dots_and_lowercases() {
1✔
2884
        let dir = tempdir().unwrap();
1✔
2885
        let mut config = test_config(dir.path().to_path_buf());
1✔
2886
        config.shard_extensions = vec![".PARQUET".into(), " ndjson ".into()];
1✔
2887
        let normalized = HuggingFaceRowSource::normalized_shard_extensions(&config);
1✔
2888
        assert_eq!(
1✔
2889
            normalized,
2890
            vec!["parquet".to_string(), "ndjson".to_string()]
1✔
2891
        );
2892
    }
1✔
2893

2894
    #[test]
2895
    fn manifest_usage_bytes_locked_counts_only_manifest_shards() {
1✔
2896
        let dir = tempdir().unwrap();
1✔
2897
        let config = test_config(dir.path().to_path_buf());
1✔
2898
        let source = test_source(config);
1✔
2899
        let manifest_root = source.manifest_cache_root();
1✔
2900
        fs::create_dir_all(&manifest_root).unwrap();
1✔
2901

2902
        let manifest_file = manifest_root.join("a.parquet");
1✔
2903
        fs::write(&manifest_file, vec![1u8; 7]).unwrap();
1✔
2904
        let local_file = source.config.snapshot_dir.join("local.ndjson");
1✔
2905
        fs::write(&local_file, vec![2u8; 9]).unwrap();
1✔
2906

2907
        let state = SourceState {
1✔
2908
            materialized_rows: 2,
1✔
2909
            total_rows: None,
1✔
2910
            shards: vec![
1✔
2911
                ShardIndex {
1✔
2912
                    path: manifest_file,
1✔
2913
                    global_start: 0,
1✔
2914
                    row_count: 1,
1✔
2915
                    is_parquet: true,
1✔
2916
                    parquet_row_groups: vec![(0, 1)],
1✔
2917
                    checkpoints: Vec::new(),
1✔
2918
                },
1✔
2919
                ShardIndex {
1✔
2920
                    path: local_file,
1✔
2921
                    global_start: 1,
1✔
2922
                    row_count: 1,
1✔
2923
                    is_parquet: false,
1✔
2924
                    parquet_row_groups: Vec::new(),
1✔
2925
                    checkpoints: vec![0],
1✔
2926
                },
1✔
2927
            ],
1✔
2928
            remote_candidates: None,
1✔
2929
            remote_candidate_sizes: HashMap::new(),
1✔
2930
            next_remote_idx: 0,
1✔
2931
        };
1✔
2932

2933
        assert_eq!(source.manifest_usage_bytes_locked(&state), 7);
1✔
2934
    }
1✔
2935

2936
    #[test]
2937
    fn build_shard_index_errors_when_parquet_present_but_not_accepted() {
1✔
2938
        let dir = tempdir().unwrap();
1✔
2939
        fs::write(dir.path().join("rows.parquet"), b"fake").unwrap();
1✔
2940
        let mut config = test_config(dir.path().to_path_buf());
1✔
2941
        config.shard_extensions = vec!["ndjson".to_string()];
1✔
2942

2943
        let result = HuggingFaceRowSource::build_shard_index(&config);
1✔
2944
        assert!(result.is_err());
1✔
2945
    }
1✔
2946

2947
    #[test]
2948
    fn locate_parquet_group_maps_offsets_and_reports_missing() {
1✔
2949
        let dir = tempdir().unwrap();
1✔
2950
        let config = test_config(dir.path().to_path_buf());
1✔
2951
        let source = test_source(config);
1✔
2952
        let shard = ShardIndex {
1✔
2953
            path: dir.path().join("rows.parquet"),
1✔
2954
            global_start: 0,
1✔
2955
            row_count: 6,
1✔
2956
            is_parquet: true,
1✔
2957
            parquet_row_groups: vec![(0, 2), (2, 2), (4, 2)],
1✔
2958
            checkpoints: Vec::new(),
1✔
2959
        };
1✔
2960

2961
        let mapped = source.locate_parquet_group(&shard, 3).unwrap();
1✔
2962
        assert_eq!(mapped, (1, 1));
1✔
2963
        let missing = source.locate_parquet_group(&shard, 99);
1✔
2964
        assert!(missing.is_err());
1✔
2965
    }
1✔
2966

2967
    #[test]
2968
    fn parse_row_role_columns_mode_builds_expected_fields() {
1✔
2969
        let dir = tempdir().unwrap();
1✔
2970
        let mut config = test_config(dir.path().to_path_buf());
1✔
2971
        config.anchor_column = Some("anchor".into());
1✔
2972
        config.positive_column = Some("positive".into());
1✔
2973
        config.context_columns = vec!["ctx1".into(), "ctx2".into()];
1✔
2974
        let source = test_source(config);
1✔
2975

2976
        let row = source
1✔
2977
            .parse_row(
1✔
2978
                2,
2979
                &json!({"id":"r","anchor":"a","positive":"p","ctx1":"c1","ctx2":2}),
1✔
2980
            )
2981
            .unwrap();
1✔
2982
        assert_eq!(row.text_fields.len(), 4);
1✔
2983
        assert_eq!(row.text_fields[0].name, "anchor");
1✔
2984
        assert_eq!(row.text_fields[1].name, "positive");
1✔
2985
    }
1✔
2986

2987
    #[test]
2988
    fn parse_row_role_columns_mode_errors_on_missing_or_empty_values() {
1✔
2989
        let dir = tempdir().unwrap();
1✔
2990
        let mut config = test_config(dir.path().to_path_buf());
1✔
2991
        config.anchor_column = Some("anchor".into());
1✔
2992
        config.context_columns = vec!["ctx".into()];
1✔
2993
        let source = test_source(config);
1✔
2994

2995
        let missing = source.parse_row(0, &json!({"anchor":"a"}));
1✔
2996
        assert!(missing.is_err());
1✔
2997

2998
        let empty_anchor = source.parse_row(1, &json!({"anchor":"   ", "ctx":"ok"}));
1✔
2999
        assert!(empty_anchor.is_err());
1✔
3000
    }
1✔
3001

3002
    #[test]
3003
    fn row_to_record_uses_anchor_for_positive_when_single_field() {
1✔
3004
        let dir = tempdir().unwrap();
1✔
3005
        let config = test_config(dir.path().to_path_buf());
1✔
3006
        let source = test_source(config);
1✔
3007
        let row = RowView {
1✔
3008
            row_id: Some("r1".into()),
1✔
3009
            timestamp: None,
1✔
3010
            text_fields: vec![RowTextField {
1✔
3011
                name: "text".into(),
1✔
3012
                text: "alpha".into(),
1✔
3013
            }],
1✔
3014
        };
1✔
3015

3016
        let record = source.row_to_record(&row, 0).unwrap().unwrap();
1✔
3017
        assert_eq!(record.sections.len(), 2);
1✔
3018
        assert_eq!(record.sections[0].text, record.sections[1].text);
1✔
3019
    }
1✔
3020

3021
    #[test]
3022
    fn read_line_at_errors_on_unexpected_eof_while_scanning() {
1✔
3023
        let dir = tempdir().unwrap();
1✔
3024
        let path = dir.path().join("rows.jsonl");
1✔
3025
        fs::write(&path, b"{\"text\":\"a\"}\n").unwrap();
1✔
3026
        let mut config = test_config(dir.path().to_path_buf());
1✔
3027
        config.checkpoint_stride = 1;
1✔
3028
        let source = test_source(config.clone());
1✔
3029
        let mut shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
1✔
3030
            .unwrap()
1✔
3031
            .unwrap();
1✔
3032
        shard.checkpoints = vec![0];
1✔
3033

3034
        let err = source.read_line_at(&shard, 3);
1✔
3035
        assert!(err.is_err());
1✔
3036
    }
1✔
3037

3038
    #[test]
3039
    fn target_matches_expected_size_is_false_for_missing_path() {
1✔
3040
        let dir = tempdir().unwrap();
1✔
3041
        let missing = dir.path().join("missing.bin");
1✔
3042
        assert!(!HuggingFaceRowSource::target_matches_expected_size(
1✔
3043
            &missing,
1✔
3044
            Some(1)
1✔
3045
        ));
1✔
3046
    }
1✔
3047

3048
    #[test]
3049
    fn candidate_target_path_uses_fallback_suffix_without_resolve_segment() {
1✔
3050
        let dir = tempdir().unwrap();
1✔
3051
        let config = test_config(dir.path().to_path_buf());
1✔
3052
        let candidate = "url::https://example.com/raw/file.parquet";
1✔
3053
        let target = HuggingFaceRowSource::candidate_target_path(&config, candidate);
1✔
3054
        assert!(target.ends_with("_parquet_manifest/parquet/unknown.parquet"));
1✔
3055
    }
1✔
3056

3057
    #[test]
3058
    fn persist_shard_sequence_is_noop_without_remote_candidates() {
1✔
3059
        let dir = tempdir().unwrap();
1✔
3060
        let config = test_config(dir.path().to_path_buf());
1✔
3061
        let source = test_source(config.clone());
1✔
3062
        let state = SourceState {
1✔
3063
            materialized_rows: 0,
1✔
3064
            total_rows: None,
1✔
3065
            shards: Vec::new(),
1✔
3066
            remote_candidates: None,
1✔
3067
            remote_candidate_sizes: HashMap::new(),
1✔
3068
            next_remote_idx: 0,
1✔
3069
        };
1✔
3070

3071
        source.persist_shard_sequence_locked(&state).unwrap();
1✔
3072
        assert!(!HuggingFaceRowSource::shard_sequence_state_path(&config).exists());
1✔
3073
    }
1✔
3074

3075
    #[test]
3076
    fn load_persisted_shard_sequence_returns_none_for_identity_mismatch() {
1✔
3077
        let dir = tempdir().unwrap();
1✔
3078
        let config = test_config(dir.path().to_path_buf());
1✔
3079
        let state_path = HuggingFaceRowSource::shard_sequence_state_path(&config);
1✔
3080
        fs::create_dir_all(state_path.parent().unwrap()).unwrap();
1✔
3081
        fs::write(
1✔
3082
            &state_path,
1✔
3083
            serde_json::to_vec_pretty(&json!({
1✔
3084
                "version": 1,
1✔
3085
                "source_id": "different",
1✔
3086
                "dataset": config.dataset,
1✔
3087
                "config": config.config,
1✔
3088
                "split": config.split,
1✔
3089
                "sampler_seed": 1,
1✔
3090
                "candidates": ["train/0.ndjson"],
1✔
3091
                "candidate_sizes": {},
1✔
3092
                "next_remote_idx": 0
1✔
3093
            }))
1✔
3094
            .unwrap(),
1✔
3095
        )
3096
        .unwrap();
1✔
3097

3098
        let loaded = HuggingFaceRowSource::load_persisted_shard_sequence(&config, 1).unwrap();
1✔
3099
        assert!(loaded.is_none());
1✔
3100
    }
1✔
3101

3102
    #[test]
3103
    fn parse_row_falls_back_to_synthetic_id_when_missing_id_column() {
1✔
3104
        let dir = tempdir().unwrap();
1✔
3105
        let mut config = test_config(dir.path().to_path_buf());
1✔
3106
        config.id_column = Some("id".into());
1✔
3107
        let source = test_source(config);
1✔
3108

3109
        let row = source.parse_row(42, &json!({"text": "hello"})).unwrap();
1✔
3110
        assert_eq!(row.row_id, Some("org/dataset:train:42".to_string()));
1✔
3111
    }
1✔
3112

3113
    #[test]
3114
    fn row_to_record_falls_back_to_row_index_when_row_id_missing() {
1✔
3115
        let dir = tempdir().unwrap();
1✔
3116
        let config = test_config(dir.path().to_path_buf());
1✔
3117
        let source = test_source(config);
1✔
3118
        let row = RowView {
1✔
3119
            row_id: None,
1✔
3120
            timestamp: None,
1✔
3121
            text_fields: vec![RowTextField {
1✔
3122
                name: "text".into(),
1✔
3123
                text: "body".into(),
1✔
3124
            }],
1✔
3125
        };
1✔
3126

3127
        let record = source.row_to_record(&row, 7).unwrap().unwrap();
1✔
3128
        assert!(record.id.ends_with("::row_7"));
1✔
3129
    }
1✔
3130

3131
    #[test]
3132
    fn locate_shard_returns_none_for_out_of_range_index() {
1✔
3133
        let shards = vec![ShardIndex {
1✔
3134
            path: PathBuf::from("a.ndjson"),
1✔
3135
            global_start: 0,
1✔
3136
            row_count: 2,
1✔
3137
            is_parquet: false,
1✔
3138
            parquet_row_groups: Vec::new(),
1✔
3139
            checkpoints: vec![0],
1✔
3140
        }];
1✔
3141

3142
        assert!(HuggingFaceRowSource::locate_shard(&shards, 5).is_none());
1✔
3143
    }
1✔
3144

3145
    #[test]
3146
    fn read_row_batch_errors_when_row_not_mappable_to_shard() {
1✔
3147
        let dir = tempdir().unwrap();
1✔
3148
        let config = test_config(dir.path().to_path_buf());
1✔
3149
        let source = test_source(config);
1✔
3150
        {
1✔
3151
            let mut state = source.state.lock().unwrap();
1✔
3152
            state.materialized_rows = 1;
1✔
3153
            state.total_rows = Some(1);
1✔
3154
            state.shards.clear();
1✔
3155
        }
1✔
3156

3157
        let mut out = Vec::new();
1✔
3158
        let err = source.read_row_batch(&[0], &mut out, Some(1));
1✔
3159
        assert!(err.is_err());
1✔
3160
    }
1✔
3161

3162
    #[test]
3163
    fn read_row_batch_errors_on_invalid_json_row() {
1✔
3164
        let dir = tempdir().unwrap();
1✔
3165
        let path = dir.path().join("broken.ndjson");
1✔
3166
        fs::write(&path, b"not-json\n").unwrap();
1✔
3167
        let config = test_config(dir.path().to_path_buf());
1✔
3168
        let source = test_source(config);
1✔
3169

3170
        {
1✔
3171
            let mut state = source.state.lock().unwrap();
1✔
3172
            state.materialized_rows = 1;
1✔
3173
            state.total_rows = Some(1);
1✔
3174
            state.shards = vec![ShardIndex {
1✔
3175
                path,
1✔
3176
                global_start: 0,
1✔
3177
                row_count: 1,
1✔
3178
                is_parquet: false,
1✔
3179
                parquet_row_groups: Vec::new(),
1✔
3180
                checkpoints: vec![0],
1✔
3181
            }];
1✔
3182
        }
1✔
3183

3184
        let mut out = Vec::new();
1✔
3185
        let err = source.read_row_batch(&[0], &mut out, Some(1)).unwrap_err();
1✔
3186
        assert!(matches!(
1✔
3187
            err,
1✔
3188
            SamplerError::SourceInconsistent { ref details, .. } if details.contains("failed decoding JSON row")
1✔
3189
        ));
3190
    }
1✔
3191

3192
    #[test]
3193
    fn read_row_batch_errors_when_parquet_local_offsets_are_missing() {
1✔
3194
        let dir = tempdir().unwrap();
1✔
3195
        let path = dir.path().join("rows.parquet");
1✔
3196
        write_parquet_fixture(&path, &[("id-1", "text-1")]);
1✔
3197
        let config = test_config(dir.path().to_path_buf());
1✔
3198
        let source = test_source(config);
1✔
3199

3200
        {
1✔
3201
            let mut state = source.state.lock().unwrap();
1✔
3202
            state.materialized_rows = 3;
1✔
3203
            state.total_rows = Some(3);
1✔
3204
            state.shards = vec![ShardIndex {
1✔
3205
                path,
1✔
3206
                global_start: 0,
1✔
3207
                row_count: 3,
1✔
3208
                is_parquet: true,
1✔
3209
                parquet_row_groups: vec![(0, 3)],
1✔
3210
                checkpoints: Vec::new(),
1✔
3211
            }];
1✔
3212
        }
1✔
3213

3214
        let mut out = Vec::new();
1✔
3215
        let err = source.read_row_batch(&[2], &mut out, Some(1)).unwrap_err();
1✔
3216
        assert!(matches!(
1✔
3217
            err,
1✔
3218
            SamplerError::SourceUnavailable { ref reason, .. } if reason.contains("parquet rows missing")
1✔
3219
        ));
3220
    }
1✔
3221

3222
    #[test]
3223
    fn len_hint_applies_max_rows_cap() {
1✔
3224
        let dir = tempdir().unwrap();
1✔
3225
        let mut config = test_config(dir.path().to_path_buf());
1✔
3226
        config.max_rows = Some(3);
1✔
3227
        let source = test_source(config);
1✔
3228
        {
1✔
3229
            let mut state = source.state.lock().unwrap();
1✔
3230
            state.materialized_rows = 2;
1✔
3231
            state.total_rows = Some(100);
1✔
3232
        }
1✔
3233
        assert_eq!(source.len_hint(), Some(3));
1✔
3234
    }
1✔
3235

3236
    #[test]
3237
    fn enforce_disk_cap_returns_false_when_disabled_or_under_limit() {
1✔
3238
        let dir = tempdir().unwrap();
1✔
3239
        let mut config = test_config(dir.path().to_path_buf());
1✔
3240
        config.local_disk_cap_bytes = None;
1✔
3241
        let source = test_source(config);
1✔
3242
        let mut state = SourceState {
1✔
3243
            materialized_rows: 0,
1✔
3244
            total_rows: None,
1✔
3245
            shards: Vec::new(),
1✔
3246
            remote_candidates: None,
1✔
3247
            remote_candidate_sizes: HashMap::new(),
1✔
3248
            next_remote_idx: 0,
1✔
3249
        };
1✔
3250
        let protected = dir.path().join("p");
1✔
3251
        assert!(
1✔
3252
            !source
1✔
3253
                .enforce_disk_cap_locked(&mut state, &protected)
1✔
3254
                .unwrap()
1✔
3255
        );
3256

3257
        let mut config2 = test_config(dir.path().to_path_buf());
1✔
3258
        config2.local_disk_cap_bytes = Some(10_000);
1✔
3259
        let source2 = test_source(config2);
1✔
3260
        let manifest_root = source2.manifest_cache_root();
1✔
3261
        fs::create_dir_all(&manifest_root).unwrap();
1✔
3262
        let shard_path = manifest_root.join("small.parquet");
1✔
3263
        fs::write(&shard_path, vec![1u8; 32]).unwrap();
1✔
3264
        let mut state2 = SourceState {
1✔
3265
            materialized_rows: 1,
1✔
3266
            total_rows: None,
1✔
3267
            shards: vec![ShardIndex {
1✔
3268
                path: shard_path,
1✔
3269
                global_start: 0,
1✔
3270
                row_count: 1,
1✔
3271
                is_parquet: true,
1✔
3272
                parquet_row_groups: vec![(0, 1)],
1✔
3273
                checkpoints: Vec::new(),
1✔
3274
            }],
1✔
3275
            remote_candidates: None,
1✔
3276
            remote_candidate_sizes: HashMap::new(),
1✔
3277
            next_remote_idx: 0,
1✔
3278
        };
1✔
3279
        assert!(
1✔
3280
            !source2
1✔
3281
                .enforce_disk_cap_locked(&mut state2, &protected)
1✔
3282
                .unwrap()
1✔
3283
        );
3284
    }
1✔
3285

3286
    #[test]
3287
    fn enforce_disk_cap_evicts_manifest_shards_and_recomputes_offsets() {
1✔
3288
        let dir = tempdir().unwrap();
1✔
3289
        let mut config = test_config(dir.path().to_path_buf());
1✔
3290
        config.local_disk_cap_bytes = Some(20);
1✔
3291
        config.min_resident_shards = 0;
1✔
3292
        let source = test_source(config);
1✔
3293
        let manifest_root = source.manifest_cache_root();
1✔
3294
        fs::create_dir_all(&manifest_root).unwrap();
1✔
3295

3296
        let first = manifest_root.join("first.parquet");
1✔
3297
        let second = manifest_root.join("second.parquet");
1✔
3298
        fs::write(&first, vec![1u8; 16]).unwrap();
1✔
3299
        fs::write(&second, vec![2u8; 16]).unwrap();
1✔
3300

3301
        let mut state = SourceState {
1✔
3302
            materialized_rows: 2,
1✔
3303
            total_rows: None,
1✔
3304
            shards: vec![
1✔
3305
                ShardIndex {
1✔
3306
                    path: first.clone(),
1✔
3307
                    global_start: 0,
1✔
3308
                    row_count: 1,
1✔
3309
                    is_parquet: true,
1✔
3310
                    parquet_row_groups: vec![(0, 1)],
1✔
3311
                    checkpoints: Vec::new(),
1✔
3312
                },
1✔
3313
                ShardIndex {
1✔
3314
                    path: second.clone(),
1✔
3315
                    global_start: 1,
1✔
3316
                    row_count: 1,
1✔
3317
                    is_parquet: true,
1✔
3318
                    parquet_row_groups: vec![(0, 1)],
1✔
3319
                    checkpoints: Vec::new(),
1✔
3320
                },
1✔
3321
            ],
1✔
3322
            remote_candidates: None,
1✔
3323
            remote_candidate_sizes: HashMap::new(),
1✔
3324
            next_remote_idx: 0,
1✔
3325
        };
1✔
3326

3327
        let evicted = source.enforce_disk_cap_locked(&mut state, &second).unwrap();
1✔
3328
        assert!(evicted);
1✔
3329
        assert!(!first.exists());
1✔
3330
        assert!(second.exists());
1✔
3331
        assert_eq!(state.shards.len(), 1);
1✔
3332
        assert_eq!(state.shards[0].global_start, 0);
1✔
3333
        assert_eq!(state.materialized_rows, 1);
1✔
3334
    }
1✔
3335

3336
    #[test]
3337
    fn enforce_disk_cap_errors_when_usage_still_exceeds_cap() {
1✔
3338
        let dir = tempdir().unwrap();
1✔
3339
        let mut config = test_config(dir.path().to_path_buf());
1✔
3340
        config.local_disk_cap_bytes = Some(1);
1✔
3341
        config.min_resident_shards = 1;
1✔
3342
        let source = test_source(config);
1✔
3343
        let manifest_root = source.manifest_cache_root();
1✔
3344
        fs::create_dir_all(&manifest_root).unwrap();
1✔
3345

3346
        let protected = manifest_root.join("protected.parquet");
1✔
3347
        fs::write(&protected, vec![3u8; 16]).unwrap();
1✔
3348

3349
        let mut state = SourceState {
1✔
3350
            materialized_rows: 1,
1✔
3351
            total_rows: None,
1✔
3352
            shards: vec![ShardIndex {
1✔
3353
                path: protected.clone(),
1✔
3354
                global_start: 0,
1✔
3355
                row_count: 1,
1✔
3356
                is_parquet: true,
1✔
3357
                parquet_row_groups: vec![(0, 1)],
1✔
3358
                checkpoints: Vec::new(),
1✔
3359
            }],
1✔
3360
            remote_candidates: None,
1✔
3361
            remote_candidate_sizes: HashMap::new(),
1✔
3362
            next_remote_idx: 0,
1✔
3363
        };
1✔
3364

3365
        let err = source
1✔
3366
            .enforce_disk_cap_locked(&mut state, &protected)
1✔
3367
            .unwrap_err();
1✔
3368
        assert!(matches!(
1✔
3369
            err,
1✔
3370
            SamplerError::SourceUnavailable { ref reason, .. } if reason.contains("cannot evict further")
1✔
3371
        ));
3372
        assert!(!protected.exists());
1✔
3373
    }
1✔
3374

3375
    #[test]
3376
    fn configured_sampler_seed_and_paging_seed_require_sampler_config() {
1✔
3377
        let dir = tempdir().unwrap();
1✔
3378
        let config = test_config(dir.path().to_path_buf());
1✔
3379
        let source = HuggingFaceRowSource {
1✔
3380
            config,
1✔
3381
            sampler_config: Mutex::new(None),
1✔
3382
            state: Mutex::new(SourceState {
1✔
3383
                materialized_rows: 0,
1✔
3384
                total_rows: None,
1✔
3385
                shards: Vec::new(),
1✔
3386
                remote_candidates: None,
1✔
3387
                remote_candidate_sizes: HashMap::new(),
1✔
3388
                next_remote_idx: 0,
1✔
3389
            }),
1✔
3390
            cache: Mutex::new(RowCache::default()),
1✔
3391
            parquet_cache: Mutex::new(ParquetCache::default()),
1✔
3392
        };
1✔
3393

3394
        assert!(source.configured_sampler_seed().is_err());
1✔
3395
        assert!(source.paging_seed(5).is_err());
1✔
3396
    }
1✔
3397

3398
    #[test]
3399
    fn shard_candidate_seed_and_rotation_are_deterministic() {
1✔
3400
        let dir = tempdir().unwrap();
1✔
3401
        let mut config = test_config(dir.path().to_path_buf());
1✔
3402
        config.source_id = "hf_rotator".to_string();
1✔
3403

3404
        let seed_a = HuggingFaceRowSource::shard_candidate_seed(&config, 12, 1);
1✔
3405
        let seed_b = HuggingFaceRowSource::shard_candidate_seed(&config, 12, 2);
1✔
3406
        assert_ne!(seed_a, seed_b);
1✔
3407

3408
        let baseline = vec!["c".to_string(), "a".to_string(), "b".to_string()];
1✔
3409
        let mut left = baseline.clone();
1✔
3410
        let mut right = baseline;
1✔
3411
        HuggingFaceRowSource::rotate_candidates_deterministically(&config, &mut left);
1✔
3412
        HuggingFaceRowSource::rotate_candidates_deterministically(&config, &mut right);
1✔
3413
        assert_eq!(left, right);
1✔
3414

3415
        let mut sorted = left.clone();
1✔
3416
        sorted.sort();
1✔
3417
        assert_eq!(
1✔
3418
            sorted,
3419
            vec!["a".to_string(), "b".to_string(), "c".to_string()]
1✔
3420
        );
3421
    }
1✔
3422

3423
    #[test]
3424
    fn extract_split_row_count_handles_configs_and_dataset_fallbacks() {
1✔
3425
        let by_config_splits = json!({
1✔
3426
            "size": {
1✔
3427
                "configs": [
1✔
3428
                    {
3429
                        "config_name": "default",
1✔
3430
                        "splits": [
1✔
3431
                            {"name": "train", "num_rows": 21},
1✔
3432
                            {"name": "validation", "num_rows": 4}
1✔
3433
                        ]
3434
                    }
3435
                ]
3436
            }
3437
        });
3438
        let rows = HuggingFaceRowSource::extract_split_row_count_from_size_response(
1✔
3439
            &by_config_splits,
1✔
3440
            "default",
1✔
3441
            "train",
1✔
3442
        );
3443
        assert_eq!(rows, Some(21));
1✔
3444

3445
        let dataset_only = json!({
1✔
3446
            "size": {
1✔
3447
                "dataset": {"num_rows": 99}
1✔
3448
            }
3449
        });
3450
        let rows = HuggingFaceRowSource::extract_split_row_count_from_size_response(
1✔
3451
            &dataset_only,
1✔
3452
            "default",
1✔
3453
            "",
1✔
3454
        );
3455
        assert_eq!(rows, Some(99));
1✔
3456
    }
1✔
3457

3458
    #[test]
3459
    fn parse_global_row_count_response_uses_config_total_when_split_empty() {
1✔
3460
        let dir = tempdir().unwrap();
1✔
3461
        let config = test_config(dir.path().to_path_buf());
1✔
3462
        let body = serde_json::to_string(&json!({
1✔
3463
            "size": {
1✔
3464
                "configs": [
1✔
3465
                    {"config": "default", "num_rows": 17}
1✔
3466
                ]
1✔
3467
            }
1✔
3468
        }))
1✔
3469
        .unwrap();
1✔
3470

3471
        let parsed = HuggingFaceRowSource::parse_global_row_count_response(
1✔
3472
            &HuggingFaceRowsConfig {
1✔
3473
                split: "".to_string(),
1✔
3474
                ..config
1✔
3475
            },
1✔
3476
            &body,
1✔
3477
        )
3478
        .unwrap();
1✔
3479
        assert_eq!(parsed, Some(17));
1✔
3480
    }
1✔
3481

3482
    #[test]
3483
    fn ensure_row_available_returns_from_fast_paths() {
1✔
3484
        let dir = tempdir().unwrap();
1✔
3485
        let config = test_config(dir.path().to_path_buf());
1✔
3486
        let source = test_source(config);
1✔
3487

3488
        {
1✔
3489
            let mut state = source.state.lock().unwrap();
1✔
3490
            state.materialized_rows = 3;
1✔
3491
            state.remote_candidates = Some(vec!["x".to_string()]);
1✔
3492
            state.next_remote_idx = 0;
1✔
3493
        }
1✔
3494
        assert!(source.ensure_row_available(1).unwrap());
1✔
3495

3496
        let mut cfg_max = test_config(dir.path().to_path_buf());
1✔
3497
        cfg_max.max_rows = Some(2);
1✔
3498
        let source_max = test_source(cfg_max);
1✔
3499
        {
1✔
3500
            let mut state = source_max.state.lock().unwrap();
1✔
3501
            state.materialized_rows = 0;
1✔
3502
            state.remote_candidates = Some(vec!["x".to_string()]);
1✔
3503
            state.next_remote_idx = 0;
1✔
3504
        }
1✔
3505
        assert!(!source_max.ensure_row_available(2).unwrap());
1✔
3506

3507
        let source_done = test_source(test_config(dir.path().to_path_buf()));
1✔
3508
        {
1✔
3509
            let mut state = source_done.state.lock().unwrap();
1✔
3510
            state.materialized_rows = 0;
1✔
3511
            state.remote_candidates = Some(vec!["a".to_string()]);
1✔
3512
            state.next_remote_idx = 1;
1✔
3513
        }
1✔
3514
        assert!(!source_done.ensure_row_available(0).unwrap());
1✔
3515
    }
1✔
3516

3517
    #[test]
3518
    fn build_shard_index_errors_when_no_accepted_files_exist() {
1✔
3519
        let dir = tempdir().unwrap();
1✔
3520
        fs::write(dir.path().join("notes.txt"), b"plain").unwrap();
1✔
3521
        let config = test_config(dir.path().to_path_buf());
1✔
3522

3523
        let err = HuggingFaceRowSource::build_shard_index(&config).unwrap_err();
1✔
3524
        assert!(matches!(
1✔
3525
            err,
1✔
3526
            SamplerError::SourceUnavailable { ref reason, .. } if reason.contains("no shard files found")
1✔
3527
        ));
3528
    }
1✔
3529

3530
    #[test]
3531
    fn build_shard_index_applies_max_rows_to_parquet_shard() {
1✔
3532
        let dir = tempdir().unwrap();
1✔
3533
        let parquet_path = dir.path().join("rows.parquet");
1✔
3534
        write_parquet_fixture(
1✔
3535
            &parquet_path,
1✔
3536
            &[("id-1", "text-1"), ("id-2", "text-2"), ("id-3", "text-3")],
1✔
3537
        );
3538
        let mut config = test_config(dir.path().to_path_buf());
1✔
3539
        config.max_rows = Some(2);
1✔
3540

3541
        let (shards, discovered) = HuggingFaceRowSource::build_shard_index(&config).unwrap();
1✔
3542
        assert_eq!(discovered, 2);
1✔
3543
        assert_eq!(shards.len(), 1);
1✔
3544
        assert!(shards[0].is_parquet);
1✔
3545
        assert_eq!(shards[0].row_count, 2);
1✔
3546
    }
1✔
3547

3548
    #[test]
3549
    fn materialize_local_file_errors_for_missing_source() {
1✔
3550
        let dir = tempdir().unwrap();
1✔
3551
        let config = test_config(dir.path().to_path_buf());
1✔
3552
        let missing = dir.path().join("missing.ndjson");
1✔
3553
        let target = dir.path().join("target.ndjson");
1✔
3554

3555
        let err =
1✔
3556
            HuggingFaceRowSource::materialize_local_file(&config, &missing, &target).unwrap_err();
1✔
3557
        assert!(matches!(
1✔
3558
            err,
1✔
3559
            SamplerError::SourceUnavailable { ref reason, .. } if reason.contains("failed copying synced file")
1✔
3560
        ));
3561
    }
1✔
3562

3563
    #[test]
3564
    fn download_and_materialize_shard_hf_hub_branch_returns_error_for_invalid_repo() {
1✔
3565
        let dir = tempdir().unwrap();
1✔
3566
        let mut config = test_config(dir.path().to_path_buf());
1✔
3567
        config.dataset = "invalid///dataset".to_string();
1✔
3568

3569
        let err = HuggingFaceRowSource::download_and_materialize_shard(
1✔
3570
            &config,
1✔
3571
            "train/part-000.parquet",
1✔
3572
            None,
1✔
3573
        )
3574
        .unwrap_err();
1✔
3575
        assert!(matches!(err, SamplerError::SourceUnavailable { .. }));
1✔
3576
    }
1✔
3577

3578
    #[test]
3579
    fn index_single_shard_errors_for_missing_file() {
1✔
3580
        let dir = tempdir().unwrap();
1✔
3581
        let config = test_config(dir.path().to_path_buf());
1✔
3582
        let missing = dir.path().join("missing.ndjson");
1✔
3583

3584
        let err = HuggingFaceRowSource::index_single_shard(&config, &missing, 0).unwrap_err();
1✔
3585
        assert!(matches!(err, SamplerError::SourceUnavailable { .. }));
1✔
3586
    }
1✔
3587

3588
    #[test]
3589
    fn index_single_shard_jsonl_records_checkpoints_by_stride() {
1✔
3590
        let dir = tempdir().unwrap();
1✔
3591
        let path = dir.path().join("rows.ndjson");
1✔
3592
        fs::write(
1✔
3593
            &path,
1✔
3594
            b"{\"text\":\"a\"}\n{\"text\":\"b\"}\n{\"text\":\"c\"}\n",
3595
        )
3596
        .unwrap();
1✔
3597
        let mut config = test_config(dir.path().to_path_buf());
1✔
3598
        config.checkpoint_stride = 2;
1✔
3599

3600
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 5)
1✔
3601
            .unwrap()
1✔
3602
            .unwrap();
1✔
3603
        assert_eq!(shard.global_start, 5);
1✔
3604
        assert_eq!(shard.row_count, 3);
1✔
3605
        assert!(!shard.is_parquet);
1✔
3606
        assert!(shard.checkpoints.len() >= 2);
1✔
3607
        assert_eq!(shard.checkpoints[0], 0);
1✔
3608
    }
1✔
3609

3610
    #[test]
3611
    fn parquet_row_group_map_handles_empty_parquet_file() {
1✔
3612
        let dir = tempdir().unwrap();
1✔
3613
        let path = dir.path().join("empty.parquet");
1✔
3614
        write_parquet_fixture(&path, &[]);
1✔
3615
        let config = test_config(dir.path().to_path_buf());
1✔
3616

3617
        let (rows, groups) = HuggingFaceRowSource::parquet_row_group_map(&config, &path).unwrap();
1✔
3618
        assert_eq!(rows, 0);
1✔
3619
        assert!(groups.is_empty());
1✔
3620
    }
1✔
3621

3622
    #[test]
3623
    fn download_next_remote_shard_clears_row_cache_when_eviction_occurs() {
1✔
3624
        let dir = tempdir().unwrap();
1✔
3625
        let mut config = test_config(dir.path().to_path_buf());
1✔
3626
        config.local_disk_cap_bytes = Some(20);
1✔
3627
        config.min_resident_shards = 0;
1✔
3628
        let source = test_source(config.clone());
1✔
3629

3630
        let manifest_root = source.manifest_cache_root();
1✔
3631
        fs::create_dir_all(&manifest_root).unwrap();
1✔
3632
        let old_path = manifest_root.join("old.parquet");
1✔
3633
        fs::write(&old_path, vec![1u8; 20]).unwrap();
1✔
3634

3635
        let payload = b"{\"text\":\"new\"}\n".to_vec();
1✔
3636
        let (base_url, server) = spawn_one_shot_http(payload);
1✔
3637
        let candidate =
1✔
3638
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/new-shard.ndjson");
1✔
3639

3640
        {
1✔
3641
            let mut state = source.state.lock().unwrap();
1✔
3642
            state.materialized_rows = 1;
1✔
3643
            state.shards = vec![ShardIndex {
1✔
3644
                path: old_path.clone(),
1✔
3645
                global_start: 0,
1✔
3646
                row_count: 1,
1✔
3647
                is_parquet: true,
1✔
3648
                parquet_row_groups: vec![(0, 1)],
1✔
3649
                checkpoints: Vec::new(),
1✔
3650
            }];
1✔
3651
            state.remote_candidates = Some(vec![candidate]);
1✔
3652
            state.next_remote_idx = 0;
1✔
3653
        }
1✔
3654
        {
1✔
3655
            let mut cache = source.cache.lock().unwrap();
1✔
3656
            cache.insert(
1✔
3657
                0,
1✔
3658
                RowView {
1✔
3659
                    row_id: Some("cached".to_string()),
1✔
3660
                    timestamp: None,
1✔
3661
                    text_fields: vec![RowTextField {
1✔
3662
                        name: "text".to_string(),
1✔
3663
                        text: "cached".to_string(),
1✔
3664
                    }],
1✔
3665
                },
1✔
3666
                8,
1✔
3667
            );
1✔
3668
        }
1✔
3669

3670
        assert!(source.download_next_remote_shard().unwrap());
1✔
3671
        server.join().unwrap();
1✔
3672

3673
        assert!(!old_path.exists());
1✔
3674
        let cache = source.cache.lock().unwrap();
1✔
3675
        assert!(cache.rows.is_empty());
1✔
3676
        assert!(cache.order.is_empty());
1✔
3677
    }
1✔
3678

3679
    #[test]
3680
    fn load_persisted_shard_sequence_clamps_next_remote_index() {
1✔
3681
        let dir = tempdir().unwrap();
1✔
3682
        let config = test_config(dir.path().to_path_buf());
1✔
3683
        let state_path = HuggingFaceRowSource::shard_sequence_state_path(&config);
1✔
3684
        fs::create_dir_all(state_path.parent().unwrap()).unwrap();
1✔
3685
        fs::write(
1✔
3686
            &state_path,
1✔
3687
            serde_json::to_vec_pretty(&json!({
1✔
3688
                "version": SHARD_SEQUENCE_STATE_VERSION,
1✔
3689
                "source_id": config.source_id,
1✔
3690
                "dataset": config.dataset,
1✔
3691
                "config": config.config,
1✔
3692
                "split": config.split,
1✔
3693
                "sampler_seed": 7,
1✔
3694
                "candidates": ["train/0.ndjson", "train/1.ndjson"],
1✔
3695
                "candidate_sizes": {},
1✔
3696
                "next_remote_idx": 99
1✔
3697
            }))
1✔
3698
            .unwrap(),
1✔
3699
        )
3700
        .unwrap();
1✔
3701

3702
        let loaded = HuggingFaceRowSource::load_persisted_shard_sequence(&config, 7)
1✔
3703
            .unwrap()
1✔
3704
            .unwrap();
1✔
3705
        assert_eq!(loaded.candidates.len(), 2);
1✔
3706
        assert_eq!(loaded.next_remote_idx, 2);
1✔
3707
    }
1✔
3708

3709
    #[test]
3710
    fn default_triplet_recipes_returns_expected_shape() {
1✔
3711
        let dir = tempdir().unwrap();
1✔
3712
        let config = test_config(dir.path().to_path_buf());
1✔
3713
        let source = test_source(config);
1✔
3714
        let recipes = source.default_triplet_recipes();
1✔
3715
        assert_eq!(recipes.len(), 1);
1✔
3716
        assert_eq!(recipes[0].name, "huggingface_anchor_context");
1✔
3717
    }
1✔
3718

3719
    #[test]
    fn download_and_materialize_shard_url_short_circuits_when_cached_complete() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let candidate = "url::https://host/datasets/org/ds/resolve/main/train/ok.ndjson";
        let target = HuggingFaceRowSource::candidate_target_path(&config, candidate);
        fs::create_dir_all(target.parent().unwrap()).unwrap();
        fs::write(&target, b"ok").unwrap();

        let resolved =
            HuggingFaceRowSource::download_and_materialize_shard(&config, candidate, Some(2))
                .unwrap();
        assert_eq!(resolved, target);
    }

    #[test]
    fn download_and_materialize_shard_url_replaces_stale_part_file() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = b"{\"text\":\"a\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload.clone());
        let candidate = format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-x.ndjson");
        let target = HuggingFaceRowSource::candidate_target_path(&config, &candidate);
        let temp_target = target.with_extension("part");
        fs::create_dir_all(temp_target.parent().unwrap()).unwrap();
        fs::write(&temp_target, b"stale").unwrap();

        let out = HuggingFaceRowSource::download_and_materialize_shard(&config, &candidate, None)
            .unwrap();
        server.join().unwrap();

        assert_eq!(out, target);
        assert_eq!(fs::read(&target).unwrap(), payload);
    }

    #[test]
    fn download_next_remote_shard_skips_when_max_rows_already_reached() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(0);
        let source = test_source(config);
        let payload = b"{\"text\":\"x\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-200.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.remote_candidates = Some(vec![candidate]);
            state.next_remote_idx = 0;
            state.materialized_rows = 0;
        }

        assert!(source.download_next_remote_shard().unwrap());
        server.join().unwrap();
        let state = source.state.lock().unwrap();
        assert_eq!(state.materialized_rows, 0);
        assert!(state.shards.is_empty());
    }

    #[test]
    fn download_next_remote_shard_skips_zero_row_download() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let payload = Vec::<u8>::new();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-empty.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.remote_candidates = Some(vec![candidate]);
            state.next_remote_idx = 0;
        }

        assert!(source.download_next_remote_shard().unwrap());
        server.join().unwrap();
        let state = source.state.lock().unwrap();
        assert_eq!(state.materialized_rows, 0);
        assert!(state.shards.is_empty());
    }

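    // Local row reads, refresh batching, and persisted shard-sequence state handling
    // on an already-materialized snapshot.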
    #[test]
    fn read_row_batch_errors_when_parquet_reader_cannot_open_file() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 1;
            state.total_rows = Some(1);
            state.shards = vec![ShardIndex {
                path: dir.path().join("missing.parquet"),
                global_start: 0,
                row_count: 1,
                is_parquet: true,
                parquet_row_groups: vec![(0, 1)],
                checkpoints: Vec::new(),
            }];
        }

        let mut out = Vec::new();
        let err = source.read_row_batch(&[0], &mut out, Some(1));
        assert!(err.is_err());
    }

    #[test]
    fn refresh_exercises_large_total_progress_branch() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        let line = b"{\"id\":\"r\",\"text\":\"v\"}\n";
        let mut bytes = Vec::with_capacity(line.len() * 10_000);
        for _ in 0..10_000 {
            bytes.extend_from_slice(line);
        }
        fs::write(&path, bytes).unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 256;
        config.refresh_batch_multiplier = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 10_000;
            state.total_rows = Some(10_000);
            state.shards = vec![shard];
        }

        let snapshot = source.refresh(None, Some(1)).unwrap();
        assert_eq!(snapshot.records.len(), 1);
    }

    #[test]
    fn shard_size_bytes_returns_zero_for_missing_path() {
        let dir = tempdir().unwrap();
        let missing = dir.path().join("missing.file");
        assert_eq!(HuggingFaceRowSource::shard_size_bytes(&missing), 0);
    }

    #[test]
    fn load_persisted_shard_sequence_errors_on_invalid_json() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let path = HuggingFaceRowSource::shard_sequence_state_path(&config);
        fs::create_dir_all(path.parent().unwrap()).unwrap();
        fs::write(&path, b"{not-valid-json").unwrap();

        let loaded = HuggingFaceRowSource::load_persisted_shard_sequence(&config, 1);
        assert!(loaded.is_err());
    }

    #[test]
    fn rotate_candidates_deterministically_is_noop_for_singleton() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let mut candidates = vec!["one".to_string()];
        HuggingFaceRowSource::rotate_candidates_deterministically(&config, &mut candidates);
        assert_eq!(candidates, vec!["one".to_string()]);
    }

    #[test]
    fn extract_split_row_count_returns_none_when_missing_entries() {
        let payload = json!({"size": {"configs": [{"config": "other", "splits": []}]}});
        let rows = HuggingFaceRowSource::extract_split_row_count_from_size_response(
            &payload, "default", "train",
        );
        assert!(rows.is_none());
    }

    #[test]
    fn candidates_from_parquet_manifest_json_returns_empty_without_entries() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = json!({"other": []});
        let (candidates, sizes) =
            HuggingFaceRowSource::candidates_from_parquet_manifest_json(&config, &payload).unwrap();
        assert!(candidates.is_empty());
        assert!(sizes.is_empty());
    }

    #[test]
    fn read_line_at_errors_on_unexpected_eof_while_reading_target_row() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        fs::write(&path, b"{\"text\":\"a\"}\n").unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let mut shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        let end = fs::metadata(&path).unwrap().len();
        shard.checkpoints = vec![0, end];

        let err = source.read_line_at(&shard, 1);
        assert!(err.is_err());
    }

    #[test]
    fn load_persisted_shard_sequence_returns_none_when_state_missing() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let loaded = HuggingFaceRowSource::load_persisted_shard_sequence(&config, 1).unwrap();
        assert!(loaded.is_none());
    }

    #[test]
    fn persist_shard_sequence_clamps_next_index_on_write() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());
        let state = SourceState {
            materialized_rows: 0,
            total_rows: None,
            shards: Vec::new(),
            remote_candidates: Some(vec!["a".into(), "b".into()]),
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 99,
        };

        source.persist_shard_sequence_locked(&state).unwrap();
        let raw =
            fs::read_to_string(HuggingFaceRowSource::shard_sequence_state_path(&config)).unwrap();
        let parsed: Value = serde_json::from_str(&raw).unwrap();
        assert_eq!(
            parsed.get("next_remote_idx").and_then(Value::as_u64),
            Some(2)
        );
    }

    #[test]
    fn materialize_local_file_replaces_target_when_size_differs() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let src = dir.path().join("src.ndjson");
        let dst = dir.path().join("dst.ndjson");
        fs::write(&src, b"newer\n").unwrap();
        fs::write(&dst, b"old\n").unwrap();

        HuggingFaceRowSource::materialize_local_file(&config, &src, &dst).unwrap();
        assert_eq!(fs::read(&dst).unwrap(), b"newer\n");
    }

    #[test]
    fn row_to_record_preserves_explicit_timestamp() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let ts = Utc::now();
        let row = RowView {
            row_id: Some("r1".into()),
            timestamp: Some(ts),
            text_fields: vec![RowTextField {
                name: "text".into(),
                text: "alpha".into(),
            }],
        };

        let record = source.row_to_record(&row, 0).unwrap().unwrap();
        assert_eq!(record.created_at, ts);
        assert_eq!(record.updated_at, ts);
    }

    #[test]
    fn parse_row_text_columns_accept_numeric_values() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.text_columns = vec!["score".into()];
        let source = test_source(config);

        let row = source.parse_row(0, &json!({"score": 123})).unwrap();
        assert_eq!(row.text_fields.len(), 1);
        assert_eq!(row.text_fields[0].text, "123");
    }

    #[test]
    fn len_hint_returns_zero_when_max_rows_is_zero() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(0);
        let source = test_source(config);
        assert_eq!(source.len_hint(), Some(0));
    }

    #[test]
    fn refresh_limit_none_reads_up_to_total() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        fs::write(
            &path,
            b"{\"id\":\"r1\",\"text\":\"a\"}\n{\"id\":\"r2\",\"text\":\"b\"}\n",
        )
        .unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        config.refresh_batch_multiplier = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(2);
            state.shards = vec![shard];
        }

        let snapshot = source.refresh(None, None).unwrap();
        assert_eq!(snapshot.records.len(), 2);
    }

    #[test]
    fn read_row_batch_skips_unavailable_indices_without_error() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 0;
            state.total_rows = Some(0);
            state.remote_candidates = Some(Vec::new());
        }

        let mut out = Vec::new();
        source.read_row_batch(&[0, 1], &mut out, Some(2)).unwrap();
        assert!(out.is_empty());
    }

    #[test]
    fn candidate_target_path_maps_remote_urls_under_manifest_root() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let candidate =
            "url::https://huggingface.co/datasets/org/ds/resolve/main/train/part-000.parquet";
        let target = HuggingFaceRowSource::candidate_target_path(&config, candidate);
        assert!(target.ends_with("_parquet_manifest/main/train/part-000.parquet"));
    }

    #[test]
    fn candidate_target_path_keeps_local_candidates_relative() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let candidate = "train/part-001.ndjson";
        let target = HuggingFaceRowSource::candidate_target_path(&config, candidate);
        assert_eq!(target, config.snapshot_dir.join(candidate));
    }

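    // Expected-size validation plus Parquet shard indexing and cached batch reads.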
    #[test]
    fn target_matches_expected_size_validates_when_expected_is_provided() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("payload.bin");
        fs::write(&path, vec![0u8; 5]).unwrap();

        assert!(HuggingFaceRowSource::target_matches_expected_size(
            &path,
            Some(5)
        ));
        assert!(!HuggingFaceRowSource::target_matches_expected_size(
            &path,
            Some(4)
        ));
        assert!(HuggingFaceRowSource::target_matches_expected_size(
            &path, None
        ));
    }

    #[test]
    fn parquet_row_group_map_and_index_single_shard_cover_success_path() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.parquet");
        write_parquet_fixture(&path, &[("r1", "alpha"), ("r2", "beta"), ("r3", "gamma")]);
        let config = test_config(dir.path().to_path_buf());

        let (total_rows, groups) =
            HuggingFaceRowSource::parquet_row_group_map(&config, &path).unwrap();
        assert_eq!(total_rows, 3);
        assert!(!groups.is_empty());

        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        assert!(shard.is_parquet);
        assert_eq!(shard.row_count, 3);
        assert!(shard.checkpoints.is_empty());
    }

    #[test]
    fn read_row_batch_reads_parquet_rows_and_uses_cache_on_repeat() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.parquet");
        write_parquet_fixture(&path, &[("r10", "ten"), ("r11", "eleven")]);

        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(2);
            state.shards = vec![shard];
        }

        let mut first = Vec::new();
        source.read_row_batch(&[0, 1], &mut first, None).unwrap();
        assert_eq!(first.len(), 2);
        assert!(first.iter().any(|record| record.id.ends_with("::r10")));

        let mut second = Vec::new();
        source.read_row_batch(&[0, 1], &mut second, None).unwrap();
        assert_eq!(second.len(), 2);
    }

    #[test]
    fn ensure_row_available_bootstraps_from_in_memory_candidates() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());

        let payload =
            b"{\"id\":\"r1\",\"text\":\"alpha\"}\n{\"id\":\"r2\",\"text\":\"beta\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/persisted.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.remote_candidates = Some(vec![candidate]);
            state.next_remote_idx = 0;
        }

        assert!(source.ensure_row_available(0).unwrap());
        server.join().unwrap();

        let state = source.state.lock().unwrap();
        assert_eq!(state.materialized_rows, 2);
        assert_eq!(state.next_remote_idx, 1);
        assert_eq!(state.shards.len(), 1);
    }

4166
    fn configure_sampler_updates_len_hint_headroom_via_trait_methods() {
1✔
4167
        let dir = tempdir().unwrap();
1✔
4168
        let mut config = test_config(dir.path().to_path_buf());
1✔
4169
        config.cache_capacity = 10;
1✔
4170
        config.remote_expansion_headroom_multiplier = 3;
1✔
4171
        let source = test_source(config);
1✔
4172
        {
1✔
4173
            let mut state = source.state.lock().unwrap();
1✔
4174
            state.materialized_rows = 5;
1✔
4175
            state.total_rows = Some(100);
1✔
4176
        }
1✔
4177

4178
        assert_eq!(source.reported_record_count().unwrap(), 35);
1✔
4179

4180
        let sampler = SamplerConfig {
1✔
4181
            ingestion_max_records: 2,
1✔
4182
            ..SamplerConfig::default()
1✔
4183
        };
1✔
4184
        source.configure_sampler(&sampler);
1✔
4185

4186
        assert_eq!(source.reported_record_count().unwrap(), 11);
1✔
4187
    }
1✔
4188

4189
    #[test]
4190
    fn refresh_ignores_persisted_remote_sequence_state() {
1✔
4191
        let dir = tempdir().unwrap();
1✔
4192
        let config = test_config(dir.path().to_path_buf());
1✔
4193
        let source = test_source(config.clone());
1✔
4194

4195
        let payload = b"{\"id\":\"rr\",\"text\":\"hello\"}\n".to_vec();
1✔
4196
        let (base_url, server) = spawn_one_shot_http(payload);
1✔
4197
        let candidate =
1✔
4198
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/refresh.ndjson");
1✔
4199

4200
        let state_path = HuggingFaceRowSource::shard_sequence_state_path(&config);
1✔
4201
        fs::create_dir_all(state_path.parent().unwrap()).unwrap();
1✔
4202
        fs::write(
1✔
4203
            &state_path,
1✔
4204
            serde_json::to_vec_pretty(&json!({
1✔
4205
                "version": 1,
1✔
4206
                "source_id": config.source_id,
1✔
4207
                "dataset": config.dataset,
1✔
4208
                "config": config.config,
1✔
4209
                "split": config.split,
1✔
4210
                "sampler_seed": 1,
1✔
4211
                "candidates": [candidate],
1✔
4212
                "candidate_sizes": {},
1✔
4213
                "next_remote_idx": 1
1✔
4214
            }))
1✔
4215
            .unwrap(),
1✔
4216
        )
4217
        .unwrap();
1✔
4218

4219
        {
1✔
4220
            let mut state = source.state.lock().unwrap();
1✔
4221
            state.remote_candidates = Some(vec![format!(
1✔
4222
                "url::{base_url}/datasets/org/ds/resolve/main/train/refresh.ndjson"
1✔
4223
            )]);
1✔
4224
            state.next_remote_idx = 0;
1✔
4225
        }
1✔
4226

4227
        let snapshot = source.refresh(None, Some(1)).unwrap();
1✔
4228
        server.join().unwrap();
1✔
4229

4230
        assert_eq!(snapshot.records.len(), 1);
1✔
4231
        assert!(snapshot.records[0].id.contains("hf_test::rr"));
1✔
4232
    }
1✔
4233

4234
    #[test]
4235
    fn download_next_remote_shard_trims_rows_to_max_rows_limit() {
1✔
4236
        let dir = tempdir().unwrap();
1✔
4237
        let mut config = test_config(dir.path().to_path_buf());
1✔
4238
        config.max_rows = Some(1);
1✔
4239
        let source = test_source(config);
1✔
4240
        let payload = b"{\"text\":\"a\"}\n{\"text\":\"b\"}\n".to_vec();
1✔
4241
        let (base_url, server) = spawn_one_shot_http(payload);
1✔
4242
        let candidate = format!("url::{base_url}/datasets/org/ds/resolve/main/train/trim.ndjson");
1✔
4243

4244
        {
1✔
4245
            let mut state = source.state.lock().unwrap();
1✔
4246
            state.remote_candidates = Some(vec![candidate]);
1✔
4247
            state.next_remote_idx = 0;
1✔
4248
            state.materialized_rows = 0;
1✔
4249
        }
1✔
4250

4251
        assert!(source.download_next_remote_shard().unwrap());
1✔
4252
        server.join().unwrap();
1✔
4253

4254
        let state = source.state.lock().unwrap();
1✔
4255
        assert_eq!(state.materialized_rows, 1);
1✔
4256
        assert_eq!(state.shards.len(), 1);
1✔
4257
        assert_eq!(state.shards[0].row_count, 1);
1✔
4258
    }
1✔
4259

4260
    #[test]
4261
    fn build_shard_index_skips_empty_files_and_keeps_non_empty() {
1✔
4262
        let dir = tempdir().unwrap();
1✔
4263
        fs::write(dir.path().join("a.ndjson"), b"").unwrap();
1✔
4264
        fs::write(dir.path().join("b.ndjson"), b"{\"text\":\"x\"}\n").unwrap();
1✔
4265
        let config = test_config(dir.path().to_path_buf());
1✔
4266

4267
        let (shards, discovered) = HuggingFaceRowSource::build_shard_index(&config).unwrap();
1✔
4268
        assert_eq!(discovered, 1);
1✔
4269
        assert_eq!(shards.len(), 1);
1✔
4270
        assert_eq!(shards[0].row_count, 1);
1✔
4271
    }
1✔
4272

4273
    #[test]
4274
    fn resolve_remote_candidates_from_siblings_falls_back_when_split_filter_misses() {
1✔
4275
        let dir = tempdir().unwrap();
1✔
4276
        let mut config = test_config(dir.path().to_path_buf());
1✔
4277
        config.split = "train".to_string();
1✔
4278
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
1✔
4279
        let siblings = vec![
1✔
4280
            "validation/file-a.ndjson".to_string(),
1✔
4281
            "test/file-b.ndjson".to_string(),
1✔
4282
        ];
4283

4284
        let (candidates, sizes) = HuggingFaceRowSource::resolve_remote_candidates_from_siblings(
1✔
4285
            &config, &siblings, &accepted,
1✔
4286
        )
1✔
4287
        .unwrap();
1✔
4288

4289
        assert!(sizes.is_empty());
1✔
4290
        assert_eq!(candidates.len(), 2);
1✔
4291
    }
1✔
4292

4293
    #[test]
4294
    fn resolve_remote_candidates_from_siblings_errors_for_parquet_only_when_not_accepted() {
1✔
4295
        let dir = tempdir().unwrap();
1✔
4296
        let mut config = test_config(dir.path().to_path_buf());
1✔
4297
        config.shard_extensions = vec!["ndjson".to_string()];
1✔
4298
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
1✔
4299
        let siblings = vec!["train/only.parquet".to_string()];
1✔
4300

4301
        let result = HuggingFaceRowSource::resolve_remote_candidates_from_siblings(
1✔
4302
            &config, &siblings, &accepted,
1✔
4303
        );
4304
        assert!(result.is_err());
1✔
4305
    }
1✔
4306

4307
    #[test]
4308
    fn resolve_remote_candidates_from_siblings_returns_empty_when_no_matches_and_no_parquet() {
1✔
4309
        let dir = tempdir().unwrap();
1✔
4310
        let config = test_config(dir.path().to_path_buf());
1✔
4311
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
1✔
4312
        let siblings = vec!["train/notes.txt".to_string()];
1✔
4313

4314
        let (candidates, sizes) = HuggingFaceRowSource::resolve_remote_candidates_from_siblings(
1✔
4315
            &config, &siblings, &accepted,
1✔
4316
        )
1✔
4317
        .unwrap();
1✔
4318
        assert!(candidates.is_empty());
1✔
4319
        assert!(sizes.is_empty());
1✔
4320
    }
1✔
4321

4322
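    // Parsing of datasets-server size and parquet manifest responses, including test
    // endpoint overrides and failure modes for unreachable endpoints.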
    #[test]
    fn parse_global_row_count_response_applies_max_rows() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(3);
        let body = serde_json::to_string(&json!({
            "size": {
                "splits": [
                    {"config": "default", "split": "train", "num_rows": 10}
                ]
            }
        }))
        .unwrap();

        let rows = HuggingFaceRowSource::parse_global_row_count_response(&config, &body)
            .unwrap()
            .unwrap();
        assert_eq!(rows, 3);
    }

    #[test]
    fn parse_global_row_count_response_errors_on_invalid_json() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let parsed = HuggingFaceRowSource::parse_global_row_count_response(&config, "{bad-json");
        assert!(parsed.is_err());
    }

    #[test]
    fn parse_parquet_manifest_response_errors_on_invalid_json() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let parsed = HuggingFaceRowSource::parse_parquet_manifest_response(&config, "{bad-json");
        assert!(parsed.is_err());
    }

    #[test]
    fn parse_parquet_manifest_response_returns_candidates() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let body = serde_json::to_string(&json!({
            "parquet_files": [
                {"url": "https://host/datasets/x/resolve/main/train/0.parquet", "size": 5}
            ]
        }))
        .unwrap();

        let (candidates, sizes) =
            HuggingFaceRowSource::parse_parquet_manifest_response(&config, &body).unwrap();
        assert_eq!(candidates.len(), 1);
        assert_eq!(sizes.len(), 1);
    }

    #[test]
    fn list_remote_candidates_from_parquet_manifest_uses_test_endpoint_override() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let body = serde_json::to_vec(&json!({
            "parquet_files": [
                {"url": "https://host/datasets/x/resolve/main/train/0.parquet", "size": 5}
            ]
        }))
        .unwrap();
        let (base_url, server) = spawn_one_shot_http(body);

        let (candidates, sizes) = with_env_var("TRIPLETS_HF_PARQUET_ENDPOINT", &base_url, || {
            HuggingFaceRowSource::list_remote_candidates_from_parquet_manifest(&config)
        })
        .unwrap();
        server.join().unwrap();

        assert_eq!(candidates.len(), 1);
        assert_eq!(sizes.len(), 1);
    }

    #[test]
    fn fetch_global_row_count_uses_test_endpoint_override() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let body = serde_json::to_vec(&json!({
            "size": {
                "splits": [
                    {"config": "default", "split": "train", "num_rows": 12}
                ]
            }
        }))
        .unwrap();
        let (base_url, server) = spawn_one_shot_http(body);

        let rows = with_env_var("TRIPLETS_HF_SIZE_ENDPOINT", &base_url, || {
            HuggingFaceRowSource::fetch_global_row_count(&config)
        })
        .unwrap();
        server.join().unwrap();
        assert_eq!(rows, Some(12));
    }

    #[test]
    fn endpoint_helpers_fallback_for_empty_env_values() {
        let parquet = with_env_var("TRIPLETS_HF_PARQUET_ENDPOINT", "   ", || {
            HuggingFaceRowSource::parquet_manifest_endpoint()
        });
        assert_eq!(parquet, "https://datasets-server.huggingface.co/parquet");

        let size = with_env_var("TRIPLETS_HF_SIZE_ENDPOINT", "", || {
            HuggingFaceRowSource::size_endpoint()
        });
        assert_eq!(size, "https://datasets-server.huggingface.co/size");
    }

    #[test]
    fn resolve_remote_candidates_respects_split_prefix_in_filename() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.split = "train".to_string();
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
        let siblings = vec![
            "train-part-000.ndjson".to_string(),
            "validation-part-000.ndjson".to_string(),
        ];

        let (candidates, _) = HuggingFaceRowSource::resolve_remote_candidates_from_siblings(
            &config, &siblings, &accepted,
        )
        .unwrap();

        assert_eq!(candidates, vec!["train-part-000.ndjson".to_string()]);
    }

    #[test]
    fn fetch_global_row_count_returns_none_when_split_not_present() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let body = serde_json::to_vec(&json!({
            "size": {
                "splits": [
                    {"config": "default", "split": "validation", "num_rows": 12}
                ]
            }
        }))
        .unwrap();
        let (base_url, server) = spawn_one_shot_http(body);

        let rows = with_env_var("TRIPLETS_HF_SIZE_ENDPOINT", &base_url, || {
            HuggingFaceRowSource::fetch_global_row_count(&config)
        })
        .unwrap();
        server.join().unwrap();
        assert_eq!(rows, None);
    }

    #[test]
    fn list_remote_candidates_returns_manifest_candidates_before_repo_fallback() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let body = serde_json::to_vec(&json!({
            "parquet_files": [
                {"url": "https://host/datasets/x/resolve/main/train/1.ndjson", "size": 9}
            ]
        }))
        .unwrap();
        let (base_url, server) = spawn_one_shot_http(body);

        let (candidates, sizes) = with_env_var("TRIPLETS_HF_PARQUET_ENDPOINT", &base_url, || {
            HuggingFaceRowSource::list_remote_candidates(&config)
        })
        .unwrap();
        server.join().unwrap();

        assert_eq!(candidates.len(), 1);
        assert_eq!(sizes.len(), 1);
        assert!(candidates[0].ends_with("/1.ndjson"));
    }

    #[test]
    fn list_remote_candidates_from_parquet_manifest_errors_when_endpoint_unreachable() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());

        let result = with_env_var("TRIPLETS_HF_PARQUET_ENDPOINT", "http://127.0.0.1:1", || {
            HuggingFaceRowSource::list_remote_candidates_from_parquet_manifest(&config)
        });
        assert!(result.is_err());
    }

    #[test]
    fn fetch_global_row_count_errors_when_endpoint_unreachable() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());

        let result = with_env_var("TRIPLETS_HF_SIZE_ENDPOINT", "http://127.0.0.1:1", || {
            HuggingFaceRowSource::fetch_global_row_count(&config)
        });
        assert!(result.is_err());
    }

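    // Remote shard downloads exercised end to end against a one-shot local HTTP fixture,
    // including lazy expansion via ensure_row_available and size-response extraction.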
    #[test]
    fn download_and_materialize_shard_downloads_url_candidate() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = b"{\"text\":\"a\"}\n{\"text\":\"b\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload.clone());
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-000.ndjson");

        let target =
            HuggingFaceRowSource::download_and_materialize_shard(&config, &candidate, None)
                .unwrap();

        server.join().unwrap();
        assert!(target.exists());
        assert_eq!(fs::read(&target).unwrap(), payload);
    }

    #[test]
    fn download_and_materialize_shard_replaces_incomplete_existing_target() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = b"{\"text\":\"a\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload.clone());
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-009.ndjson");

        let target = HuggingFaceRowSource::candidate_target_path(&config, &candidate);
        fs::create_dir_all(target.parent().unwrap()).unwrap();
        fs::write(&target, b"bad").unwrap();

        let refreshed = HuggingFaceRowSource::download_and_materialize_shard(
            &config,
            &candidate,
            Some(payload.len() as u64),
        )
        .unwrap();

        server.join().unwrap();
        assert_eq!(refreshed, target);
        assert_eq!(fs::read(&target).unwrap(), payload);
    }

    #[test]
    fn download_next_remote_shard_materializes_and_indexes_rows() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let payload = b"{\"text\":\"a\"}\n{\"text\":\"b\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-001.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.remote_candidates = Some(vec![candidate.clone()]);
            state.remote_candidate_sizes.insert(candidate, 24);
            state.next_remote_idx = 0;
        }

        assert!(source.download_next_remote_shard().unwrap());
        server.join().unwrap();

        let state = source.state.lock().unwrap();
        assert_eq!(state.materialized_rows, 2);
        assert_eq!(state.shards.len(), 1);
        assert_eq!(state.next_remote_idx, 1);
    }

    #[test]
    fn ensure_row_available_triggers_lazy_download_for_remote_candidates() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let payload = b"{\"text\":\"x\"}\n{\"text\":\"y\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-002.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 0;
            state.remote_candidates = Some(vec![candidate.clone()]);
            state.remote_candidate_sizes.insert(candidate, 24);
            state.next_remote_idx = 0;
        }

        assert!(source.ensure_row_available(0).unwrap());
        server.join().unwrap();

        let state = source.state.lock().unwrap();
        assert!(state.materialized_rows >= 1);
        assert_eq!(state.next_remote_idx, 1);
    }

    #[test]
    fn extract_split_row_count_reads_split_entries() {
        let payload = json!({
            "size": {
                "splits": [
                    {"config": "default", "split": "train", "num_rows": 123u64},
                    {"config": "default", "split": "validation", "num_rows": 45u64}
                ]
            }
        });

        let count = HuggingFaceRowSource::extract_split_row_count_from_size_response(
            &payload,
            "default",
            "validation",
        );
        assert_eq!(count, Some(45));
    }

    #[test]
    fn extract_split_row_count_reads_config_fallback_and_dataset_total() {
        let payload = json!({
            "size": {
                "configs": [
                    {
                        "config": "default",
                        "splits": [{"name": "test", "num_rows": 77u64}],
                        "num_rows": 200u64
                    }
                ],
                "dataset": {"num_rows": 999u64}
            }
        });

        let split_count = HuggingFaceRowSource::extract_split_row_count_from_size_response(
            &payload, "default", "test",
        );
        assert_eq!(split_count, Some(77));

        let empty_split_count = HuggingFaceRowSource::extract_split_row_count_from_size_response(
            &payload, "default", "",
        );
        assert_eq!(empty_split_count, Some(200));
    }

4659
    fn shard_candidate_seed_is_seeded_and_source_scoped() {
1✔
4660
        let dir = tempdir().unwrap();
1✔
4661
        let mut a = test_config(dir.path().join("a"));
1✔
4662
        let mut b = test_config(dir.path().join("b"));
1✔
4663
        a.source_id = "source_a".to_string();
1✔
4664
        b.source_id = "source_b".to_string();
1✔
4665

4666
        let with_seed_a = HuggingFaceRowSource::shard_candidate_seed(&a, 100, 42);
1✔
4667
        let with_seed_a_again = HuggingFaceRowSource::shard_candidate_seed(&a, 100, 42);
1✔
4668
        assert_eq!(with_seed_a, with_seed_a_again);
1✔
4669

4670
        let with_seed_b = HuggingFaceRowSource::shard_candidate_seed(&b, 100, 42);
1✔
4671
        assert_ne!(with_seed_a, with_seed_b);
1✔
4672

4673
        let different_seed_a = HuggingFaceRowSource::shard_candidate_seed(&a, 100, 7);
1✔
4674
        assert_ne!(with_seed_a, different_seed_a);
1✔
4675
    }
1✔
4676

4677
    #[test]
4678
    fn remote_shard_permutation_is_deterministic_by_sampler_seed() {
1✔
4679
        let dir = tempdir().unwrap();
1✔
4680
        let config = test_config(dir.path().to_path_buf());
1✔
4681
        let total = 8usize;
1✔
4682

4683
        let seed_a = HuggingFaceRowSource::shard_candidate_seed(&config, total, 7);
1✔
4684
        let seed_b = HuggingFaceRowSource::shard_candidate_seed(&config, total, 7);
1✔
4685
        let seed_c = HuggingFaceRowSource::shard_candidate_seed(&config, total, 123);
1✔
4686

4687
        let mut perm_a = crate::source::IndexPermutation::new(total, seed_a, 0);
1✔
4688
        let mut perm_b = crate::source::IndexPermutation::new(total, seed_b, 0);
1✔
4689
        let mut perm_c = crate::source::IndexPermutation::new(total, seed_c, 0);
1✔
4690

4691
        let take = 6usize;
1✔
4692
        let order_a: Vec<usize> = (0..take).map(|_| perm_a.next()).collect();
6✔
4693
        let order_b: Vec<usize> = (0..take).map(|_| perm_b.next()).collect();
6✔
4694
        let order_c: Vec<usize> = (0..take).map(|_| perm_c.next()).collect();
6✔
4695

4696
        assert_eq!(order_a, order_b);
1✔
4697
        assert_ne!(order_a, order_c);
1✔
4698
    }
1✔
4699

4700
    #[test]
4701
    fn build_shard_index_ignores_manifest_cache_artifacts() {
1✔
4702
        let dir = tempdir().unwrap();
1✔
4703
        let mut config = test_config(dir.path().to_path_buf());
1✔
4704
        config.shard_extensions = vec!["ndjson".to_string()];
1✔
4705

4706
        let local = dir.path().join("local.ndjson");
1✔
4707
        fs::write(&local, b"{\"id\":\"l1\",\"text\":\"x\"}\n").unwrap();
1✔
4708

4709
        let manifest_cached = dir
1✔
4710
            .path()
1✔
4711
            .join("_parquet_manifest")
1✔
4712
            .join("main/train/cached.ndjson");
1✔
4713
        fs::create_dir_all(manifest_cached.parent().unwrap()).unwrap();
1✔
4714
        fs::write(&manifest_cached, b"{\"id\":\"r1\",\"text\":\"y\"}\n").unwrap();
1✔
4715

4716
        let (shards, discovered) = HuggingFaceRowSource::build_shard_index(&config).unwrap();
1✔
4717
        assert_eq!(discovered, 1);
1✔
4718
        assert_eq!(shards.len(), 1);
1✔
4719
        assert_eq!(shards[0].path, local);
1✔
4720
    }
1✔
4721

4722
    #[test]
4723
    fn expansion_headroom_uses_sampler_ingestion_max_records_when_configured() {
1✔
4724
        let dir = tempdir().unwrap();
1✔
4725
        let config = test_config(dir.path().to_path_buf());
1✔
4726
        let source = test_source(config);
1✔
4727

4728
        assert_eq!(source.effective_expansion_headroom_rows(), 30);
1✔
4729

4730
        let sampler = SamplerConfig {
1✔
4731
            ingestion_max_records: 7,
1✔
4732
            ..SamplerConfig::default()
1✔
4733
        };
1✔
4734
        source.configure_sampler(&sampler);
1✔
4735
        assert_eq!(source.effective_expansion_headroom_rows(), 21);
1✔
4736
    }
1✔
4737

4738
    #[test]
4739
    fn persisted_shard_sequence_roundtrip_respects_sampler_seed() {
1✔
4740
        let dir = tempdir().unwrap();
1✔
4741
        let config = test_config(dir.path().to_path_buf());
1✔
4742
        let source = test_source(config.clone());
1✔
4743

4744
        {
1✔
4745
            let sampler = SamplerConfig {
1✔
4746
                seed: 4242,
1✔
4747
                ..SamplerConfig::default()
1✔
4748
            };
1✔
4749
            source.configure_sampler(&sampler);
1✔
4750
        }
1✔
4751

4752
        let mut state = SourceState {
1✔
4753
            materialized_rows: 0,
1✔
4754
            total_rows: None,
1✔
4755
            shards: Vec::new(),
1✔
4756
            remote_candidates: Some(vec![
1✔
4757
                "url::https://x/resolve/main/train/000.parquet".to_string(),
1✔
4758
                "url::https://x/resolve/main/train/001.parquet".to_string(),
1✔
4759
            ]),
1✔
4760
            remote_candidate_sizes: HashMap::new(),
1✔
4761
            next_remote_idx: 1,
1✔
4762
        };
1✔
4763
        state.remote_candidate_sizes.insert(
1✔
4764
            "url::https://x/resolve/main/train/000.parquet".to_string(),
1✔
4765
            10,
4766
        );
4767

4768
        source.persist_shard_sequence_locked(&state).unwrap();
1✔
4769

4770
        let restored = HuggingFaceRowSource::load_persisted_shard_sequence(&config, 4242).unwrap();
1✔
4771
        assert!(restored.is_some());
1✔
4772
        let restored = restored.unwrap();
1✔
4773
        assert_eq!(restored.next_remote_idx, 1);
1✔
4774
        assert_eq!(restored.candidates.len(), 2);
1✔
4775

4776
        let rejected = HuggingFaceRowSource::load_persisted_shard_sequence(&config, 9999).unwrap();
1✔
4777
        assert!(rejected.is_none());
1✔
4778
    }
1✔
4779

4780
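    // Row parsing and record construction: value-to-text conversion, column detection,
    // and section role assignment, plus line-level reads against checkpointed shards.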
    #[test]
    fn value_to_text_handles_scalar_and_structured_values() {
        assert_eq!(HuggingFaceRowSource::value_to_text(&json!(null)), None);
        assert_eq!(HuggingFaceRowSource::value_to_text(&json!("   ")), None);
        assert_eq!(
            HuggingFaceRowSource::value_to_text(&json!("hello")),
            Some("hello".into())
        );
        assert_eq!(
            HuggingFaceRowSource::value_to_text(&json!(true)),
            Some("true".into())
        );
        assert_eq!(
            HuggingFaceRowSource::value_to_text(&json!(3.5)),
            Some("3.5".into())
        );
        assert_eq!(
            HuggingFaceRowSource::value_to_text(&json!([1, 2])),
            Some("[1,2]".into())
        );
    }

    #[test]
    fn parse_row_auto_detects_text_fields_and_skips_id() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.id_column = Some("id".into());
        let source = test_source(config);

        let row = source
            .parse_row(
                5,
                &json!({
                    "id": "row-5",
                    "title": "Anchor text",
                    "body": "Context text",
                    "flag": true
                }),
            )
            .unwrap();

        assert_eq!(row.row_id.as_deref(), Some("row-5"));
        assert!(row.text_fields.iter().any(|f| f.name == "title"));
        assert!(row.text_fields.iter().any(|f| f.name == "body"));
        assert!(row.text_fields.iter().all(|f| f.name != "id"));
    }

    #[test]
    fn parse_row_with_required_columns_errors_when_missing() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.anchor_column = Some("anchor".into());
        config.positive_column = Some("positive".into());
        config.context_columns = vec!["context".into()];
        let source = test_source(config);

        let err = source.parse_row(0, &json!({"anchor": "x", "context": "z"}));
        assert!(err.is_err());
    }

    #[test]
    fn parse_row_errors_when_payload_is_not_object() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);

        let err = source.parse_row(0, &json!("not-an-object"));
        assert!(err.is_err());
    }

    #[test]
    fn row_to_record_builds_expected_sections() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let row = RowView {
            row_id: Some("abc".into()),
            timestamp: Some(Utc::now()),
            text_fields: vec![
                RowTextField {
                    name: "title".into(),
                    text: "anchor".into(),
                },
                RowTextField {
                    name: "pos".into(),
                    text: "positive".into(),
                },
                RowTextField {
                    name: "ctx".into(),
                    text: "extra".into(),
                },
            ],
        };

        let record = source.row_to_record(&row, 1).unwrap().unwrap();
        assert_eq!(record.sections.len(), 3);
        assert_eq!(record.sections[0].role, SectionRole::Anchor);
        assert_eq!(record.sections[1].role, SectionRole::Context);
        assert_eq!(record.id, "hf_test::abc");
    }

    #[test]
    fn effective_refresh_batch_target_uses_multiplier_floor_of_one() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.refresh_batch_multiplier = 0;
        let source = test_source(config);
        assert_eq!(source.effective_refresh_batch_target(7), 7);
    }

    #[test]
    fn locate_shard_and_recompute_offsets_work() {
        let mut shards = vec![
            ShardIndex {
                path: PathBuf::from("a"),
                global_start: 10,
                row_count: 3,
                is_parquet: false,
                parquet_row_groups: Vec::new(),
                checkpoints: vec![0],
            },
            ShardIndex {
                path: PathBuf::from("b"),
                global_start: 20,
                row_count: 2,
                is_parquet: false,
                parquet_row_groups: Vec::new(),
                checkpoints: vec![0],
            },
        ];
        let hit = HuggingFaceRowSource::locate_shard(&shards, 11).unwrap();
        assert_eq!(hit.1, 1);

        let mut state = SourceState {
            materialized_rows: 0,
            total_rows: None,
            shards: std::mem::take(&mut shards),
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };
        HuggingFaceRowSource::recompute_shard_offsets(&mut state);
        assert_eq!(state.shards[0].global_start, 0);
        assert_eq!(state.shards[1].global_start, 3);
        assert_eq!(state.materialized_rows, 5);
    }

    #[test]
    fn len_hint_covers_known_and_empty_paths() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(9);
        let source = test_source(config);

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 5;
            state.total_rows = Some(100);
        }
        assert_eq!(source.len_hint(), Some(9));

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 0;
            state.total_rows = Some(0);
        }
        assert_eq!(source.len_hint(), Some(0));
    }

    #[test]
    fn len_hint_defaults_to_one_when_unknown_and_not_exhausted() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        assert_eq!(source.len_hint(), Some(1));
    }

    #[test]
    fn read_line_at_reads_expected_row_with_checkpoints() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        let mut file = File::create(&path).unwrap();
        file.write_all(b"{\"text\":\"a\"}\n").unwrap();
        file.write_all(b"{\"text\":\"b\"}\n").unwrap();
        file.write_all(b"{\"text\":\"c\"}\n").unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();

        let line = source.read_line_at(&shard, 2).unwrap();
        assert!(line.contains("\"c\""));
    }

    #[test]
    fn read_line_at_errors_when_checkpoint_is_missing() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        fs::write(&path, b"{\"text\":\"a\"}\n").unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let mut shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        shard.checkpoints.clear();

        let err = source.read_line_at(&shard, 0);
        assert!(err.is_err());
    }

    #[test]
    fn load_persisted_shard_sequence_clamps_next_index_to_candidate_len() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let state_path = HuggingFaceRowSource::shard_sequence_state_path(&config);
        fs::create_dir_all(state_path.parent().unwrap()).unwrap();
        fs::write(
            &state_path,
            serde_json::to_vec_pretty(&json!({
                "version": 1,
                "source_id": config.source_id,
                "dataset": config.dataset,
                "config": config.config,
                "split": config.split,
                "sampler_seed": 1,
                "candidates": ["url::http://x/resolve/main/train/000.ndjson"],
                "candidate_sizes": {},
                "next_remote_idx": 99
            }))
            .unwrap(),
        )
        .unwrap();

        let loaded = HuggingFaceRowSource::load_persisted_shard_sequence(&config, 1)
            .unwrap()
            .unwrap();
        assert_eq!(loaded.next_remote_idx, 1);
    }

    #[test]
    fn materialize_local_file_copies_and_is_idempotent_when_size_matches() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let src = dir.path().join("src.ndjson");
        let dst = dir.path().join("nested/dst.ndjson");

        fs::write(&src, b"line\n").unwrap();
        HuggingFaceRowSource::materialize_local_file(&config, &src, &dst).unwrap();
        let first = fs::read(&dst).unwrap();
        HuggingFaceRowSource::materialize_local_file(&config, &src, &dst).unwrap();
        let second = fs::read(&dst).unwrap();
        assert_eq!(first, second);
    }

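    // Disk-cap eviction of cached manifest shards, local shard discovery, and the
    // refresh cursor over local JSONL snapshots.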
    #[test]
5040
    fn enforce_disk_cap_evicts_old_manifest_shards() {
1✔
5041
        let dir = tempdir().unwrap();
1✔
5042
        let mut config = test_config(dir.path().to_path_buf());
1✔
5043
        config.local_disk_cap_bytes = Some(10);
1✔
5044
        config.min_resident_shards = 0;
1✔
5045
        let source = test_source(config);
1✔
5046

5047
        let manifest_root = source.manifest_cache_root();
1✔
5048
        fs::create_dir_all(&manifest_root).unwrap();
1✔
5049
        let evict_path = manifest_root.join("a.parquet");
1✔
5050
        let keep_path = manifest_root.join("b.parquet");
1✔
5051
        fs::write(&evict_path, vec![1u8; 8]).unwrap();
1✔
5052
        fs::write(&keep_path, vec![2u8; 8]).unwrap();
1✔
5053

5054
        let mut state = SourceState {
1✔
5055
            materialized_rows: 16,
1✔
5056
            total_rows: None,
1✔
5057
            shards: vec![
1✔
5058
                ShardIndex {
1✔
5059
                    path: evict_path.clone(),
1✔
5060
                    global_start: 0,
1✔
5061
                    row_count: 8,
1✔
5062
                    is_parquet: true,
1✔
5063
                    parquet_row_groups: vec![(0, 8)],
1✔
5064
                    checkpoints: Vec::new(),
1✔
5065
                },
1✔
5066
                ShardIndex {
1✔
5067
                    path: keep_path.clone(),
1✔
5068
                    global_start: 8,
1✔
5069
                    row_count: 8,
1✔
5070
                    is_parquet: true,
1✔
5071
                    parquet_row_groups: vec![(0, 8)],
1✔
5072
                    checkpoints: Vec::new(),
1✔
5073
                },
1✔
5074
            ],
1✔
5075
            remote_candidates: None,
1✔
5076
            remote_candidate_sizes: HashMap::new(),
1✔
5077
            next_remote_idx: 0,
1✔
5078
        };
1✔
5079

5080
        let evicted = source
1✔
5081
            .enforce_disk_cap_locked(&mut state, &keep_path)
1✔
5082
            .unwrap();
1✔
5083
        assert!(evicted);
1✔
5084
        assert!(!evict_path.exists());
1✔
5085
        assert!(keep_path.exists());
1✔
5086
        assert_eq!(state.shards.len(), 1);
1✔
5087
    }
1✔
5088

5089
    #[test]
5090
    fn enforce_disk_cap_errors_when_min_resident_prevents_eviction() {
1✔
5091
        let dir = tempdir().unwrap();
1✔
5092
        let mut config = test_config(dir.path().to_path_buf());
1✔
5093
        config.local_disk_cap_bytes = Some(4);
1✔
5094
        config.min_resident_shards = 1;
1✔
5095
        let source = test_source(config);
1✔
5096

5097
        let manifest_root = source.manifest_cache_root();
1✔
5098
        fs::create_dir_all(&manifest_root).unwrap();
1✔
5099
        let protected = manifest_root.join("only.parquet");
1✔
5100
        fs::write(&protected, vec![1u8; 8]).unwrap();
1✔
5101

5102
        let mut state = SourceState {
1✔
5103
            materialized_rows: 8,
1✔
5104
            total_rows: None,
1✔
5105
            shards: vec![ShardIndex {
1✔
5106
                path: protected.clone(),
1✔
5107
                global_start: 0,
1✔
5108
                row_count: 8,
1✔
5109
                is_parquet: true,
1✔
5110
                parquet_row_groups: vec![(0, 8)],
1✔
5111
                checkpoints: Vec::new(),
1✔
5112
            }],
1✔
5113
            remote_candidates: None,
1✔
5114
            remote_candidate_sizes: HashMap::new(),
1✔
5115
            next_remote_idx: 0,
1✔
5116
        };
1✔
5117

5118
        let err = source.enforce_disk_cap_locked(&mut state, &protected);
1✔
5119
        assert!(err.is_err());
1✔
5120
        assert!(!protected.exists());
1✔
5121
    }
1✔
5122

5123
    #[test]
5124
    fn build_shard_index_discovers_local_jsonl_shards() {
1✔
5125
        let dir = tempdir().unwrap();
1✔
5126
        let root = dir.path().to_path_buf();
1✔
5127
        fs::write(root.join("a.jsonl"), b"{\"text\":\"a\"}\n").unwrap();
1✔
5128
        fs::write(root.join("b.ndjson"), b"{\"text\":\"b\"}\n").unwrap();
1✔
5129

5130
        let config = test_config(root.clone());
1✔
5131
        let (shards, discovered) = HuggingFaceRowSource::build_shard_index(&config).unwrap();
1✔
5132
        assert_eq!(discovered, 2);
1✔
5133
        assert_eq!(shards.len(), 2);
1✔
5134
    }
1✔
5135

5136
    #[test]
5137
    fn index_single_shard_returns_none_for_empty_file() {
1✔
5138
        let dir = tempdir().unwrap();
1✔
5139
        let config = test_config(dir.path().to_path_buf());
1✔
5140
        let path = dir.path().join("empty.jsonl");
1✔
5141
        fs::write(&path, b"").unwrap();
1✔
5142
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0).unwrap();
1✔
5143
        assert!(shard.is_none());
1✔
5144
    }
1✔
5145

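    // refresh() over a three-row local JSONL shard with a limit of 2 should return
    // exactly two records and advance the cursor revision above zero.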
5146
    #[test]
5147
    fn refresh_reads_local_rows_and_advances_cursor() {
1✔
5148
        let dir = tempdir().unwrap();
1✔
5149
        let path = dir.path().join("rows.jsonl");
1✔
5150
        fs::write(
1✔
5151
            &path,
1✔
5152
            b"{\"id\":\"r1\",\"text\":\"alpha\"}\n{\"id\":\"r2\",\"text\":\"beta\"}\n{\"id\":\"r3\",\"text\":\"gamma\"}\n",
5153
        )
5154
        .unwrap();
1✔
5155

5156
        let mut config = test_config(dir.path().to_path_buf());
1✔
5157
        config.checkpoint_stride = 1;
1✔
5158
        config.refresh_batch_multiplier = 1;
1✔
5159
        let source = test_source(config.clone());
1✔
5160
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
1✔
5161
            .unwrap()
1✔
5162
            .unwrap();
1✔
5163
        {
1✔
5164
            let mut state = source.state.lock().unwrap();
1✔
5165
            state.materialized_rows = shard.row_count;
1✔
5166
            state.total_rows = Some(shard.row_count);
1✔
5167
            state.shards = vec![shard];
1✔
5168
        }
1✔
5169

5170
        let snapshot = source.refresh(None, Some(2)).unwrap();
1✔
5171
        assert_eq!(snapshot.records.len(), 2);
1✔
5172
        assert!(snapshot.cursor.revision > 0);
1✔
5173
    }
1✔
5174

5175
    #[test]
5176
    fn reported_record_count_uses_len_hint_for_local_state() {
1✔
5177
        let dir = tempdir().unwrap();
1✔
5178
        let config = test_config(dir.path().to_path_buf());
1✔
5179
        let source = test_source(config);
1✔
5180
        {
1✔
5181
            let mut state = source.state.lock().unwrap();
1✔
5182
            state.materialized_rows = 4;
1✔
5183
            state.total_rows = Some(4);
1✔
5184
        }
1✔
5185
        assert_eq!(source.reported_record_count().unwrap(), 4);
1✔
5186
    }
1✔
5187

5188
    #[test]
5189
    fn rotate_candidates_deterministically_preserves_membership() {
1✔
5190
        let dir = tempdir().unwrap();
1✔
5191
        let config = test_config(dir.path().to_path_buf());
1✔
5192
        let original = vec!["a".to_string(), "b".to_string(), "c".to_string()];
1✔
5193
        let mut rotated = original.clone();
1✔
5194
        HuggingFaceRowSource::rotate_candidates_deterministically(&config, &mut rotated);
1✔
5195
        let mut sorted_original = original;
1✔
5196
        let mut sorted_rotated = rotated;
1✔
5197
        sorted_original.sort();
1✔
5198
        sorted_rotated.sort();
1✔
5199
        assert_eq!(sorted_rotated, sorted_original);
1✔
5200
    }
1✔
5201

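    // parse_row should unwrap a `{"row": {...}}` payload, pick the configured id
    // column, and emit one text field per configured text column, with the first
    // field matching the first configured column.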
    #[test]
    fn parse_row_supports_row_wrapped_payload_and_text_columns() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.text_columns = vec!["headline".into(), "body".into()];
        config.id_column = Some("rid".into());
        let source = test_source(config);

        let parsed = source
            .parse_row(
                0,
                &json!({"row": {"rid": "r-1", "headline": "h", "body": "b"}}),
            )
            .unwrap();

        assert_eq!(parsed.row_id.as_deref(), Some("r-1"));
        assert_eq!(parsed.text_fields.len(), 2);
        assert_eq!(parsed.text_fields[0].name, "headline");
    }

    #[test]
    fn row_to_record_returns_none_for_empty_fields() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let row = RowView {
            row_id: Some("x".into()),
            timestamp: None,
            text_fields: Vec::new(),
        };
        assert!(source.row_to_record(&row, 0).unwrap().is_none());
    }

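    // ensure_row_available: an already-materialized index is available, an index at
    // or past max_rows is not, and with no remote candidates left nothing further
    // can be materialized.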
    #[test]
    fn ensure_row_available_handles_materialized_max_and_exhausted_candidates() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(2);
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 1;
            state.remote_candidates = Some(vec![]);
            state.next_remote_idx = 0;
        }

        assert!(source.ensure_row_available(0).unwrap());
        assert!(!source.ensure_row_available(3).unwrap());
        assert!(!source.ensure_row_available(1).unwrap());
    }

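    // read_row_batch should serve rows straight from the in-memory cache and stop
    // once the requested limit is reached.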
    #[test]
    fn read_row_batch_uses_cached_rows_and_respects_limit() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(2);
        }

        let row0 = RowView {
            row_id: Some("r0".into()),
            timestamp: Some(Utc::now()),
            text_fields: vec![RowTextField {
                name: "text".into(),
                text: "alpha".into(),
            }],
        };
        let row1 = RowView {
            row_id: Some("r1".into()),
            timestamp: Some(Utc::now()),
            text_fields: vec![RowTextField {
                name: "text".into(),
                text: "beta".into(),
            }],
        };
        {
            let mut cache = source.cache.lock().unwrap();
            cache.insert(0, row0, config.cache_capacity);
            cache.insert(1, row1, config.cache_capacity);
        }

        let mut out = Vec::new();
        source.read_row_batch(&[0, 1], &mut out, Some(1)).unwrap();
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn read_row_batch_errors_on_invalid_json_line() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("broken.jsonl");
        fs::write(&path, b"not-json\n").unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 1;
            state.total_rows = Some(1);
            state.shards = vec![shard];
        }

        let mut out = Vec::new();
        let result = source.read_row_batch(&[0], &mut out, Some(1));
        assert!(result.is_err());
    }

    #[test]
    fn build_shard_index_errors_when_no_matching_extensions() {
        let dir = tempdir().unwrap();
        fs::write(dir.path().join("data.txt"), b"x\n").unwrap();
        let config = test_config(dir.path().to_path_buf());
        let result = HuggingFaceRowSource::build_shard_index(&config);
        assert!(result.is_err());
    }

    #[test]
    fn build_shard_index_honors_max_rows() {
        let dir = tempdir().unwrap();
        fs::write(
            dir.path().join("rows.jsonl"),
            b"{\"text\":\"1\"}\n{\"text\":\"2\"}\n{\"text\":\"3\"}\n",
        )
        .unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(2);

        let (_, discovered) = HuggingFaceRowSource::build_shard_index(&config).unwrap();
        assert_eq!(discovered, 2);
    }

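    // An empty source yields an empty snapshot with revision 0; a stale cursor
    // whose revision is past the end wraps around so refresh still returns rows.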
    #[test]
    fn refresh_handles_empty_total_and_cursor_wrap() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 0;
            state.total_rows = Some(0);
        }
        let empty = source.refresh(None, Some(5)).unwrap();
        assert!(empty.records.is_empty());
        assert_eq!(empty.cursor.revision, 0);

        let path = dir.path().join("rows.jsonl");
        fs::write(
            &path,
            b"{\"id\":\"a\",\"text\":\"A\"}\n{\"id\":\"b\",\"text\":\"B\"}\n",
        )
        .unwrap();
        let mut cfg2 = config;
        cfg2.checkpoint_stride = 1;
        let source2 = test_source(cfg2.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&cfg2, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source2.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(2);
            state.shards = vec![shard];
        }
        let cursor = SourceCursor {
            last_seen: Utc::now(),
            revision: 99,
        };
        let snapshot = source2.refresh(Some(&cursor), Some(1)).unwrap();
        assert_eq!(snapshot.records.len(), 1);
    }

    #[test]
    fn new_rejects_zero_checkpoint_stride() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 0;
        let result = HuggingFaceRowSource::new(config);
        assert!(result.is_err());
    }

    #[test]
    fn parse_global_row_count_response_returns_none_when_split_missing() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let body = r#"{
            "size": {
                "splits": [
                    {"config":"main","split":"test","num_rows":7}
                ]
            }
        }"#;

        let parsed = HuggingFaceRowSource::parse_global_row_count_response(&config, body).unwrap();
        assert_eq!(parsed, None);
    }

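    // When no split name is given, the size-response parser falls back to the
    // matching config's num_rows rather than any per-split count.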
    #[test]
    fn extract_split_row_count_uses_config_num_rows_when_split_empty() {
        let payload = serde_json::json!({
            "size": {
                "configs": [
                    {
                        "config": "main",
                        "num_rows": 123,
                        "splits": [
                            {"split": "train", "num_rows": 999}
                        ]
                    }
                ]
            }
        });

        let rows =
            HuggingFaceRowSource::extract_split_row_count_from_size_response(&payload, "main", "");
        assert_eq!(rows, Some(123));
    }

    #[test]
    fn extract_split_row_count_uses_dataset_num_rows_when_split_empty() {
        let payload = serde_json::json!({
            "size": {
                "dataset": {
                    "num_rows": 77
                }
            }
        });

        let rows =
            HuggingFaceRowSource::extract_split_row_count_from_size_response(&payload, "main", "");
        assert_eq!(rows, Some(77));
    }

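    // Refresh order is a function of the sampler seed: equal seeds must produce the
    // same record-id sequence, while a different seed must produce a different one.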
    #[test]
    fn refresh_order_uses_sampler_seed_for_local_rows() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        let mut payload = String::new();
        for idx in 0..12 {
            payload.push_str(&format!("{{\"id\":\"r{idx}\",\"text\":\"v{idx}\"}}\n"));
        }
        fs::write(&path, payload).unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        config.refresh_batch_multiplier = 1;

        let source_a = test_source(config.clone());
        let source_b = test_source(config.clone());
        let source_c = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();

        for source in [&source_a, &source_b, &source_c] {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 12;
            state.total_rows = Some(12);
            state.shards = vec![shard.clone()];
        }

        let seed_1 = SamplerConfig {
            seed: 7,
            ..SamplerConfig::default()
        };
        let seed_2 = SamplerConfig {
            seed: 7,
            ..SamplerConfig::default()
        };
        let seed_3 = SamplerConfig {
            seed: 123,
            ..SamplerConfig::default()
        };

        source_a.configure_sampler(&seed_1);
        source_b.configure_sampler(&seed_2);
        source_c.configure_sampler(&seed_3);

        let ids_a: Vec<String> = source_a
            .refresh(None, Some(8))
            .unwrap()
            .records
            .into_iter()
            .map(|record| record.id)
            .collect();
        let ids_b: Vec<String> = source_b
            .refresh(None, Some(8))
            .unwrap()
            .records
            .into_iter()
            .map(|record| record.id)
            .collect();
        let ids_c: Vec<String> = source_c
            .refresh(None, Some(8))
            .unwrap()
            .records
            .into_iter()
            .map(|record| record.id)
            .collect();

        assert_eq!(ids_a, ids_b);
        assert_ne!(ids_a, ids_c);
    }
}