• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jzombie / rust-triplets / 23912054502

02 Apr 2026 04:58PM UTC coverage: 95.31% (-0.01%) from 95.324%
23912054502

Pull #54

github

web-flow
Merge 56b60ce05 into 2a7c205da
Pull Request #54: Centralize tokenization

14 of 17 new or added lines in 4 files covered. (82.35%)

22 existing lines in 2 files now uncovered.

16805 of 17632 relevant lines covered (95.31%)

126004.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.02
/src/example_apps.rs
1
// TODO: Consider extracting to a debug crate
2

3
use std::collections::HashMap;
4
use std::error::Error;
5
use std::path::PathBuf;
6
use std::sync::Arc;
7
use std::sync::Once;
8
#[cfg(test)]
9
use std::sync::OnceLock;
10
use std::time::Instant;
11
#[cfg(test)]
12
use tempfile::TempDir;
13

14
use cache_manager::CacheRoot;
15
use clap::{Parser, ValueEnum, error::ErrorKind};
16

17
use crate::config::{ChunkingStrategy, SamplerConfig, TripletRecipe};
18
use crate::constants::cache::{MULTI_SOURCE_DEMO_GROUP, MULTI_SOURCE_DEMO_STORE_FILENAME};
19
use crate::data::ChunkView;
20
use crate::heuristics::{
21
    CapacityTotals, EFFECTIVE_NEGATIVES_PER_ANCHOR, EFFECTIVE_POSITIVES_PER_ANCHOR,
22
    estimate_source_split_capacity_from_counts, format_replay_factor, format_u128_with_commas,
23
    resolve_text_recipes_for_source, split_counts_for_total,
24
};
25
use crate::metrics::{chunk_proximity_score, source_skew, window_chunk_distance};
26
use crate::sampler::chunk_weight;
27
use crate::source::DataSource;
28
use crate::splits::{FileSplitStore, SplitLabel, SplitRatios, SplitStore};
29
use crate::{
30
    RecordChunk, SampleBatch, Sampler, SamplerError, SourceId, TextBatch, TextRecipe, TripletBatch,
31
    TripletSampler,
32
};
33

34
type DynSource = Box<dyn DataSource + 'static>;
35

36
#[cfg(feature = "extended-metrics")]
37
type MetricEntry = (f32, f32, f32, f32, f32);
38

39
#[cfg(feature = "extended-metrics")]
40
type SourceMetricsMap = HashMap<String, Vec<MetricEntry>>;
41

42
fn managed_demo_split_store_path() -> Result<PathBuf, String> {
2✔
43
    #[cfg(test)]
44
    {
45
        static TEST_CACHE_ROOT: OnceLock<TempDir> = OnceLock::new();
46
        let root = TEST_CACHE_ROOT.get_or_init(|| {
2✔
47
            TempDir::new().expect("failed to create test demo split-store cache root")
1✔
48
        });
1✔
49
        let cache_root = CacheRoot::from_root(root.path());
2✔
50
        let group = PathBuf::from(MULTI_SOURCE_DEMO_GROUP);
2✔
51
        let dir = cache_root.ensure_group(&group).map_err(|err| {
2✔
52
            format!(
×
53
                "failed creating managed demo cache group '{}': {err}",
54
                group.display()
×
55
            )
56
        })?;
×
57
        Ok(dir.join(MULTI_SOURCE_DEMO_STORE_FILENAME))
2✔
58
    }
59

60
    #[cfg(not(test))]
61
    {
62
        let cache_root = CacheRoot::from_discovery()
×
63
            .map_err(|err| format!("failed discovering managed cache root: {err}"))?;
×
64
        let group = PathBuf::from(MULTI_SOURCE_DEMO_GROUP);
×
65
        let dir = cache_root.ensure_group(&group).map_err(|err| {
×
66
            format!(
×
67
                "failed creating managed demo cache group '{}': {err}",
68
                group.display()
×
69
            )
70
        })?;
×
71
        Ok(dir.join(MULTI_SOURCE_DEMO_STORE_FILENAME))
×
72
    }
73
}
2✔
74

75
fn init_example_tracing() {
35✔
76
    static INIT: Once = Once::new();
77
    INIT.call_once(|| {
35✔
78
        let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
1✔
79
            .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("triplets=info"));
1✔
80
        let _ = tracing_subscriber::fmt()
1✔
81
            .with_env_filter(env_filter)
1✔
82
            .try_init();
1✔
83
    });
1✔
84
}
35✔
85

86
#[derive(Debug, Clone, Copy, ValueEnum)]
/// CLI split selector mapped onto `SplitLabel`.
enum SplitArg {
    // Training split.
    Train,
    // Validation split.
    Validation,
    // Held-out test split.
    Test,
}
93

94
impl From<SplitArg> for SplitLabel {
95
    fn from(value: SplitArg) -> Self {
6✔
96
        match value {
6✔
97
            SplitArg::Train => SplitLabel::Train,
1✔
98
            SplitArg::Validation => SplitLabel::Validation,
4✔
99
            SplitArg::Test => SplitLabel::Test,
1✔
100
        }
101
    }
6✔
102
}
103

104
#[derive(Debug, Parser)]
#[command(
    name = "estimate_capacity",
    disable_help_subcommand = true,
    about = "Metadata-only capacity estimation",
    long_about = "Estimate record, pair, triplet, and text-sample capacity using source-reported counts only (no data refresh).",
    after_help = "Source roots are optional and resolved in order by explicit arg, environment variables, then project defaults."
)]
/// CLI arguments for metadata-only capacity estimation.
// NOTE: field comments use `//` (not `///`) on purpose — clap would pick up
// doc comments as long_help text and change `--help` output.
struct EstimateCapacityCli {
    // Deterministic seed forwarded to split allocation (`--seed`, default 99).
    #[arg(
        long,
        default_value_t = 99,
        help = "Deterministic seed used for split allocation"
    )]
    seed: u64,
    // Train/validation/test ratios parsed by `parse_split_ratios_arg`;
    // the parser is expected to enforce the sum-to-1.0 constraint.
    #[arg(
        long = "split-ratios",
        value_name = "TRAIN,VALIDATION,TEST",
        value_parser = parse_split_ratios_arg,
        default_value = "0.8,0.1,0.1",
        help = "Comma-separated split ratios that must sum to 1.0"
    )]
    split: SplitRatios,
    // Zero or more root overrides, consumed positionally per source.
    #[arg(
        long = "source-root",
        value_name = "PATH",
        help = "Optional source root override, repeat as needed in source order"
    )]
    source_roots: Vec<String>,
}
135

136
#[derive(Debug, Parser)]
#[command(
    name = "multi_source_demo",
    disable_help_subcommand = true,
    about = "Run sampled batches from multiple sources",
    long_about = "Sample triplet, pair, or text batches from multiple sources and persist split/epoch state.",
    after_help = "Source roots are optional and resolved in order by explicit arg, environment variables, then project defaults."
)]
/// CLI for `multi_source_demo`.
///
/// Common usage:
/// - Use managed cache-group default path (no flag)
/// - Set an explicit file path: `--split-store-path /tmp/split_store.bin`
/// - Repeat `--source-root <PATH>` to override source roots in order
// NOTE: field comments use `//` (not `///`) on purpose — clap would pick up
// doc comments as long_help text and change `--help` output.
struct MultiSourceDemoCli {
    // Mode flag: emit a text batch. Mutually-exclusive handling of the mode
    // flags happens in `run_multi_source_demo`'s if/else chain, not in clap.
    #[arg(
        long = "text-recipes",
        help = "Emit a text batch instead of a triplet batch"
    )]
    show_text_samples: bool,
    // Mode flag: emit a pair batch (takes precedence over --text-recipes).
    #[arg(
        long = "pair-batch",
        help = "Emit a pair batch instead of a triplet batch"
    )]
    show_pair_samples: bool,
    // Mode flag: list registered text recipes and exit without sampling.
    #[arg(
        long = "list-text-recipes",
        help = "Print registered text recipes and exit"
    )]
    list_text_recipes: bool,
    // Sampling batch size; parser rejects 0.
    #[arg(
        long = "batch-size",
        default_value_t = 4,
        value_parser = parse_batch_size,
        help = "Batch size used for sampling"
    )]
    batch_size: usize,
    // Per-source ingestion buffer target; defaults to the SamplerConfig default.
    #[arg(
        long = "ingestion-max-records",
        default_value_t = default_ingestion_max_records(),
        value_parser = parse_ingestion_max_records,
        help = "Per-source ingestion buffer target used while refreshing records"
    )]
    ingestion_max_records: usize,
    // When absent, the SamplerConfig default seed is used.
    #[arg(long, help = "Optional deterministic seed override")]
    seed: Option<u64>,
    // When absent, sampling defaults to the Train split.
    #[arg(long, value_enum, help = "Target split to sample from")]
    split: Option<SplitArg>,
    // Zero or more root overrides, consumed positionally per source.
    #[arg(
        long = "source-root",
        value_name = "PATH",
        help = "Optional source root override, repeat as needed in source order"
    )]
    source_roots: Vec<String>,
    // When absent, a managed cache-group path is resolved automatically.
    #[arg(
        long = "split-store-path",
        value_name = "SPLIT_STORE_PATH",
        help = "Optional explicit path for persisted split/epoch state file"
    )]
    split_store_path: Option<PathBuf>,
    // Deletes the persisted state file (if present) before sampling.
    #[arg(
        long = "reset",
        help = "Delete the persisted split/epoch state before sampling, restarting from epoch 0"
    )]
    reset: bool,
    // Benchmark mode: run N triplet batches with per-batch timing output.
    #[arg(
        long = "batches",
        value_name = "N",
        value_parser = parse_batch_count,
        help = "Run N triplet batches in succession, printing a timing line per batch and (with --features extended-metrics) a per-source similarity summary at the end"
    )]
    batches: Option<usize>,
}
209

210
#[derive(Debug, Clone)]
/// Source-level inventory used by capacity estimation output.
struct SourceInventory {
    // Stable identifier copied from `DataSource::id()`.
    source_id: String,
    // Exact record count as reported by the source (no refresh performed).
    reported_records: u128,
    // Recipes used for this source: config-wide recipes when configured,
    // otherwise the source's defaults.
    triplet_recipes: Vec<TripletRecipe>,
}
217

218
/// Run the capacity-estimation CLI with injectable root resolution/source builders.
///
/// `build_sources` is construction-only; sampler configuration is applied
/// centrally by this function before any source calls.
///
/// Phases: parse CLI args -> inventory sources (metadata only) -> compute
/// per-source/per-split record counts -> estimate capacities -> print a
/// human-readable report. Errors from CLI parsing, root resolution, and
/// source record counting are propagated; everything else is stdout output.
pub fn run_estimate_capacity<R, Resolve, Build, I>(
    args_iter: I,
    resolve_roots: Resolve,
    build_sources: Build,
) -> Result<(), Box<dyn Error>>
where
    Resolve: FnOnce(Vec<String>) -> Result<R, Box<dyn Error>>,
    Build: FnOnce(&R) -> Vec<DynSource>,
    I: Iterator<Item = String>,
{
    init_example_tracing();

    // `parse_cli` returning Ok(None) means help/version was handled; exit
    // quietly in that case.
    let Some(cli) = parse_cli::<EstimateCapacityCli, _>(
        std::iter::once("estimate_capacity".to_string()).chain(args_iter),
    )?
    else {
        return Ok(());
    };

    let roots = resolve_roots(cli.source_roots)?;

    // Only seed and split ratios come from the CLI; everything else stays at
    // SamplerConfig defaults for a metadata-only estimate.
    let config = SamplerConfig {
        seed: cli.seed,
        split: cli.split,
        ..SamplerConfig::default()
    };

    let sources = build_sources(&roots);

    // Inventory pass: record counts and effective recipes per source. This
    // uses reported counts only — no source refresh is performed.
    let mut inventories = Vec::new();
    for source in &sources {
        let recipes = if config.recipes.is_empty() {
            source.default_triplet_recipes()
        } else {
            config.recipes.clone()
        };
        let reported_records = source.reported_record_count(&config).map_err(|err| {
            format!(
                "source '{}' failed to report exact record count: {err}",
                source.id()
            )
        })?;
        inventories.push(SourceInventory {
            source_id: source.id().to_string(),
            reported_records,
            triplet_recipes: recipes,
        });
    }

    // Allocate each source's records across splits and accumulate per-split
    // grand totals of record counts.
    let mut per_source_split_counts: HashMap<(String, SplitLabel), u128> = HashMap::new();
    let mut split_record_counts: HashMap<SplitLabel, u128> = HashMap::new();

    for source in &inventories {
        let counts = split_counts_for_total(source.reported_records, cli.split);
        for (label, count) in counts {
            per_source_split_counts.insert((source.source_id.clone(), label), count);
            *split_record_counts.entry(label).or_insert(0) += count;
        }
    }

    // Estimate capacities per (source, split) and aggregate per split.
    let mut totals_by_split: HashMap<SplitLabel, CapacityTotals> = HashMap::new();
    let mut totals_by_source_and_split: HashMap<(String, SplitLabel), CapacityTotals> =
        HashMap::new();

    for split_label in [SplitLabel::Train, SplitLabel::Validation, SplitLabel::Test] {
        let mut totals = CapacityTotals::default();

        for source in &inventories {
            let source_split_records = per_source_split_counts
                .get(&(source.source_id.clone(), split_label))
                .copied()
                .unwrap_or(0);

            let triplet_recipes = &source.triplet_recipes;
            let text_recipes = resolve_text_recipes_for_source(&config, triplet_recipes);

            let capacity = estimate_source_split_capacity_from_counts(
                source_split_records,
                triplet_recipes,
                &text_recipes,
            );

            totals_by_source_and_split.insert((source.source_id.clone(), split_label), capacity);

            totals.triplets += capacity.triplets;
            totals.effective_triplets += capacity.effective_triplets;
            totals.pairs += capacity.pairs;
            totals.text_samples += capacity.text_samples;
        }

        totals_by_split.insert(split_label, totals);
    }

    // Smallest non-zero record count per split, used for the
    // small-source-boost weight suggestion (0 when every source is empty).
    let min_nonzero_records_by_split: HashMap<SplitLabel, u128> =
        [SplitLabel::Train, SplitLabel::Validation, SplitLabel::Test]
            .into_iter()
            .map(|split_label| {
                let min_nonzero = inventories
                    .iter()
                    .filter_map(|source| {
                        per_source_split_counts
                            .get(&(source.source_id.clone(), split_label))
                            .copied()
                    })
                    .filter(|&records| records > 0)
                    .min()
                    .unwrap_or(0);
                (split_label, min_nonzero)
            })
            .collect();

    // Same smallest-non-zero computation, but across all splits combined.
    let min_nonzero_records_all_splits = inventories
        .iter()
        .map(|source| source.reported_records)
        .filter(|&records| records > 0)
        .min()
        .unwrap_or(0);

    // ---- Report output (stdout only from here on) ----
    println!("=== capacity estimate (length-only) ===");
    println!("mode: metadata-only (no source.refresh calls)");
    println!("classification: heuristic approximation (not exact)");
    println!("split seed: {}", cli.seed);
    println!(
        "split ratios: train={:.4}, validation={:.4}, test={:.4}",
        cli.split.train, cli.split.validation, cli.split.test
    );
    println!();

    println!("[SOURCES]");
    for source in &inventories {
        println!(
            "  {} => reported records: {}",
            source.source_id,
            format_u128_with_commas(source.reported_records)
        );
    }
    println!();

    println!("[PER SOURCE BREAKDOWN]");
    for source in &inventories {
        println!("  {}", source.source_id);
        let mut source_grand = CapacityTotals::default();
        let mut source_total_records = 0u128;
        for split_label in [SplitLabel::Train, SplitLabel::Validation, SplitLabel::Test] {
            let split_records = per_source_split_counts
                .get(&(source.source_id.clone(), split_label))
                .copied()
                .unwrap_or(0);
            source_total_records = source_total_records.saturating_add(split_records);
            // Largest record count for this split across all sources; the
            // basis for replay-factor and proportional-weight suggestions.
            let split_longest_records = inventories
                .iter()
                .map(|candidate| {
                    per_source_split_counts
                        .get(&(candidate.source_id.clone(), split_label))
                        .copied()
                        .unwrap_or(0)
                })
                .max()
                .unwrap_or(0);
            let totals = totals_by_source_and_split
                .get(&(source.source_id.clone(), split_label))
                .copied()
                .unwrap_or_default();
            source_grand.triplets += totals.triplets;
            source_grand.effective_triplets += totals.effective_triplets;
            source_grand.pairs += totals.pairs;
            source_grand.text_samples += totals.text_samples;
            println!("    [{:?}]", split_label);
            println!("      records: {}", format_u128_with_commas(split_records));
            println!(
                "      triplet combinations: {}",
                format_u128_with_commas(totals.triplets)
            );
            println!(
                "      effective sampled triplets (p={}, k={}): {}",
                EFFECTIVE_POSITIVES_PER_ANCHOR,
                EFFECTIVE_NEGATIVES_PER_ANCHOR,
                format_u128_with_commas(totals.effective_triplets)
            );
            println!(
                "      pair combinations:    {}",
                format_u128_with_commas(totals.pairs)
            );
            println!(
                "      text samples:         {}",
                format_u128_with_commas(totals.text_samples)
            );
            println!(
                "      replay factor vs longest source: {}",
                format_replay_factor(split_longest_records, split_records)
            );
            println!(
                "      suggested proportional-size batch weight (0-1): {:.4}",
                suggested_balancing_weight(split_longest_records, split_records)
            );
            let split_smallest_nonzero = min_nonzero_records_by_split
                .get(&split_label)
                .copied()
                .unwrap_or(0);
            println!(
                "      suggested small-source-boost batch weight (0-1): {:.4}",
                suggested_oversampling_weight(split_smallest_nonzero, split_records)
            );
            println!();
        }
        // Cross-split rollup for this source, compared against the overall
        // longest source for the weight suggestions.
        let longest_source_total = inventories
            .iter()
            .map(|candidate| candidate.reported_records)
            .max()
            .unwrap_or(0);
        println!("    [ALL SPLITS FOR SOURCE]");
        println!(
            "      triplet combinations: {}",
            format_u128_with_commas(source_grand.triplets)
        );
        println!(
            "      effective sampled triplets (p={}, k={}): {}",
            EFFECTIVE_POSITIVES_PER_ANCHOR,
            EFFECTIVE_NEGATIVES_PER_ANCHOR,
            format_u128_with_commas(source_grand.effective_triplets)
        );
        println!(
            "      pair combinations:    {}",
            format_u128_with_commas(source_grand.pairs)
        );
        println!(
            "      text samples:         {}",
            format_u128_with_commas(source_grand.text_samples)
        );
        println!(
            "      replay factor vs longest source: {}",
            format_replay_factor(longest_source_total, source_total_records)
        );
        println!(
            "      suggested proportional-size batch weight (0-1): {:.4}",
            suggested_balancing_weight(longest_source_total, source_total_records)
        );
        println!(
            "      suggested small-source-boost batch weight (0-1): {:.4}",
            suggested_oversampling_weight(min_nonzero_records_all_splits, source_total_records)
        );
        println!();
    }

    // Per-split totals across all sources, accumulated into a grand total.
    let mut grand = CapacityTotals::default();
    for split_label in [SplitLabel::Train, SplitLabel::Validation, SplitLabel::Test] {
        let record_count = split_record_counts.get(&split_label).copied().unwrap_or(0);
        let totals = totals_by_split
            .get(&split_label)
            .copied()
            .unwrap_or_default();

        grand.triplets += totals.triplets;
        grand.effective_triplets += totals.effective_triplets;
        grand.pairs += totals.pairs;
        grand.text_samples += totals.text_samples;

        println!("[{:?}]", split_label);
        println!("  records: {}", format_u128_with_commas(record_count));
        println!(
            "  triplet combinations: {}",
            format_u128_with_commas(totals.triplets)
        );
        println!(
            "  effective sampled triplets (p={}, k={}): {}",
            EFFECTIVE_POSITIVES_PER_ANCHOR,
            EFFECTIVE_NEGATIVES_PER_ANCHOR,
            format_u128_with_commas(totals.effective_triplets)
        );
        println!(
            "  pair combinations:    {}",
            format_u128_with_commas(totals.pairs)
        );
        println!(
            "  text samples:         {}",
            format_u128_with_commas(totals.text_samples)
        );
        println!();
    }

    println!("[ALL SPLITS TOTAL]");
    println!(
        "  triplet combinations: {}",
        format_u128_with_commas(grand.triplets)
    );
    println!(
        "  effective sampled triplets (p={}, k={}): {}",
        EFFECTIVE_POSITIVES_PER_ANCHOR,
        EFFECTIVE_NEGATIVES_PER_ANCHOR,
        format_u128_with_commas(grand.effective_triplets)
    );
    println!(
        "  pair combinations:    {}",
        format_u128_with_commas(grand.pairs)
    );
    println!(
        "  text samples:         {}",
        format_u128_with_commas(grand.text_samples)
    );
    println!();
    println!(
        "Note: counts are heuristic, length-based estimates from source-reported totals and recipe structure. They are approximate, not exact, and assume anchor-positive pairs=records (one positive per anchor by default), negatives=source_records_in_split-1 (anchor excluded as its own negative), and at most one chunk/window realization per sample. In real-world chunked sampling, practical combinations are often higher, so treat this as a floor-like baseline."
    );
    println!();
    println!(
        "Effective sampled triplets apply a bounded training assumption: effective_triplets = records * p * k per triplet recipe, with defaults p={} positives per anchor and k={} negatives per anchor.",
        EFFECTIVE_POSITIVES_PER_ANCHOR, EFFECTIVE_NEGATIVES_PER_ANCHOR
    );
    println!();
    println!(
        "Oversample loops are not inferred from this static report. To measure true oversampling (how many times sampling loops through the combination space), use observed sampled draw counts from an actual run."
    );
    println!();
    println!(
        "Suggested proportional-size batch weight (0-1) is source/max_source by record count: 1.0 for the largest source in scope, smaller values for smaller sources."
    );
    println!();
    println!(
        "Suggested small-source-boost batch weight (0-1) is min_nonzero_source/source by record count: 1.0 for the smallest non-zero source in scope, smaller values for larger sources."
    );
    println!();
    println!(
        "When passed to next_*_batch_with_weights, higher weight means that source is sampled more often relative to lower-weight sources."
    );

    Ok(())
}
549

550
/// Run the multi-source demo CLI with injectable root resolution/source builders.
///
/// `build_sources` is construction-only. Source sampler configuration is owned
/// by sampler registration (`TripletSampler::register_source`).
///
/// Mode precedence (first match wins): --pair-batch, --text-recipes,
/// --list-text-recipes, --batches N (benchmark), otherwise one triplet batch.
pub fn run_multi_source_demo<R, Resolve, Build, I>(
    args_iter: I,
    resolve_roots: Resolve,
    build_sources: Build,
) -> Result<(), Box<dyn Error>>
where
    Resolve: FnOnce(Vec<String>) -> Result<R, Box<dyn Error>>,
    Build: FnOnce(&R) -> Vec<DynSource>,
    I: Iterator<Item = String>,
{
    init_example_tracing();

    // `parse_cli` returning Ok(None) means help/version was handled; exit
    // quietly in that case.
    let Some(cli) = parse_cli::<MultiSourceDemoCli, _>(
        std::iter::once("multi_source_demo".to_string()).chain(args_iter),
    )?
    else {
        return Ok(());
    };

    let roots = resolve_roots(cli.source_roots)?;

    // Assemble the sampler config from CLI overrides on top of defaults.
    let mut config = SamplerConfig::default();
    config.seed = cli.seed.unwrap_or(config.seed);
    config.batch_size = cli.batch_size;
    config.ingestion_max_records = cli.ingestion_max_records;
    config.chunking = Default::default();
    let selected_split = cli.split.map(Into::into).unwrap_or(SplitLabel::Train);
    config.split = SplitRatios::default();
    config.allowed_splits = vec![selected_split];
    // Snapshot chunking now; `config` is moved into the sampler below.
    let chunking = config.chunking.clone();
    let config_snapshot = MultiSourceDemoConfigSnapshot {
        seed: config.seed,
        batch_size: config.batch_size,
        ingestion_max_records: config.ingestion_max_records,
        split: selected_split,
        split_ratios: config.split,
        max_window_tokens: config.chunking.max_window_tokens,
        overlap_tokens: config.chunking.overlap_tokens.clone(),
        summary_fallback_tokens: config.chunking.summary_fallback_tokens,
    };

    // Explicit --split-store-path wins; otherwise resolve the managed
    // cache-group default (a temp dir under test builds).
    let split_store_path = if let Some(path) = cli.split_store_path {
        path
    } else {
        managed_demo_split_store_path().map_err(|err| {
            Box::<dyn Error>::from(format!("failed to resolve demo split-store path: {err}"))
        })?
    };

    // --reset: drop persisted split/epoch state so sampling restarts at epoch 0.
    if cli.reset && split_store_path.exists() {
        std::fs::remove_file(&split_store_path).map_err(|err| {
            Box::<dyn Error>::from(format!(
                "failed to remove split store '{}': {err}",
                split_store_path.display()
            ))
        })?;
        println!("Reset: removed {}", split_store_path.display());
    }
    println!(
        "Persisting split assignments and epoch state to {}",
        split_store_path.display()
    );
    let sources = build_sources(&roots);
    // NOTE(review): the split-store seed is the literal 99, not config.seed —
    // presumably to keep persisted split assignments stable across --seed
    // overrides; confirm this is intentional rather than a leftover constant.
    let split_store = Arc::new(FileSplitStore::open(&split_store_path, config.split, 99)?);
    let sampler = TripletSampler::new(config, split_store.clone());
    for source in sources {
        sampler.register_source(source);
    }

    if cli.show_pair_samples {
        // Pair-batch mode: one batch, then persist sampler state.
        match sampler.next_pair_batch(selected_split) {
            Ok(pair_batch) => {
                if pair_batch.pairs.is_empty() {
                    println!("Pair sampling produced no results.");
                } else {
                    print_pair_batch(&chunking, &pair_batch, split_store.as_ref());
                }
                sampler.save_sampler_state(None)?;
            }
            // Exhaustion is reported but not treated as a hard failure.
            Err(SamplerError::Exhausted(name)) => {
                eprintln!(
                    "Pair sampler exhausted recipe '{}'. Ensure both positive and negative examples exist.",
                    name
                );
            }
            Err(err) => return Err(err.into()),
        }
    } else if cli.show_text_samples {
        // Text-batch mode: one batch, then persist sampler state.
        match sampler.next_text_batch(selected_split) {
            Ok(text_batch) => {
                if text_batch.samples.is_empty() {
                    println!(
                        "Text sampling produced no results. Ensure each source has eligible sections."
                    );
                } else {
                    print_text_batch(&chunking, &text_batch, split_store.as_ref());
                }
                sampler.save_sampler_state(None)?;
            }
            Err(SamplerError::Exhausted(name)) => {
                eprintln!(
                    "Text sampler exhausted selector '{}'. Ensure matching sections exist.",
                    name
                );
            }
            Err(err) => return Err(err.into()),
        }
    } else if cli.list_text_recipes {
        // Listing mode: print recipes and exit without sampling.
        let recipes = sampler.text_recipes();
        if recipes.is_empty() {
            println!(
                "No text recipes registered. Ensure your sources expose triplet selectors or configure text_recipes explicitly."
            );
        } else {
            print_text_recipes(&recipes);
        }
    } else if let Some(batch_count) = cli.batches {
        // Benchmark mode: N triplet batches with per-batch timing output.
        print_demo_config(&config_snapshot);
        println!("=== benchmark: {} triplet batches ===", batch_count);

        // source_id -> Vec<(pos_jaccard, pos_byte_cosine, neg_jaccard, neg_byte_cosine, pos_proximity)>
        #[cfg(feature = "extended-metrics")]
        let mut source_metrics: SourceMetricsMap = HashMap::new();

        for i in 0..batch_count {
            let t0 = Instant::now();
            match sampler.next_triplet_batch(selected_split) {
                Ok(batch) => {
                    let elapsed = t0.elapsed();
                    let n = batch.triplets.len();
                    println!(
                        "batch {:>4}  triplets={:<4}  elapsed={:>8.2}ms  per_triplet={:.2}ms",
                        i + 1,
                        n,
                        elapsed.as_secs_f64() * 1000.0,
                        // Guard against division by zero on an empty batch.
                        if n > 0 {
                            elapsed.as_secs_f64() * 1000.0 / n as f64
                        } else {
                            0.0
                        },
                    );
                    // Collect per-triplet lexical/proximity metrics, keyed by
                    // the anchor's source id (extended-metrics builds only).
                    #[cfg(feature = "extended-metrics")]
                    {
                        use crate::metrics::lexical_similarity_scores;
                        for triplet in &batch.triplets {
                            let (pj, pc) = lexical_similarity_scores(
                                &triplet.anchor.text,
                                &triplet.positive.text,
                            );
                            let (nj, nc) = lexical_similarity_scores(
                                &triplet.anchor.text,
                                &triplet.negative.text,
                            );
                            let proximity =
                                chunk_proximity_score(&triplet.anchor, &triplet.positive);
                            let source = extract_source(&triplet.anchor.record_id);
                            source_metrics
                                .entry(source)
                                .or_default()
                                .push((pj, pc, nj, nc, proximity));
                        }
                    }
                }
                // Exhaustion stops the benchmark early instead of failing.
                Err(SamplerError::Exhausted(name)) => {
                    println!(
                        "batch {:>4}  exhausted recipe '{}' — stopping early",
                        i + 1,
                        name
                    );
                    break;
                }
                Err(err) => return Err(err.into()),
            }
        }

        sampler.save_sampler_state(None)?;

        #[cfg(feature = "extended-metrics")]
        if !source_metrics.is_empty() {
            println!();
            print_metric_summary(&source_metrics);
        }

        #[cfg(all(feature = "extended-metrics", feature = "bm25-mining"))]
        {
            let (fallback, total) = sampler.bm25_fallback_stats();
            if total > 0 {
                let pct = fallback as f64 / total as f64 * 100.0;
                println!("bm25 fallback rate : {}/{} ({:.1}%)", fallback, total, pct);
            }
        }
    } else {
        // Default mode: a single triplet batch, then persist sampler state.
        match sampler.next_triplet_batch(selected_split) {
            Ok(triplet_batch) => {
                if triplet_batch.triplets.is_empty() {
                    println!(
                        "Triplet sampling produced no results. Ensure multiple records per source exist."
                    );
                } else {
                    print_triplet_batch(&chunking, &triplet_batch, split_store.as_ref());
                }
                sampler.save_sampler_state(None)?;
                #[cfg(all(feature = "extended-metrics", feature = "bm25-mining"))]
                {
                    let (fallback, total) = sampler.bm25_fallback_stats();
                    if total > 0 {
                        let pct = fallback as f64 / total as f64 * 100.0;
                        println!("bm25 fallback rate : {}/{} ({:.1}%)", fallback, total, pct);
                    }
                }
            }
            Err(SamplerError::Exhausted(name)) => {
                eprintln!(
                    "Triplet sampler exhausted recipe '{}'. Ensure both positive and negative examples exist.",
                    name
                );
            }
            Err(err) => return Err(err.into()),
        }
    }

    Ok(())
}
777

778
/// Point-in-time copy of the settings the multi-source demo ran with,
/// captured so `print_demo_config` can echo them back to the user.
struct MultiSourceDemoConfigSnapshot {
    // Sampler seed (printed verbatim in the config banner).
    seed: u64,
    // Requested batch size.
    batch_size: usize,
    // Cap on records ingested; defaults come from `SamplerConfig::default()`.
    ingestion_max_records: usize,
    // Split the demo sampled from (train/validation/test).
    split: SplitLabel,
    // Ratios used to assign records to splits; must sum to 1.0.
    split_ratios: SplitRatios,
    // Maximum tokens per chunk window.
    max_window_tokens: usize,
    // Overlap sizes (in tokens) used when windowing.
    overlap_tokens: Vec<usize>,
    // Token budget for the summary fallback; 0 means disabled (see the
    // "summary_fallback" line printed by `print_demo_config`).
    summary_fallback_tokens: usize,
}
788

789
/// Pretty-print the demo's sampler configuration snapshot to stdout as an
/// aligned key/value banner.
fn print_demo_config(cfg: &MultiSourceDemoConfigSnapshot) {
    // Render the overlap sizes as a comma-separated list for display.
    let overlaps: Vec<String> = cfg.overlap_tokens.iter().map(|t| t.to_string()).collect();
    println!("=== sampler config ===");
    println!("seed                 : {}", cfg.seed);
    println!("batch_size           : {}", cfg.batch_size);
    println!("ingestion_max_records: {}", cfg.ingestion_max_records);
    println!("split                : {:?}", cfg.split);
    println!(
        "split_ratios         : train={:.2} val={:.2} test={:.2}",
        cfg.split_ratios.train, cfg.split_ratios.validation, cfg.split_ratios.test
    );
    println!("max_window_tokens    : {}", cfg.max_window_tokens);
    println!("overlap_tokens       : [{}]", overlaps.join(", "));
    println!(
        "summary_fallback     : {} tokens (0 = disabled)",
        cfg.summary_fallback_tokens
    );
    // Trailing blank line separates the banner from subsequent output.
    println!();
}
808

809
/// Default for `--ingestion-max-records`, taken from `SamplerConfig::default()`
/// so the CLI flag and the sampler share a single source of truth.
fn default_ingestion_max_records() -> usize {
    SamplerConfig::default().ingestion_max_records
}
812

813
/// Parse `raw` as a strictly positive integer for CLI flag `flag`.
///
/// Returns an error naming the flag when the value is not an integer or is
/// zero; used by the `--batch-size` / `--ingestion-max-records` / `--batches`
/// value parsers.
fn parse_positive_usize_flag(raw: &str, flag: &str) -> Result<usize, String> {
    match raw.parse::<usize>() {
        // Zero parses fine but is never a valid value for these flags.
        Ok(0) => Err(format!("{} must be greater than zero", flag)),
        Ok(value) => Ok(value),
        Err(_) => Err(format!(
            "Could not parse {} value '{}' as a positive integer",
            flag, raw
        )),
    }
}
825

826
/// clap value parser for `--batch-size`: requires a positive integer.
fn parse_batch_size(raw: &str) -> Result<usize, String> {
    parse_positive_usize_flag(raw, "--batch-size")
}
829

830
/// clap value parser for `--ingestion-max-records`: requires a positive integer.
fn parse_ingestion_max_records(raw: &str) -> Result<usize, String> {
    parse_positive_usize_flag(raw, "--ingestion-max-records")
}
833

834
/// clap value parser for `--batches`: requires a positive integer.
fn parse_batch_count(raw: &str) -> Result<usize, String> {
    parse_positive_usize_flag(raw, "--batches")
}
837

838
/// Ratio of `source_baseline` to `max_baseline`, clamped to [0, 1].
///
/// Returns 0.0 when either baseline is zero, so empty sources contribute no
/// weight instead of dividing by zero.
fn suggested_balancing_weight(max_baseline: u128, source_baseline: u128) -> f32 {
    match (max_baseline, source_baseline) {
        (0, _) | (_, 0) => 0.0,
        (max, source) => (source as f64 / max as f64).clamp(0.0, 1.0) as f32,
    }
}
844

845
/// Ratio of `min_nonzero_baseline` to `source_baseline`, clamped to [0, 1].
///
/// Returns 0.0 when either input is zero, avoiding a division by zero for
/// sources without any baseline capacity.
fn suggested_oversampling_weight(min_nonzero_baseline: u128, source_baseline: u128) -> f32 {
    match (min_nonzero_baseline, source_baseline) {
        (0, _) | (_, 0) => 0.0,
        (min, source) => (min as f64 / source as f64).clamp(0.0, 1.0) as f32,
    }
}
851

852
fn parse_cli<T, I>(args: I) -> Result<Option<T>, Box<dyn Error>>
42✔
853
where
42✔
854
    T: Parser,
42✔
855
    I: IntoIterator,
42✔
856
    I::Item: Into<std::ffi::OsString> + Clone,
42✔
857
{
858
    match T::try_parse_from(args) {
42✔
859
        Ok(cli) => Ok(Some(cli)),
32✔
860
        Err(err) => match err.kind() {
10✔
861
            ErrorKind::DisplayHelp | ErrorKind::DisplayVersion => {
862
                err.print()?;
5✔
863
                Ok(None)
5✔
864
            }
865
            _ => Err(err.into()),
5✔
866
        },
867
    }
868
}
42✔
869

870
fn parse_split_ratios_arg(raw: &str) -> Result<SplitRatios, String> {
12✔
871
    let parts: Vec<&str> = raw.split(',').collect();
12✔
872
    if parts.len() != 3 {
12✔
873
        return Err("--split-ratios expects exactly 3 comma-separated values".to_string());
1✔
874
    }
11✔
875
    let train = parts[0]
11✔
876
        .trim()
11✔
877
        .parse::<f32>()
11✔
878
        .map_err(|_| format!("invalid train ratio '{}': must be a float", parts[0].trim()))?;
11✔
879
    let validation = parts[1].trim().parse::<f32>().map_err(|_| {
10✔
880
        format!(
1✔
881
            "invalid validation ratio '{}': must be a float",
882
            parts[1].trim()
1✔
883
        )
884
    })?;
1✔
885
    let test = parts[2]
9✔
886
        .trim()
9✔
887
        .parse::<f32>()
9✔
888
        .map_err(|_| format!("invalid test ratio '{}': must be a float", parts[2].trim()))?;
9✔
889
    let ratios = SplitRatios {
8✔
890
        train,
8✔
891
        validation,
8✔
892
        test,
8✔
893
    };
8✔
894
    let sum = ratios.train + ratios.validation + ratios.test;
8✔
895
    if (sum - 1.0).abs() > 1e-5 {
8✔
896
        return Err(format!(
1✔
897
            "split ratios must sum to 1.0, got {:.6} (train={}, validation={}, test={})",
1✔
898
            sum, ratios.train, ratios.validation, ratios.test
1✔
899
        ));
1✔
900
    }
7✔
901
    if ratios.train < 0.0 || ratios.validation < 0.0 || ratios.test < 0.0 {
7✔
902
        return Err("split ratios must be non-negative".to_string());
1✔
903
    }
6✔
904
    Ok(ratios)
6✔
905
}
12✔
906

907
/// Dump every triplet in `batch` — anchor, positive, and negative chunks with
/// per-triplet recipe/weight/instruction — then per-source summaries.
fn print_triplet_batch(
    strategy: &ChunkingStrategy,
    batch: &TripletBatch,
    split_store: &impl SplitStore,
) {
    println!("=== triplet batch ===");
    for (idx, triplet) in batch.triplets.iter().enumerate() {
        println!("--- triplet #{} ---", idx);
        println!("recipe       : {}", triplet.recipe);
        println!("sample_weight: {:.4}", triplet.weight);
        if let Some(instr) = &triplet.instruction {
            println!("instruction shown to model:\n{}\n", instr);
        }
        // Anchor->positive proximity/distance are displayed on the POSITIVE block.
        let pos_proximity = chunk_proximity_score(&triplet.anchor, &triplet.positive);
        let pos_distance = window_chunk_distance(&triplet.anchor, &triplet.positive);
        // Lexical similarity vs. the anchor is only computed when the
        // `extended-metrics` feature is enabled; otherwise both stay None and
        // `print_chunk_block` omits those lines.
        #[cfg(feature = "extended-metrics")]
        let (pos_sim, neg_sim) = {
            use crate::metrics::lexical_similarity_scores;
            (
                Some(lexical_similarity_scores(
                    &triplet.anchor.text,
                    &triplet.positive.text,
                )),
                Some(lexical_similarity_scores(
                    &triplet.anchor.text,
                    &triplet.negative.text,
                )),
            )
        };
        #[cfg(not(feature = "extended-metrics"))]
        let (pos_sim, neg_sim): (Option<(f32, f32)>, Option<(f32, f32)>) = (None, None);
        print_chunk_block(
            "ANCHOR",
            &triplet.anchor,
            strategy,
            split_store,
            None,
            None,
            None,
        );
        print_chunk_block(
            "POSITIVE",
            &triplet.positive,
            strategy,
            split_store,
            pos_sim,
            Some(pos_proximity),
            pos_distance,
        );
        print_chunk_block(
            "NEGATIVE",
            &triplet.negative,
            strategy,
            split_store,
            neg_sim,
            None,
            None,
        );
    }
    // Per-source counts and recipe usage, keyed by each anchor's record id.
    print_source_summary(
        "triplet anchors",
        batch
            .triplets
            .iter()
            .map(|triplet| triplet.anchor.record_id.as_str()),
    );
    print_recipe_context_by_source(
        "triplet recipes by source",
        batch
            .triplets
            .iter()
            .map(|triplet| (triplet.anchor.record_id.as_str(), triplet.recipe.as_str())),
    );
}
981

982
/// Dump every sample in a text batch (recipe, weight, optional instruction,
/// and the chunk itself) followed by per-source summaries.
fn print_text_batch(strategy: &ChunkingStrategy, batch: &TextBatch, split_store: &impl SplitStore) {
    println!("=== text batch ===");
    for (idx, sample) in batch.samples.iter().enumerate() {
        println!("--- sample #{} ---", idx);
        println!("recipe       : {}", sample.recipe);
        println!("sample_weight: {:.4}", sample.weight);
        if let Some(instr) = &sample.instruction {
            println!("instruction shown to model:\n{}\n", instr);
        }
        // Text samples carry no anchor-relative metrics, hence the three Nones.
        print_chunk_block(
            "TEXT",
            &sample.chunk,
            strategy,
            split_store,
            None,
            None,
            None,
        );
    }
    print_source_summary(
        "text samples",
        batch
            .samples
            .iter()
            .map(|sample| sample.chunk.record_id.as_str()),
    );
    print_recipe_context_by_source(
        "text recipes by source",
        batch
            .samples
            .iter()
            .map(|sample| (sample.chunk.record_id.as_str(), sample.recipe.as_str())),
    );
}
1016

1017
/// Dump every pair in `batch` (recipe, label, optional reason, then the
/// anchor and other chunk) followed by per-source summaries.
fn print_pair_batch(
    strategy: &ChunkingStrategy,
    batch: &SampleBatch,
    split_store: &impl SplitStore,
) {
    println!("=== pair batch ===");
    for (idx, pair) in batch.pairs.iter().enumerate() {
        println!("--- pair #{} ---", idx);
        println!("recipe       : {}", pair.recipe);
        println!("label        : {:?}", pair.label);
        if let Some(reason) = &pair.reason {
            println!("reason       : {}", reason);
        }
        print_chunk_block(
            "ANCHOR",
            &pair.anchor,
            strategy,
            split_store,
            None,
            None,
            None,
        );
        // NOTE(review): the field is named `positive` but is printed as
        // "OTHER" — presumably because the pair's label may mark it negative;
        // confirm against `SampleBatch`'s definition.
        print_chunk_block(
            "OTHER",
            &pair.positive,
            strategy,
            split_store,
            None,
            None,
            None,
        );
    }
    print_source_summary(
        "pair anchors",
        batch
            .pairs
            .iter()
            .map(|pair| pair.anchor.record_id.as_str()),
    );
    print_recipe_context_by_source(
        "pair recipes by source",
        batch
            .pairs
            .iter()
            .map(|pair| (pair.anchor.record_id.as_str(), pair.recipe.as_str())),
    );
}
1064

1065
/// List the available text recipes with their weight, selector, and (when
/// present) instruction text.
fn print_text_recipes(recipes: &[TextRecipe]) {
    println!("=== available text recipes ===");
    for recipe in recipes {
        println!(
            "- {} (weight: {:.3}) selector={:?}",
            recipe.name, recipe.weight, recipe.selector
        );
        if let Some(instr) = &recipe.instruction {
            println!("  instruction: {}", instr);
        }
    }
}
1077

1078
#[cfg(feature = "extended-metrics")]
1079
/// Mean and median of `vals`, sorting `vals` in place to locate the median.
///
/// NaNs are treated as equal during the sort (`partial_cmp` fallback), so
/// they cannot abort it. Returns `(NaN, NaN)` for an empty slice — the
/// previous code divided by a zero length and then panicked on the
/// even-length median's `len / 2 - 1` index underflow.
fn metric_mean_median(vals: &mut [f32]) -> (f32, f32) {
    if vals.is_empty() {
        return (f32::NAN, f32::NAN);
    }
    let mean = vals.iter().sum::<f32>() / vals.len() as f32;
    vals.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    let median = if vals.len() % 2 == 1 {
        // Odd length: the middle element.
        vals[vals.len() / 2]
    } else {
        // Even length: average of the two middle elements.
        (vals[vals.len() / 2 - 1] + vals[vals.len() / 2]) / 2.0
    };
    (mean, median)
}
1089

1090
#[cfg(feature = "extended-metrics")]
/// Print per-source mean/median tables for the collected extended metrics.
///
/// Each `MetricEntry` is a 5-tuple; from the section calls at the bottom the
/// layout is (pos-jaccard, pos-byte-cos, neg-jaccard, neg-byte-cos,
/// anchor-positive proximity). Two paired sections show positive/negative
/// columns with a pos−neg gap; a third shows proximity alone. An "ALL" row is
/// appended only when more than one source is present.
fn print_metric_summary(source_data: &SourceMetricsMap) {
    let total: usize = source_data.values().map(|v| v.len()).sum();
    let n_sources = source_data.len();
    println!(
        "=== extended metrics summary ({} triplets, {} {}) ===",
        total,
        n_sources,
        if n_sources == 1 { "source" } else { "sources" }
    );

    // Returns [pos, neg] as (mean, median) pairs for one metric across entries.
    fn metric_pair(entries: &[MetricEntry], pos_idx: usize, neg_idx: usize) -> [(f32, f32); 2] {
        // Pull one tuple position out of every entry by index.
        let extract = |idx: usize| -> Vec<f32> {
            entries
                .iter()
                .map(|e| match idx {
                    0 => e.0,
                    1 => e.1,
                    2 => e.2,
                    3 => e.3,
                    _ => e.4,
                })
                .collect()
        };
        let mut pos_vals = extract(pos_idx);
        let mut neg_vals = extract(neg_idx);
        [
            metric_mean_median(&mut pos_vals),
            metric_mean_median(&mut neg_vals),
        ]
    }

    // Table with positive/negative columns plus their gap, one row per source.
    fn print_metric_section(
        label: &str,
        sources: &[&String],
        source_data: &SourceMetricsMap,
        pos_idx: usize,
        neg_idx: usize,
        total: usize,
        n_sources: usize,
    ) {
        // Separator width matching the header columns below.
        const SEP: usize = 83;
        println!();
        println!("[{}]", label);
        println!(
            "{:<24} {:>5}  {:<16} {:<16} {:<16}",
            "source", "n", "positive", "negative", "gap (pos\u{2212}neg)"
        );
        println!(
            "{:<24} {:>5}  {:<16} {:<16} {:<16}",
            "", "", "mean / median", "mean / median", "mean / median"
        );
        println!("{}", "-".repeat(SEP));
        for source in sources {
            let entries = &source_data[*source];
            let [pos, neg] = metric_pair(entries, pos_idx, neg_idx);
            let gap_mean = pos.0 - neg.0;
            let gap_med = pos.1 - neg.1;
            println!(
                "{:<24} {:>5}  {:.3} / {:.3}     {:.3} / {:.3}     {:+.3} / {:+.3}",
                source,
                entries.len(),
                pos.0,
                pos.1,
                neg.0,
                neg.1,
                gap_mean,
                gap_med,
            );
        }
        // Aggregate "ALL" row only makes sense with multiple sources.
        if n_sources > 1 {
            let all: Vec<MetricEntry> = source_data.values().flatten().copied().collect();
            let [pos, neg] = metric_pair(&all, pos_idx, neg_idx);
            let gap_mean = pos.0 - neg.0;
            let gap_med = pos.1 - neg.1;
            println!("{}", "-".repeat(SEP));
            println!(
                "{:<24} {:>5}  {:.3} / {:.3}     {:.3} / {:.3}     {:+.3} / {:+.3}",
                "ALL", total, pos.0, pos.1, neg.0, neg.1, gap_mean, gap_med,
            );
        }
    }

    // Narrower table for a single (non-paired) metric, one row per source.
    fn print_single_metric_section(
        label: &str,
        sources: &[&String],
        source_data: &SourceMetricsMap,
        idx: usize,
        total: usize,
        n_sources: usize,
    ) {
        const SEP: usize = 58;
        println!();
        println!("[{}]", label);
        println!("{:<24} {:>5}  {:<16}", "source", "n", "mean / median");
        println!("{}", "-".repeat(SEP));
        for source in sources {
            let entries = &source_data[*source];
            let mut vals: Vec<f32> = entries
                .iter()
                .map(|e| match idx {
                    0 => e.0,
                    1 => e.1,
                    2 => e.2,
                    3 => e.3,
                    _ => e.4,
                })
                .collect();
            let (mean, median) = metric_mean_median(&mut vals);
            println!(
                "{:<24} {:>5}  {:.3} / {:.3}",
                source,
                entries.len(),
                mean,
                median,
            );
        }
        if n_sources > 1 {
            let mut all: Vec<f32> = source_data
                .values()
                .flatten()
                .map(|e| match idx {
                    0 => e.0,
                    1 => e.1,
                    2 => e.2,
                    3 => e.3,
                    _ => e.4,
                })
                .collect();
            let (mean, median) = metric_mean_median(&mut all);
            println!("{}", "-".repeat(SEP));
            println!("{:<24} {:>5}  {:.3} / {:.3}", "ALL", total, mean, median);
        }
    }

    // Stable, alphabetical row order across runs.
    let mut sources: Vec<&String> = source_data.keys().collect();
    sources.sort();

    print_metric_section(
        "jaccard \u{2194} anchor",
        &sources,
        source_data,
        0,
        2,
        total,
        n_sources,
    );
    print_metric_section(
        "byte-cos \u{2194} anchor",
        &sources,
        source_data,
        1,
        3,
        total,
        n_sources,
    );
    print_single_metric_section(
        "anchor-positive proximity",
        &sources,
        source_data,
        4,
        total,
        n_sources,
    );
    println!();
}
1257

1258
/// Debug-output helper: human-readable description of a chunk's view.
trait ChunkDebug {
    /// Short label for the chunk view, including its token estimate.
    fn view_name(&self) -> String;
}
1261

1262
impl ChunkDebug for RecordChunk {
    // Windows render as "window#index=.. span=.. overlap=.. tokens=..";
    // summary fallbacks as "summary:<strategy> tokens=..".
    fn view_name(&self) -> String {
        match &self.view {
            ChunkView::Window {
                index,
                span,
                overlap,
            } => format!(
                "window#index={} span={} overlap={} tokens={}",
                index, span, overlap, self.tokens_estimate
            ),
            ChunkView::SummaryFallback { strategy, .. } => {
                format!("summary:{} tokens={}", strategy, self.tokens_estimate)
            }
        }
    }
}
1279

1280
/// Print one chunk block (ANCHOR/POSITIVE/NEGATIVE/TEXT/OTHER): split label,
/// view, weight, ids, optional anchor-relative metrics, and the exact text
/// that would be sent to the model.
fn print_chunk_block(
    title: &str,
    chunk: &RecordChunk,
    strategy: &ChunkingStrategy,
    split_store: &impl SplitStore,
    // (jaccard, byte-cosine) similarity vs. the anchor; omitted when None.
    anchor_sim: Option<(f32, f32)>,
    // Anchor->positive proximity score; only supplied for positive chunks.
    ap_proximity: Option<f32>,
    // Anchor->positive window distance; omitted when None.
    ap_distance: Option<f32>,
) {
    // Local shadows the imported `chunk_weight` fn with its computed value.
    let chunk_weight = chunk_weight(strategy, chunk);
    // Records without a split assignment are shown as "Unknown".
    let split = split_store
        .label_for(&chunk.record_id)
        .map(|label| format!("{:?}", label))
        .unwrap_or_else(|| "Unknown".to_string());
    println!("--- {} ---", title);
    println!("split        : {}", split);
    println!("view         : {}", chunk.view_name());
    println!("chunk_weight : {:.4}", chunk_weight);
    println!("record_id    : {}", chunk.record_id);
    println!("section_idx  : {}", chunk.section_idx);
    println!("token_est    : {}", chunk.tokens_estimate);
    if let Some(proximity) = ap_proximity {
        println!("a<->p proximity  : {:.4}", proximity);
    }
    if let Some(distance) = ap_distance {
        println!("a<->p distance   : {:.4}", distance);
    }
    if let Some((j, c)) = anchor_sim {
        println!("jaccard(↔a)  : {:.4}  byte-cos(↔a): {:.4}", j, c);
    }
    println!("model_input (exact text sent to the model):");
    println!(
        "<<< BEGIN MODEL TEXT >>>\n{}\n<<< END MODEL TEXT >>>\n",
        chunk.text
    );
}
1316

1317
fn print_source_summary<'a, I>(label: &str, ids: I)
9✔
1318
where
9✔
1319
    I: Iterator<Item = &'a str>,
9✔
1320
{
1321
    let mut counts: HashMap<SourceId, usize> = HashMap::new();
9✔
1322
    for id in ids {
23✔
1323
        let source = extract_source(id);
23✔
1324
        *counts.entry(source).or_insert(0) += 1;
23✔
1325
    }
23✔
1326
    if counts.is_empty() {
9✔
1327
        return;
1✔
1328
    }
8✔
1329
    let skew = source_skew(&counts);
8✔
1330
    let mut entries: Vec<(String, usize)> = counts.into_iter().collect();
8✔
1331
    entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
8✔
1332
    println!("--- {} by source ---", label);
8✔
1333
    if let Some(skew) = skew {
8✔
1334
        for entry in &skew.per_source {
10✔
1335
            println!(
10✔
1336
                "{}: count={} share={:.2}",
10✔
1337
                entry.source, entry.count, entry.share
10✔
1338
            );
10✔
1339
        }
10✔
1340
        println!(
8✔
1341
            "skew: sources={} total={} min={} max={} mean={:.2} ratio={:.2}",
1342
            skew.sources, skew.total, skew.min, skew.max, skew.mean, skew.ratio
1343
        );
UNCOV
1344
    }
×
1345
}
9✔
1346

1347
/// Group (record_id, recipe) pairs by source prefix and print each source's
/// recipe usage counts. Prints nothing when `entries` is empty.
fn print_recipe_context_by_source<'a, I>(label: &str, entries: I)
where
    I: Iterator<Item = (&'a str, &'a str)>,
{
    // source -> (recipe name -> usage count)
    let mut counts: HashMap<SourceId, HashMap<String, usize>> = HashMap::new();
    for (record_id, recipe) in entries {
        let source = extract_source(record_id);
        let entry = counts
            .entry(source)
            .or_default()
            .entry(recipe.to_string())
            .or_insert(0);
        *entry += 1;
    }
    if counts.is_empty() {
        return;
    }
    // Alphabetical source order for stable output.
    let mut sources: Vec<(SourceId, HashMap<String, usize>)> = counts.into_iter().collect();
    sources.sort_by(|a, b| a.0.cmp(&b.0));
    println!("--- {} ---", label);
    for (source, recipes) in sources {
        println!("{source}");
        let mut entries: Vec<(String, usize)> = recipes.into_iter().collect();
        // Most-used recipe first; ties broken alphabetically.
        entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        for (recipe, count) in entries {
            println!("  - {recipe}={count}");
        }
    }
}
1376

1377
fn extract_source(record_id: &str) -> SourceId {
52✔
1378
    record_id
52✔
1379
        .split_once("::")
52✔
1380
        .map(|(source, _)| source.to_string())
52✔
1381
        .unwrap_or_else(|| "unknown".to_string())
52✔
1382
}
52✔
1383

1384
#[cfg(test)]
1385
mod tests {
1386
    use super::*;
1387
    use crate::DataRecord;
1388
    use crate::DeterministicSplitStore;
1389
    use crate::data::{QualityScore, RecordSection, SectionRole};
1390
    use crate::source::{SourceCursor, SourceSnapshot};
1391
    use crate::utils::make_section;
1392
    use chrono::{TimeZone, Utc};
1393
    use tempfile::tempdir;
1394

1395
    /// Source-factory stub that produces no data sources.
    fn empty_dyn_sources(_: &()) -> Vec<DynSource> {
        Vec::new()
    }
1398

1399
    /// Root-resolution stub that always succeeds.
    fn ok_unit_roots(_: Vec<String>) -> Result<(), Box<dyn Error>> {
        Ok(())
    }
1402

1403
    /// Root-resolution stub that always fails with a fixed error string.
    fn error_unit_roots(_: Vec<String>) -> Result<(), Box<dyn Error>> {
        Err("root-resolution-error".into())
    }
1406

1407
    /// Test double whose `refresh` always fails, for exercising the
    /// source-unavailable error path.
    struct ErrorRefreshSource {
        id: String,
    }
1410

1411
    impl DataSource for ErrorRefreshSource {
        fn id(&self) -> &str {
            &self.id
        }

        // Always reports the source as unavailable.
        fn refresh(
            &self,
            _config: &SamplerConfig,
            _cursor: Option<&SourceCursor>,
            _limit: Option<usize>,
        ) -> Result<SourceSnapshot, SamplerError> {
            Err(SamplerError::SourceUnavailable {
                source_id: self.id.clone(),
                reason: "simulated refresh failure".to_string(),
            })
        }

        // Claims exactly one record despite refresh always failing.
        fn reported_record_count(&self, _config: &SamplerConfig) -> Result<u128, SamplerError> {
            Ok(1)
        }

        fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
            vec![default_recipe("error_refresh_recipe")]
        }
    }
1436

1437
    /// Minimal in-memory `DataSource` test double for example app tests.
    struct TestSource {
        id: String,
        // `None` makes `reported_record_count` return an error.
        count: Option<u128>,
        recipes: Vec<TripletRecipe>,
    }
1443

1444
    impl DataSource for TestSource {
        fn id(&self) -> &str {
            &self.id
        }

        // Always returns an empty snapshot with a fresh cursor.
        fn refresh(
            &self,
            _config: &SamplerConfig,
            _cursor: Option<&SourceCursor>,
            _limit: Option<usize>,
        ) -> Result<SourceSnapshot, SamplerError> {
            Ok(SourceSnapshot {
                records: Vec::new(),
                cursor: SourceCursor {
                    last_seen: Utc::now(),
                    revision: 0,
                },
            })
        }

        // Errors when no exact count was configured (`count: None`).
        fn reported_record_count(&self, _config: &SamplerConfig) -> Result<u128, SamplerError> {
            self.count.ok_or_else(|| SamplerError::SourceInconsistent {
                source_id: self.id.clone(),
                details: "test source has no configured exact count".to_string(),
            })
        }

        fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
            self.recipes.clone()
        }
    }
1475

1476
    /// Test double that only reports a record count when the sampler config's
    /// seed matches `expected_seed`, verifying config propagation.
    struct ConfigRequiredSource {
        id: String,
        expected_seed: u64,
    }
1480

1481
    impl DataSource for ConfigRequiredSource {
        fn id(&self) -> &str {
            &self.id
        }

        // Empty snapshot; this double only checks config plumbing.
        fn refresh(
            &self,
            _config: &SamplerConfig,
            _cursor: Option<&SourceCursor>,
            _limit: Option<usize>,
        ) -> Result<SourceSnapshot, SamplerError> {
            Ok(SourceSnapshot {
                records: Vec::new(),
                cursor: SourceCursor {
                    last_seen: Utc::now(),
                    revision: 0,
                },
            })
        }

        // Succeeds only when the caller forwarded the expected seed.
        fn reported_record_count(&self, config: &SamplerConfig) -> Result<u128, SamplerError> {
            if config.seed == self.expected_seed {
                Ok(1)
            } else {
                Err(SamplerError::SourceInconsistent {
                    source_id: self.id.clone(),
                    details: format!(
                        "expected sampler seed {} but got {}",
                        self.expected_seed, config.seed
                    ),
                })
            }
        }

        fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
            Vec::new()
        }
    }
1519

1520
    /// Test double backed by a fixed set of records and recipes.
    struct FixtureSource {
        id: String,
        records: Vec<DataRecord>,
        recipes: Vec<TripletRecipe>,
    }
1525

1526
    impl DataSource for FixtureSource {
        fn id(&self) -> &str {
            &self.id
        }

        // Serves a clone of the fixed record set on every refresh.
        fn refresh(
            &self,
            _config: &SamplerConfig,
            _cursor: Option<&SourceCursor>,
            _limit: Option<usize>,
        ) -> Result<SourceSnapshot, SamplerError> {
            Ok(SourceSnapshot {
                records: self.records.clone(),
                cursor: SourceCursor {
                    last_seen: Utc::now(),
                    revision: 0,
                },
            })
        }

        fn reported_record_count(&self, _config: &SamplerConfig) -> Result<u128, SamplerError> {
            Ok(self.records.len() as u128)
        }

        fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
            self.recipes.clone()
        }
    }
1554

1555
    /// Test double whose `refresh` fails unless the sampler config carries
    /// the expected `ingestion_max_records`, verifying CLI-to-config plumbing.
    struct IngestionConfigSource {
        expected_ingestion_max_records: usize,
        records: Vec<DataRecord>,
    }
1559

1560
    impl DataSource for IngestionConfigSource {
        fn id(&self) -> &str {
            "ingestion_config_source"
        }

        // Fails fast when the forwarded ingestion cap does not match, so the
        // test catches broken config propagation at the refresh boundary.
        fn refresh(
            &self,
            config: &SamplerConfig,
            _cursor: Option<&SourceCursor>,
            _limit: Option<usize>,
        ) -> Result<SourceSnapshot, SamplerError> {
            if config.ingestion_max_records != self.expected_ingestion_max_records {
                return Err(SamplerError::SourceInconsistent {
                    source_id: self.id().to_string(),
                    details: format!(
                        "expected ingestion_max_records {} but got {}",
                        self.expected_ingestion_max_records, config.ingestion_max_records
                    ),
                });
            }
            Ok(SourceSnapshot {
                records: self.records.clone(),
                cursor: SourceCursor {
                    last_seen: Utc::now(),
                    revision: 0,
                },
            })
        }

        fn reported_record_count(&self, _config: &SamplerConfig) -> Result<u128, SamplerError> {
            Ok(self.records.len() as u128)
        }

        fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
            vec![default_recipe("ingestion_config_recipe")]
        }
    }
1597

1598
    /// Build a two-section test record (anchor "title" + context "body") for
    /// `source`, timestamped 2025-01-<day> 12:00:00 UTC with full trust.
    fn fixture_record(
        source: &str,
        id_suffix: &str,
        day: u32,
        title: &str,
        body: &str,
    ) -> DataRecord {
        let now = Utc.with_ymd_and_hms(2025, 1, day, 12, 0, 0).unwrap();
        DataRecord {
            // Ids follow the "<source>::<suffix>" shape that `extract_source`
            // splits on.
            id: format!("{source}::{id_suffix}"),
            source: source.to_string(),
            created_at: now,
            updated_at: now,
            quality: QualityScore { trust: 1.0 },
            taxonomy: Vec::new(),
            sections: vec![
                make_section(SectionRole::Anchor, Some("title"), title),
                make_section(SectionRole::Context, Some("body"), body),
            ],
            meta_prefix: None,
        }
    }
1620

1621
    /// Anchor-role -> Context-role triplet recipe with weight 1.0 and the
    /// wrong-article negative strategy; the default recipe used by the test
    /// sources above.
    fn default_recipe(name: &str) -> TripletRecipe {
        TripletRecipe {
            name: name.to_string().into(),
            anchor: crate::config::Selector::Role(SectionRole::Anchor),
            positive_selector: crate::config::Selector::Role(SectionRole::Context),
            negative_selector: crate::config::Selector::Role(SectionRole::Context),
            negative_strategy: crate::config::NegativeStrategy::WrongArticle,
            weight: 1.0,
            instruction: None,
            allow_same_anchor_positive: false,
        }
    }
1633

1634
    #[test]
    fn parse_helpers_validate_inputs() {
        // Numeric parsers reject zero and non-numeric input.
        assert_eq!(parse_batch_size("2").unwrap(), 2);
        assert!(parse_batch_size("0").is_err());
        assert!(parse_batch_size("abc").is_err());
        assert_eq!(parse_ingestion_max_records("16").unwrap(), 16);
        assert!(parse_ingestion_max_records("0").is_err());
        assert!(parse_batch_count("0").is_err());

        // Split ratios: exactly three comma-separated values in range.
        let ratios = parse_split_ratios_arg("0.8,0.1,0.1").unwrap();
        assert!((ratios.train - 0.8).abs() < 1e-6);
        assert!(parse_split_ratios_arg("0.8,0.1").is_err());
        assert!(parse_split_ratios_arg("1.0,0.0,0.1").is_err());
        assert!(parse_split_ratios_arg("-0.1,0.6,0.5").is_err());
    }
1649

1650
    #[test]
    fn fixture_and_ingestion_sources_trait_methods_cover_paths() {
        let records = vec![fixture_record("fixture_source", "r1", 1, "Title", "Body")];
        let recipes = vec![default_recipe("fixture_recipe")];
        let fixture = FixtureSource {
            id: "fixture_source".into(),
            records: records.clone(),
            recipes: recipes.clone(),
        };

        // FixtureSource reports exactly its in-memory records.
        let snapshot = fixture
            .refresh(&SamplerConfig::default(), None, None)
            .expect("fixture refresh should succeed");
        assert_eq!(snapshot.records.len(), 1);
        let reported = fixture
            .reported_record_count(&SamplerConfig::default())
            .unwrap();
        assert_eq!(reported, 1);
        assert_eq!(fixture.default_triplet_recipes().len(), 1);

        // IngestionConfigSource only accepts a matching ingestion limit.
        let source = IngestionConfigSource {
            expected_ingestion_max_records: 7,
            records,
        };
        let matching_cfg = SamplerConfig {
            ingestion_max_records: 7,
            ..SamplerConfig::default()
        };
        assert!(source.refresh(&matching_cfg, None, None).is_ok());
        assert_eq!(source.reported_record_count(&matching_cfg).unwrap(), 1);
        assert_eq!(source.default_triplet_recipes().len(), 1);

        let mismatched_cfg = SamplerConfig {
            ingestion_max_records: 8,
            ..SamplerConfig::default()
        };
        let err = source.refresh(&mismatched_cfg, None, None).unwrap_err();
        assert!(matches!(err, SamplerError::SourceInconsistent { .. }));
    }
1691

1692
    #[test]
    fn suggested_balancing_weight_is_longest_normalized_and_bounded() {
        // Equal sizes normalize to 1.0; larger own-count scales down; zero is zero.
        let close = |actual: f32, expected: f32| (actual - expected).abs() < 1e-6;
        assert!(close(suggested_balancing_weight(100, 100), 1.0));
        assert!(close(suggested_balancing_weight(400, 100), 0.25));
        assert!(close(suggested_balancing_weight(400, 400), 1.0));
        assert_eq!(suggested_balancing_weight(0, 100), 0.0);
        assert_eq!(suggested_balancing_weight(100, 0), 0.0);
    }
1700

1701
    #[test]
    fn suggested_oversampling_weight_is_inverse_in_unit_interval() {
        // Inverse proportionality in (0, 1]; zero on either side yields zero.
        let close = |actual: f32, expected: f32| (actual - expected).abs() < 1e-6;
        assert!(close(suggested_oversampling_weight(100, 100), 1.0));
        assert!(close(suggested_oversampling_weight(100, 400), 0.25));
        assert!(close(suggested_oversampling_weight(100, 1000), 0.1));
        assert_eq!(suggested_oversampling_weight(0, 100), 0.0);
        assert_eq!(suggested_oversampling_weight(100, 0), 0.0);
    }
1709

1710
    #[test]
    fn parse_cli_handles_help_and_invalid_args() {
        // `--help` short-circuits to Ok(None) instead of a parsed CLI.
        let help_result =
            parse_cli::<EstimateCapacityCli, _>(["estimate_capacity", "--help"]).unwrap();
        assert!(help_result.is_none());

        // Unknown flags surface as parse errors.
        let unknown = parse_cli::<EstimateCapacityCli, _>(["estimate_capacity", "--unknown"]);
        assert!(unknown.is_err());
    }
1718

1719
    #[test]
    fn run_estimate_capacity_succeeds_with_reported_counts() {
        // A source with an exact reported count lets estimation complete.
        let outcome = run_estimate_capacity(
            std::iter::empty::<String>(),
            |roots| {
                // No cache roots are expected for this invocation.
                assert!(roots.is_empty());
                Ok(())
            },
            |_| {
                let source = TestSource {
                    id: "source_a".into(),
                    count: Some(12),
                    recipes: vec![default_recipe("r1")],
                };
                vec![Box::new(source) as DynSource]
            },
        );

        assert!(outcome.is_ok());
    }
1738

1739
    #[test]
    fn run_estimate_capacity_errors_when_source_count_missing() {
        // A source that cannot report an exact count must fail estimation.
        let outcome = run_estimate_capacity(
            std::iter::empty::<String>(),
            |_| Ok(()),
            |_| {
                let source = TestSource {
                    id: "source_missing".into(),
                    count: None,
                    recipes: vec![default_recipe("r1")],
                };
                vec![Box::new(source) as DynSource]
            },
        );

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("failed to report exact record count"));
    }
1756

1757
    #[test]
    fn run_estimate_capacity_propagates_root_resolution_error() {
        // Failures in root resolution should bubble up unchanged.
        let outcome = run_estimate_capacity(
            std::iter::empty::<String>(),
            |_| Err("root resolution failed".into()),
            empty_dyn_sources,
        );

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("root resolution failed"));
    }
1768

1769
    #[test]
    fn run_estimate_capacity_allows_empty_source_list() {
        // Zero configured sources is a valid (trivially successful) run.
        let outcome =
            run_estimate_capacity(std::iter::empty::<String>(), |_| Ok(()), empty_dyn_sources);
        assert!(outcome.is_ok());
    }
1776

1777
    #[test]
    fn run_estimate_capacity_configures_sources_centrally_before_counting() {
        // ConfigRequiredSource asserts it sees the centrally-applied config,
        // so a clean Ok here proves configuration happens before counting.
        let outcome = run_estimate_capacity(
            std::iter::empty::<String>(),
            |_| Ok(()),
            |_| {
                let source = ConfigRequiredSource {
                    id: "requires_config".into(),
                    expected_seed: 99,
                };
                vec![Box::new(source) as DynSource]
            },
        );

        assert!(outcome.is_ok());
    }
1792

1793
    #[test]
    fn config_required_source_refresh_and_seed_mismatch_are_exercised() {
        let source = ConfigRequiredSource {
            id: "cfg-source".to_string(),
            expected_seed: 42,
        };

        // Refresh with defaults yields an empty snapshot.
        let snapshot = source
            .refresh(&SamplerConfig::default(), None, None)
            .unwrap();
        assert!(snapshot.records.is_empty());

        // A seed other than the expected one is reported as inconsistent.
        let wrong_seed_cfg = SamplerConfig {
            seed: 7,
            ..SamplerConfig::default()
        };
        let mismatched = source.reported_record_count(&wrong_seed_cfg);
        assert!(matches!(
            mismatched,
            Err(SamplerError::SourceInconsistent { .. })
        ));

        assert!(source.default_triplet_recipes().is_empty());
    }
1816

1817
    #[test]
    fn run_multi_source_demo_exhausted_paths_return_ok() {
        // Source that always serves exactly one anchor/context record.
        struct OneRecordSource;

        impl DataSource for OneRecordSource {
            fn id(&self) -> &str {
                "one_record"
            }

            fn refresh(
                &self,
                _config: &SamplerConfig,
                _cursor: Option<&SourceCursor>,
                _limit: Option<usize>,
            ) -> Result<SourceSnapshot, SamplerError> {
                let ts = Utc::now();
                let record = DataRecord {
                    id: "one_record::r1".to_string(),
                    source: "one_record".to_string(),
                    created_at: ts,
                    updated_at: ts,
                    quality: QualityScore { trust: 1.0 },
                    taxonomy: Vec::new(),
                    sections: vec![
                        RecordSection {
                            role: SectionRole::Anchor,
                            heading: Some("title".to_string()),
                            text: "anchor".to_string(),
                            sentences: vec!["anchor".to_string()],
                        },
                        RecordSection {
                            role: SectionRole::Context,
                            heading: Some("body".to_string()),
                            text: "context".to_string(),
                            sentences: vec!["context".to_string()],
                        },
                    ],
                    meta_prefix: None,
                };
                Ok(SourceSnapshot {
                    records: vec![record],
                    cursor: SourceCursor {
                        last_seen: ts,
                        revision: 0,
                    },
                })
            }

            fn reported_record_count(&self, _config: &SamplerConfig) -> Result<u128, SamplerError> {
                Ok(1)
            }

            fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
                vec![default_recipe("single_record_recipe")]
            }
        }

        let probe = OneRecordSource;
        assert_eq!(
            probe
                .reported_record_count(&SamplerConfig::default())
                .unwrap(),
            1
        );
        assert_eq!(probe.default_triplet_recipes().len(), 1);

        // Every sampling mode should exhaust the single record and return Ok.
        for mode in ["--pair-batch", "--text-recipes", ""] {
            let dir = tempdir().unwrap();
            let split_store_path = dir.path().join("split_store.bin");
            let mut args = vec![
                "--split-store-path".to_string(),
                split_store_path.to_string_lossy().to_string(),
            ];
            if !mode.is_empty() {
                args.push(mode.to_string());
            }

            let outcome = run_multi_source_demo(
                args.into_iter(),
                |_| Ok(()),
                |_| vec![Box::new(OneRecordSource) as DynSource],
            );
            assert!(outcome.is_ok());
        }
    }
1900

1901
    #[test]
    fn parse_multi_source_cli_handles_help_and_batch_size_validation() {
        // Help path yields Ok(None).
        let help_result =
            parse_cli::<MultiSourceDemoCli, _>(["multi_source_demo", "--help"]).unwrap();
        assert!(help_result.is_none());

        // Zero is rejected for both batch size and ingestion limits.
        let bad_batch =
            parse_cli::<MultiSourceDemoCli, _>(["multi_source_demo", "--batch-size", "0"]);
        assert!(bad_batch.is_err());

        let bad_limit = parse_cli::<MultiSourceDemoCli, _>([
            "multi_source_demo",
            "--ingestion-max-records",
            "0",
        ]);
        assert!(bad_limit.is_err());

        // No arguments at all parses cleanly.
        assert!(parse_cli::<MultiSourceDemoCli, _>(["multi_source_demo"]).is_ok());
    }
1919

1920
    #[test]
    fn run_example_apps_invalid_cli_args_return_errors() {
        // Both entry points surface unknown-flag parse failures as errors.
        let estimate_outcome = run_estimate_capacity(
            ["--unknown".to_string()].into_iter(),
            ok_unit_roots,
            empty_dyn_sources,
        );
        assert!(estimate_outcome.is_err());

        let demo_outcome = run_multi_source_demo(
            ["--unknown".to_string()].into_iter(),
            ok_unit_roots,
            empty_dyn_sources,
        );
        assert!(demo_outcome.is_err());
    }
1936

1937
    #[test]
    fn helper_and_error_refresh_source_methods_are_exercised() {
        // Root-resolution helpers behave as their names promise.
        assert!(ok_unit_roots(Vec::new()).is_ok());
        assert!(error_unit_roots(Vec::new()).is_err());

        // ErrorRefreshSource still reports a count and a recipe despite
        // failing on refresh.
        let source = ErrorRefreshSource {
            id: "error_refresh_source".to_string(),
        };
        let count = source
            .reported_record_count(&SamplerConfig::default())
            .unwrap();
        assert_eq!(count, 1);
        assert_eq!(source.default_triplet_recipes().len(), 1);
    }
1953

1954
    #[test]
    fn print_source_summary_handles_non_empty_ids() {
        // Mixed ids, including one without the `::` delimiter, must not panic.
        let record_ids = [
            "source_a::r1",
            "source_a::r2",
            "source_b::r1",
            "source_without_delimiter",
        ];
        print_source_summary("non-empty summary", record_ids.into_iter());
    }
1964

1965
    #[test]
    fn run_multi_source_demo_refresh_failures_degrade_to_exhausted_paths() {
        // A source whose refresh always errors should degrade gracefully in
        // every sampling mode instead of aborting the demo.
        for mode in [
            vec!["--pair-batch".to_string()],
            vec!["--text-recipes".to_string()],
            vec!["--batches".to_string(), "1".to_string()],
            Vec::new(),
        ] {
            let dir = tempdir().unwrap();
            let split_store_path = dir.path().join("error_modes_split_store.bin");
            let mut args = mode;
            args.extend([
                "--split-store-path".to_string(),
                split_store_path.to_string_lossy().to_string(),
            ]);

            let outcome = run_multi_source_demo(
                args.into_iter(),
                |_| Ok(()),
                |_| {
                    let source = ErrorRefreshSource {
                        id: "error_refresh_source".to_string(),
                    };
                    vec![Box::new(source) as DynSource]
                },
            );

            assert!(outcome.is_ok());
        }
    }
1992

1993
    #[test]
    fn run_multi_source_demo_batches_exhausted_path_returns_ok() {
        // Requesting more batches (3) than a one-record source can fill must
        // end the run cleanly rather than error.
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("batches_exhausted_split_store.bin");
        let args = [
            "--batches".to_string(),
            "3".to_string(),
            "--split-store-path".to_string(),
            split_store_path.to_string_lossy().to_string(),
        ];

        let outcome = run_multi_source_demo(
            args.into_iter(),
            |_| Ok(()),
            |_| {
                let only_record = fixture_record(
                    "batches_exhausted_source",
                    "r1",
                    1,
                    "Only one record",
                    "Single record body",
                );
                vec![Box::new(FixtureSource {
                    id: "batches_exhausted_source".into(),
                    records: vec![only_record],
                    recipes: vec![default_recipe("batches_exhausted_recipe")],
                }) as DynSource]
            },
        );

        assert!(outcome.is_ok());
    }
2024

2025
    #[test]
    fn run_multi_source_demo_default_triplet_success_path_returns_ok() {
        // Three records are enough for the default triplet mode to succeed.
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("default_triplet_success_split_store.bin");
        let args = [
            "--split-store-path".to_string(),
            split_store_path.to_string_lossy().to_string(),
        ];

        let outcome = run_multi_source_demo(
            args.into_iter(),
            |_| Ok(()),
            |_| {
                let specs = [
                    ("r1", 1, "Title one", "Body one"),
                    ("r2", 2, "Title two", "Body two"),
                    ("r3", 3, "Title three", "Body three"),
                ];
                let records = specs
                    .iter()
                    .map(|(suffix, day, title, body)| {
                        fixture_record("default_triplet_success_source", suffix, *day, title, body)
                    })
                    .collect();
                vec![Box::new(FixtureSource {
                    id: "default_triplet_success_source".into(),
                    records,
                    recipes: vec![default_recipe("default_triplet_success_recipe")],
                }) as DynSource]
            },
        );

        assert!(outcome.is_ok());
    }
2070

2071
    #[test]
    fn run_multi_source_demo_passes_ingestion_max_records_to_sources() {
        // IngestionConfigSource asserts it receives exactly this limit, so an
        // Ok result proves the flag was forwarded to the source.
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("ingestion_config_split_store.bin");
        let expected = 7;

        let cli_args = [
            "--pair-batch".to_string(),
            "--ingestion-max-records".to_string(),
            expected.to_string(),
            "--split-store-path".to_string(),
            split_store_path.to_string_lossy().to_string(),
        ];
        let outcome = run_multi_source_demo(
            cli_args.into_iter(),
            |_| Ok(()),
            |_| {
                let records = (1..=8)
                    .map(|day| {
                        fixture_record(
                            "ingestion_config_source",
                            &format!("r{day}"),
                            day,
                            &format!("Config headline {day}"),
                            &format!("Config body {day}"),
                        )
                    })
                    .collect();
                vec![Box::new(IngestionConfigSource {
                    expected_ingestion_max_records: expected,
                    records,
                }) as DynSource]
            },
        );

        assert!(outcome.is_ok());
    }
2107

2108
    #[test]
    fn parse_cli_handles_display_version_path() {
        // `--version` should short-circuit to Ok(None) just like `--help`.
        #[derive(Debug, Parser)]
        #[command(name = "version_test", version = "1.0.0")]
        struct VersionCli {}

        let outcome = parse_cli::<VersionCli, _>(["version_test", "--version"]).unwrap();
        assert!(outcome.is_none());
    }
2117

2118
    #[test]
    fn run_multi_source_demo_list_text_recipes_path_succeeds() {
        // `--list-text-recipes` against a source with one recipe completes Ok.
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("recipes_split_store.bin");
        // `into_iter()` consumes the vector directly; the previous
        // `let mut args` + `args.drain(..)` added a needless mutable borrow
        // for a vector that was never reused afterwards.
        let args = vec![
            "--list-text-recipes".to_string(),
            "--split-store-path".to_string(),
            split_store_path.to_string_lossy().to_string(),
        ];
        let result = run_multi_source_demo(
            args.into_iter(),
            |_| Ok(()),
            |_| {
                vec![Box::new(TestSource {
                    id: "source_for_recipes".into(),
                    count: Some(10),
                    recipes: vec![default_recipe("recipe_a")],
                }) as DynSource]
            },
        );

        assert!(result.is_ok());
    }
2141

2142
    #[test]
    fn run_multi_source_demo_list_text_recipes_uses_explicit_split_store_path() {
        // Listing recipes must honor a custom --split-store-path even when the
        // source declares no text recipes at all.
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("custom_split_store.bin");
        let cli_args = [
            "--list-text-recipes".to_string(),
            "--split-store-path".to_string(),
            split_store_path.to_string_lossy().to_string(),
        ];

        let outcome = run_multi_source_demo(
            cli_args.into_iter(),
            |_| Ok(()),
            |_| {
                let source = TestSource {
                    id: "source_without_text_recipes".into(),
                    count: Some(1),
                    recipes: Vec::new(),
                };
                vec![Box::new(source) as DynSource]
            },
        );

        assert!(outcome.is_ok());
    }
2166

2167
    #[test]
    fn run_multi_source_demo_sampling_modes_handle_empty_sources() {
        // A zero-record source should leave every sampling mode Ok, including
        // when a non-default split is requested.
        for mode in [
            vec!["--pair-batch".to_string()],
            vec!["--text-recipes".to_string()],
            vec![],
        ] {
            let dir = tempdir().unwrap();
            let split_store_path = dir.path().join("empty_sources_split_store.bin");
            let mut args = mode;
            args.extend([
                "--split-store-path".to_string(),
                split_store_path.to_string_lossy().to_string(),
                "--split".to_string(),
                "validation".to_string(),
            ]);

            let outcome = run_multi_source_demo(
                args.into_iter(),
                |_| Ok(()),
                |_| {
                    let source = TestSource {
                        id: "source_empty".into(),
                        count: Some(0),
                        recipes: vec![default_recipe("recipe_empty")],
                    };
                    vec![Box::new(source) as DynSource]
                },
            );

            assert!(outcome.is_ok());
        }
    }
2197

2198
    #[test]
    fn run_multi_source_demo_propagates_root_resolution_error() {
        // Root-resolution failures abort the demo and keep their message.
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("root_resolution_error_store.bin");
        let cli_args = [
            "--split-store-path".to_string(),
            split_store_path.to_string_lossy().to_string(),
        ];
        let outcome = run_multi_source_demo(
            cli_args.into_iter(),
            |_| Err("demo root resolution failed".into()),
            empty_dyn_sources,
        );

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("demo root resolution failed"));
    }
2215

2216
    #[test]
    fn run_multi_source_demo_list_text_recipes_allows_empty_sources() {
        // Listing recipes with no sources configured is valid and succeeds.
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("empty_source_list_recipes.bin");
        let cli_args = [
            "--list-text-recipes".to_string(),
            "--split-store-path".to_string(),
            split_store_path.to_string_lossy().to_string(),
        ];
        let outcome = run_multi_source_demo(cli_args.into_iter(), |_| Ok(()), empty_dyn_sources);

        assert!(outcome.is_ok());
    }
2233

2234
    #[test]
    fn print_helpers_and_extract_source_cover_paths() {
        let split = SplitRatios::default();
        let store = DeterministicSplitStore::new(split, 42).unwrap();
        let strategy = ChunkingStrategy::default();

        // Local builder removes the repetition across the three chunks.
        fn chunk(
            record_id: &str,
            section_idx: usize,
            view: ChunkView,
            text: &str,
            tokens_estimate: usize,
            trust: f32,
        ) -> RecordChunk {
            RecordChunk {
                record_id: record_id.to_string(),
                section_idx,
                view,
                text: text.to_string(),
                tokens_estimate,
                quality: crate::data::QualityScore { trust },
            }
        }

        let anchor = chunk(
            "source_a::rec1",
            0,
            ChunkView::Window {
                index: 1,
                overlap: 2,
                span: 12,
            },
            "anchor text",
            8,
            0.9,
        );
        let positive = chunk(
            "source_a::rec2",
            1,
            ChunkView::SummaryFallback {
                strategy: "summary".to_string(),
                weight: 0.7,
            },
            "positive text",
            6,
            0.8,
        );
        let negative = chunk(
            "source_b::rec3",
            2,
            ChunkView::Window {
                index: 0,
                overlap: 0,
                span: 16,
            },
            "negative text",
            7,
            0.5,
        );

        // Exercise all three batch printers plus the recipe printer.
        let triplet_batch = TripletBatch {
            triplets: vec![crate::SampleTriplet {
                recipe: "triplet_recipe".to_string(),
                anchor: anchor.clone(),
                positive: positive.clone(),
                negative: negative.clone(),
                weight: 1.0,
                instruction: Some("triplet instruction".to_string()),
            }],
        };
        print_triplet_batch(&strategy, &triplet_batch, &store);

        let pair_batch = SampleBatch {
            pairs: vec![crate::SamplePair {
                recipe: "pair_recipe".to_string(),
                anchor: anchor.clone(),
                positive: positive.clone(),
                weight: 1.0,
                instruction: None,
                label: crate::PairLabel::Positive,
                reason: Some("same topic".to_string()),
            }],
        };
        print_pair_batch(&strategy, &pair_batch, &store);

        let text_batch = TextBatch {
            samples: vec![crate::TextSample {
                recipe: "text_recipe".to_string(),
                chunk: negative,
                weight: 0.8,
                instruction: Some("text instruction".to_string()),
            }],
        };
        print_text_batch(&strategy, &text_batch, &store);

        let recipes = vec![TextRecipe {
            name: "recipe_name".into(),
            selector: crate::config::Selector::Role(SectionRole::Context),
            instruction: Some("instruction".into()),
            weight: 1.0,
        }];
        print_text_recipes(&recipes);

        // extract_source splits on `::`; ids without it map to "unknown".
        assert_eq!(extract_source("source_a::record"), "source_a");
        assert_eq!(extract_source("record-without-delimiter"), "unknown");
    }
2322

2323
    #[test]
    fn split_arg_conversion_and_version_parse_paths_are_covered() {
        // Each CLI split argument maps onto the matching split label.
        assert!(matches!(SplitLabel::from(SplitArg::Train), SplitLabel::Train));
        assert!(matches!(
            SplitLabel::from(SplitArg::Validation),
            SplitLabel::Validation
        ));
        assert!(matches!(SplitLabel::from(SplitArg::Test), SplitLabel::Test));
    }
2335

2336
    #[test]
    fn parse_split_ratios_reports_per_field_parse_errors() {
        // Each malformed field must be named in its own error message.
        let cases = [
            ("x,0.1,0.9", "invalid train ratio"),
            ("0.1,y,0.8", "invalid validation ratio"),
            ("0.1,0.2,z", "invalid test ratio"),
        ];
        for (input, expected_fragment) in cases {
            let message = parse_split_ratios_arg(input).unwrap_err();
            assert!(
                message.contains(expected_fragment),
                "input {input:?} produced {message:?}"
            );
        }
    }
2354

2355
    #[test]
    fn run_multi_source_demo_exhausted_paths_are_handled() {
        // A source with records but no recipes exhausts immediately in every
        // sampling mode and the demo still returns Ok.
        for mode in [
            vec!["--pair-batch".to_string()],
            vec!["--text-recipes".to_string()],
            Vec::new(),
        ] {
            let dir = tempdir().unwrap();
            let split_store_path = dir.path().join("exhausted_split_store.bin");
            let mut args = mode;
            args.extend([
                "--split-store-path".to_string(),
                split_store_path.to_string_lossy().to_string(),
            ]);

            let outcome = run_multi_source_demo(
                args.into_iter(),
                |_| Ok(()),
                |_| {
                    let source = TestSource {
                        id: "source_without_recipes".into(),
                        count: Some(1),
                        recipes: Vec::new(),
                    };
                    vec![Box::new(source) as DynSource]
                },
            );

            assert!(outcome.is_ok());
        }
    }
2383

2384
    #[test]
    fn run_multi_source_demo_reset_recreates_split_store_and_samples() {
        // Seed the store path with junk so --reset has something to replace.
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("reset_split_store.bin");
        std::fs::write(&split_store_path, b"stale-data").unwrap();

        let cli_args = [
            "--reset".to_string(),
            "--pair-batch".to_string(),
            "--split-store-path".to_string(),
            split_store_path.to_string_lossy().to_string(),
        ];

        let outcome = run_multi_source_demo(
            cli_args.into_iter(),
            |_| Ok(()),
            |_| {
                let records: Vec<DataRecord> = (1..=8)
                    .map(|day| {
                        fixture_record(
                            "fixture_source",
                            &format!("r{day}"),
                            day,
                            &format!("Fixture headline {day}"),
                            &format!("Fixture body content for day {day}."),
                        )
                    })
                    .collect();
                vec![Box::new(FixtureSource {
                    id: "fixture_source".into(),
                    records,
                    recipes: vec![default_recipe("fixture_recipe")],
                }) as DynSource]
            },
        );

        assert!(outcome.is_ok());
        // The stale file must have been rebuilt with real split data.
        assert!(split_store_path.exists());
        let metadata = std::fs::metadata(&split_store_path).unwrap();
        assert!(metadata.len() > 0);
    }
2426

2427
    #[test]
2428
    fn run_multi_source_demo_batches_mode_executes_multiple_batches() {
1✔
2429
        let dir = tempdir().unwrap();
1✔
2430
        let split_store_path = dir.path().join("batches_split_store.bin");
1✔
2431
        let args = vec![
1✔
2432
            "--batches".to_string(),
1✔
2433
            "2".to_string(),
1✔
2434
            "--split-store-path".to_string(),
1✔
2435
            split_store_path.to_string_lossy().to_string(),
1✔
2436
        ];
2437

2438
        let result = run_multi_source_demo(
1✔
2439
            args.into_iter(),
1✔
2440
            |_| Ok(()),
1✔
2441
            |_| {
1✔
2442
                let recipes = vec![default_recipe("batch_recipe")];
1✔
2443
                vec![Box::new(FixtureSource {
1✔
2444
                    id: "batch_source".into(),
1✔
2445
                    records: vec![
1✔
2446
                        fixture_record(
1✔
2447
                            "batch_source",
1✔
2448
                            "r1",
1✔
2449
                            3,
1✔
2450
                            "Inflation cools in latest report",
1✔
2451
                            "Core inflation moderated compared with prior quarter.",
1✔
2452
                        ),
1✔
2453
                        fixture_record(
1✔
2454
                            "batch_source",
1✔
2455
                            "r2",
1✔
2456
                            4,
1✔
2457
                            "Labor market remains resilient",
1✔
2458
                            "Job openings remain elevated despite slower growth.",
1✔
2459
                        ),
1✔
2460
                        fixture_record(
1✔
2461
                            "batch_source",
1✔
2462
                            "r3",
1✔
2463
                            5,
1✔
2464
                            "Manufacturing sentiment stabilizes",
1✔
2465
                            "Survey data suggests output expectations are improving.",
1✔
2466
                        ),
1✔
2467
                    ],
1✔
2468
                    recipes,
1✔
2469
                }) as DynSource]
1✔
2470
            },
1✔
2471
        );
2472

2473
        assert!(result.is_ok());
1✔
2474
        assert!(split_store_path.exists());
1✔
2475
    }
1✔
2476

2477
    #[test]
2478
    fn managed_demo_split_store_path_resolves_under_cache_group() {
1✔
2479
        let path = managed_demo_split_store_path().unwrap();
1✔
2480
        assert!(path.ends_with(MULTI_SOURCE_DEMO_STORE_FILENAME));
1✔
2481
        let parent = path
1✔
2482
            .parent()
1✔
2483
            .expect("managed split-store path should have a parent");
1✔
2484
        assert!(parent.ends_with(PathBuf::from(MULTI_SOURCE_DEMO_GROUP)));
1✔
2485
    }
1✔
2486

2487
    #[test]
2488
    fn run_multi_source_demo_help_returns_ok_without_work() {
1✔
2489
        let no_help = run_multi_source_demo(
1✔
2490
            std::iter::empty::<String>(),
1✔
2491
            error_unit_roots,
2492
            empty_dyn_sources,
2493
        );
2494
        assert!(
1✔
2495
            no_help
1✔
2496
                .expect_err("non-help path should attempt to resolve roots")
1✔
2497
                .to_string()
1✔
2498
                .contains("root-resolution-error")
1✔
2499
        );
2500

2501
        let result = run_multi_source_demo(
1✔
2502
            ["--help".to_string()].into_iter(),
1✔
2503
            ok_unit_roots,
2504
            empty_dyn_sources,
2505
        );
2506

2507
        assert!(result.is_ok());
1✔
2508
    }
1✔
2509

2510
    #[test]
2511
    fn run_estimate_capacity_help_returns_ok_without_work() {
1✔
2512
        let result = run_estimate_capacity(
1✔
2513
            ["--help".to_string()].into_iter(),
1✔
2514
            ok_unit_roots,
2515
            empty_dyn_sources,
2516
        );
2517

2518
        assert!(result.is_ok());
1✔
2519
    }
1✔
2520

2521
    #[test]
2522
    fn run_multi_source_demo_pair_exhausted_branch_returns_ok() {
1✔
2523
        let dir = tempdir().unwrap();
1✔
2524
        let split_store_path = dir.path().join("pair_exhausted_split_store.bin");
1✔
2525
        let args = vec![
1✔
2526
            "--pair-batch".to_string(),
1✔
2527
            "--split-store-path".to_string(),
1✔
2528
            split_store_path.to_string_lossy().to_string(),
1✔
2529
        ];
2530

2531
        let result = run_multi_source_demo(
1✔
2532
            args.into_iter(),
1✔
2533
            |_| Ok(()),
1✔
2534
            |_| {
1✔
2535
                vec![Box::new(FixtureSource {
1✔
2536
                    id: "pair_exhausted_source".into(),
1✔
2537
                    records: vec![fixture_record(
1✔
2538
                        "pair_exhausted_source",
1✔
2539
                        "r1",
1✔
2540
                        1,
1✔
2541
                        "Single record title",
1✔
2542
                        "Single record body",
1✔
2543
                    )],
1✔
2544
                    recipes: vec![default_recipe("pair_exhausted_recipe")],
1✔
2545
                }) as DynSource]
1✔
2546
            },
1✔
2547
        );
2548

2549
        assert!(result.is_ok());
1✔
2550
    }
1✔
2551

2552
    #[test]
2553
    fn run_multi_source_demo_uses_managed_split_store_path_when_not_provided() {
1✔
2554
        let result = run_multi_source_demo(
1✔
2555
            ["--list-text-recipes".to_string()].into_iter(),
1✔
2556
            |_| Ok(()),
1✔
2557
            |_| {
1✔
2558
                vec![Box::new(TestSource {
1✔
2559
                    id: "managed_path_source".into(),
1✔
2560
                    count: Some(2),
1✔
2561
                    recipes: vec![default_recipe("managed_recipe")],
1✔
2562
                }) as DynSource]
1✔
2563
            },
1✔
2564
        );
2565

2566
        assert!(result.is_ok());
1✔
2567
    }
1✔
2568

2569
    #[test]
2570
    fn run_multi_source_demo_reset_errors_when_target_is_directory() {
1✔
2571
        let dir = tempdir().unwrap();
1✔
2572
        let split_store_path = dir.path().join("split_store_dir");
1✔
2573
        std::fs::create_dir(&split_store_path).unwrap();
1✔
2574

2575
        let result = run_multi_source_demo(
1✔
2576
            [
1✔
2577
                "--reset".to_string(),
1✔
2578
                "--split-store-path".to_string(),
1✔
2579
                split_store_path.to_string_lossy().to_string(),
1✔
2580
            ]
1✔
2581
            .into_iter(),
1✔
2582
            |_| Ok(()),
1✔
2583
            empty_dyn_sources,
2584
        );
2585

2586
        let err = result.unwrap_err().to_string();
1✔
2587
        assert!(err.contains("failed to remove split store"));
1✔
2588
    }
1✔
2589

2590
    #[test]
2591
    fn print_summary_helpers_accept_empty_iterators() {
1✔
2592
        print_source_summary("empty summary", std::iter::empty::<&str>());
1✔
2593
        print_recipe_context_by_source("empty recipe context", std::iter::empty::<(&str, &str)>());
1✔
2594
    }
1✔
2595

2596
    #[cfg(feature = "extended-metrics")]
2597
    #[test]
2598
    fn metric_mean_median_handles_even_length_inputs() {
1✔
2599
        let mut vals = [1.0, 4.0, 2.0, 3.0];
1✔
2600
        let (mean, median) = metric_mean_median(&mut vals);
1✔
2601
        assert!((mean - 2.5).abs() < 1e-6);
1✔
2602
        assert!((median - 2.5).abs() < 1e-6);
1✔
2603
    }
1✔
2604

2605
    #[cfg(feature = "extended-metrics")]
2606
    #[test]
2607
    fn metric_mean_median_handles_odd_length_inputs() {
1✔
2608
        let mut vals = [3.0, 1.0, 2.0];
1✔
2609
        let (mean, median) = metric_mean_median(&mut vals);
1✔
2610
        assert!((mean - 2.0).abs() < 1e-6);
1✔
2611
        assert!((median - 2.0).abs() < 1e-6);
1✔
2612
    }
1✔
2613

2614
    #[cfg(feature = "extended-metrics")]
2615
    #[test]
2616
    fn print_metric_summary_includes_multi_source_aggregate() {
1✔
2617
        let source_data = HashMap::from([
1✔
2618
            (
1✔
2619
                "source_a".to_string(),
1✔
2620
                vec![(0.9, 0.8, 0.2, 0.1, 0.7), (0.8, 0.7, 0.3, 0.2, 0.8)],
1✔
2621
            ),
1✔
2622
            (
1✔
2623
                "source_b".to_string(),
1✔
2624
                vec![(0.7, 0.6, 0.4, 0.3, 0.5), (0.6, 0.5, 0.5, 0.4, 0.6)],
1✔
2625
            ),
1✔
2626
        ]);
1✔
2627

2628
        print_metric_summary(&source_data);
1✔
2629
    }
1✔
2630
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc