• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jzombie / rust-triplets / 23559737756

25 Mar 2026 07:21PM UTC coverage: 93.167% (-1.6%) from 94.794%
23559737756

Pull #40

github

web-flow
Merge 7cf56e29c into 65addee9d
Pull Request #40: Refactor BM25 integration

2570 of 2918 new or added lines in 6 files covered. (88.07%)

8 existing lines in 1 file now uncovered.

15217 of 16333 relevant lines covered (93.17%)

135123.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.89
/src/example_apps.rs
1
// TODO: Consider extracting to a debug crate
2

3
use std::collections::HashMap;
4
use std::error::Error;
5
use std::path::PathBuf;
6
use std::sync::Arc;
7
use std::sync::Once;
8
use std::time::Instant;
9

10
use cache_manager::CacheRoot;
11
use clap::{Parser, ValueEnum, error::ErrorKind};
12

13
use crate::config::{ChunkingStrategy, SamplerConfig, TripletRecipe};
14
use crate::constants::cache::{MULTI_SOURCE_DEMO_GROUP, MULTI_SOURCE_DEMO_STORE_FILENAME};
15
use crate::data::ChunkView;
16
use crate::heuristics::{
17
    CapacityTotals, EFFECTIVE_NEGATIVES_PER_ANCHOR, EFFECTIVE_POSITIVES_PER_ANCHOR,
18
    estimate_source_split_capacity_from_counts, format_replay_factor, format_u128_with_commas,
19
    resolve_text_recipes_for_source, split_counts_for_total,
20
};
21
use crate::metrics::source_skew;
22
use crate::sampler::chunk_weight;
23
use crate::source::DataSource;
24
use crate::splits::{FileSplitStore, SplitLabel, SplitRatios, SplitStore};
25
use crate::{
26
    RecordChunk, SampleBatch, Sampler, SamplerError, SourceId, TextBatch, TextRecipe, TripletBatch,
27
    TripletSampler,
28
};
29

30
type DynSource = Box<dyn DataSource + 'static>;
31

32
fn managed_demo_split_store_path() -> Result<PathBuf, String> {
2✔
33
    let cache_root = CacheRoot::from_discovery()
2✔
34
        .map_err(|err| format!("failed discovering managed cache root: {err}"))?;
2✔
35
    let group = PathBuf::from(MULTI_SOURCE_DEMO_GROUP);
2✔
36
    let dir = cache_root.ensure_group(&group).map_err(|err| {
2✔
37
        format!(
×
38
            "failed creating managed demo cache group '{}': {err}",
39
            group.display()
×
40
        )
41
    })?;
×
42
    Ok(dir.join(MULTI_SOURCE_DEMO_STORE_FILENAME))
2✔
43
}
2✔
44

45
fn init_example_tracing() {
22✔
46
    static INIT: Once = Once::new();
47
    INIT.call_once(|| {
22✔
48
        let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
1✔
49
            .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("triplets=info"));
1✔
50
        let _ = tracing_subscriber::fmt()
1✔
51
            .with_env_filter(env_filter)
1✔
52
            .try_init();
1✔
53
    });
1✔
54
}
22✔
55

56
#[derive(Debug, Clone, Copy, ValueEnum)]
57
/// CLI split selector mapped onto `SplitLabel`.
// Used as the value type of `--split` on `MultiSourceDemoCli`; each variant is
// converted one-to-one by the `From<SplitArg> for SplitLabel` impl in this file.
// Plain `//` comments are used on purpose so the derive macro input is unchanged.
58
enum SplitArg {
59
    // Maps to `SplitLabel::Train`.
    Train,
60
    // Maps to `SplitLabel::Validation`.
    Validation,
61
    // Maps to `SplitLabel::Test`.
    Test,
62
}
63

64
impl From<SplitArg> for SplitLabel {
65
    fn from(value: SplitArg) -> Self {
6✔
66
        match value {
6✔
67
            SplitArg::Train => SplitLabel::Train,
1✔
68
            SplitArg::Validation => SplitLabel::Validation,
4✔
69
            SplitArg::Test => SplitLabel::Test,
1✔
70
        }
71
    }
6✔
72
}
73

74
#[derive(Debug, Parser)]
75
#[command(
76
    name = "estimate_capacity",
77
    disable_help_subcommand = true,
78
    about = "Metadata-only capacity estimation",
79
    long_about = "Estimate record, pair, triplet, and text-sample capacity using source-reported counts only (no data refresh).",
80
    after_help = "Source roots are optional and resolved in order by explicit arg, environment variables, then project defaults."
81
)]
82
/// CLI arguments for metadata-only capacity estimation.
// Parsed by `run_estimate_capacity`, which prepends the program name
// "estimate_capacity" to the caller-supplied argument iterator.
83
struct EstimateCapacityCli {
84
    #[arg(
85
        long,
86
        default_value_t = 99,
87
        help = "Deterministic seed used for split allocation"
88
    )]
89
    // Copied into `SamplerConfig::seed` and echoed in the report as "split seed".
    seed: u64,
90
    #[arg(
91
        long = "split-ratios",
92
        value_name = "TRAIN,VALIDATION,TEST",
93
        value_parser = parse_split_ratios_arg,
94
        default_value = "0.8,0.1,0.1",
95
        help = "Comma-separated split ratios that must sum to 1.0"
96
    )]
97
    // Copied into `SamplerConfig::split` and passed to `split_counts_for_total`
    // for every source.
    split: SplitRatios,
98
    #[arg(
99
        long = "source-root",
100
        value_name = "PATH",
101
        help = "Optional source root override, repeat as needed in source order"
102
    )]
103
    // Handed unchanged to the injected `resolve_roots` callback.
    source_roots: Vec<String>,
104
}
105

106
#[derive(Debug, Parser)]
107
#[command(
108
    name = "multi_source_demo",
109
    disable_help_subcommand = true,
110
    about = "Run sampled batches from multiple sources",
111
    long_about = "Sample triplet, pair, or text batches from multiple sources and persist split/epoch state.",
112
    after_help = "Source roots are optional and resolved in order by explicit arg, environment variables, then project defaults."
113
)]
114
/// CLI for `multi_source_demo`.
115
///
116
/// Common usage:
117
/// - Use managed cache-group default path (no flag)
118
/// - Set an explicit file path: `--split-store-path /tmp/split_store.bin`
119
/// - Repeat `--source-root <PATH>` to override source roots in order
// Mode precedence in `run_multi_source_demo` when several mode flags are given:
// `--pair-batch`, then `--text-recipes`, then `--list-text-recipes`, then
// `--batches`; with none of them a single triplet batch is sampled.
120
struct MultiSourceDemoCli {
121
    #[arg(
122
        long = "text-recipes",
123
        help = "Emit a text batch instead of a triplet batch"
124
    )]
125
    // Only consulted when `show_pair_samples` is false.
    show_text_samples: bool,
126
    #[arg(
127
        long = "pair-batch",
128
        help = "Emit a pair batch instead of a triplet batch"
129
    )]
130
    // Checked first among the mode flags.
    show_pair_samples: bool,
131
    #[arg(
132
        long = "list-text-recipes",
133
        help = "Print registered text recipes and exit"
134
    )]
135
    // Only consulted when neither pair nor text sampling was requested.
    list_text_recipes: bool,
136
    #[arg(
137
        long = "batch-size",
138
        default_value_t = 4,
139
        value_parser = parse_batch_size,
140
        help = "Batch size used for sampling"
141
    )]
142
    // Copied into `SamplerConfig::batch_size`.
    batch_size: usize,
143
    #[arg(
144
        long = "ingestion-max-records",
145
        default_value_t = default_ingestion_max_records(),
146
        value_parser = parse_ingestion_max_records,
147
        help = "Per-source ingestion buffer target used while refreshing records"
148
    )]
149
    // Default comes from `SamplerConfig::default().ingestion_max_records`.
    ingestion_max_records: usize,
150
    #[arg(long, help = "Optional deterministic seed override")]
151
    // When absent, the seed of `SamplerConfig::default()` is kept.
    seed: Option<u64>,
152
    #[arg(long, value_enum, help = "Target split to sample from")]
153
    // When absent, `run_multi_source_demo` samples from `SplitLabel::Train`.
    split: Option<SplitArg>,
154
    #[arg(
155
        long = "source-root",
156
        value_name = "PATH",
157
        help = "Optional source root override, repeat as needed in source order"
158
    )]
159
    // Handed unchanged to the injected `resolve_roots` callback.
    source_roots: Vec<String>,
160
    #[arg(
161
        long = "split-store-path",
162
        value_name = "SPLIT_STORE_PATH",
163
        help = "Optional explicit path for persisted split/epoch state file"
164
    )]
165
    // When absent, the path from `managed_demo_split_store_path()` is used.
    split_store_path: Option<PathBuf>,
166
    #[arg(
167
        long = "reset",
168
        help = "Delete the persisted split/epoch state before sampling, restarting from epoch 0"
169
    )]
170
    // The file is only removed when it currently exists.
    reset: bool,
171
    #[arg(
172
        long = "batches",
173
        value_name = "N",
174
        value_parser = parse_batch_count,
175
        help = "Run N triplet batches in succession, printing a timing line per batch and (with --features extended-metrics) a per-source similarity summary at the end"
176
    )]
177
    // Benchmark mode; the loop stops early if the sampler reports exhaustion.
    batches: Option<usize>,
178
}
179

180
#[derive(Debug, Clone)]
181
/// Source-level inventory used by capacity estimation output.
// Built once per source in `run_estimate_capacity` and only read afterwards.
182
struct SourceInventory {
183
    // String form of the source's `id()`; also used as the key into the
    // per-source/per-split maps.
    source_id: String,
184
    // Value returned by the source's `reported_record_count`.
    reported_records: u128,
185
    // The configured recipes when `SamplerConfig::recipes` is non-empty,
    // otherwise the source's `default_triplet_recipes()`.
    triplet_recipes: Vec<TripletRecipe>,
186
}
187

188
/// Run the capacity-estimation CLI with injectable root resolution/source builders.
189
///
190
/// `build_sources` is construction-only; sampler configuration is applied
191
/// centrally by this function before any source calls.
192
pub fn run_estimate_capacity<R, Resolve, Build, I>(
4✔
193
    args_iter: I,
4✔
194
    resolve_roots: Resolve,
4✔
195
    build_sources: Build,
4✔
196
) -> Result<(), Box<dyn Error>>
4✔
197
where
4✔
198
    Resolve: FnOnce(Vec<String>) -> Result<R, Box<dyn Error>>,
4✔
199
    Build: FnOnce(&R) -> Vec<DynSource>,
4✔
200
    I: Iterator<Item = String>,
4✔
201
{
202
    init_example_tracing();
4✔
203

204
    let Some(cli) = parse_cli::<EstimateCapacityCli, _>(
4✔
205
        std::iter::once("estimate_capacity".to_string()).chain(args_iter),
4✔
206
    )?
×
207
    else {
208
        return Ok(());
×
209
    };
210

211
    let roots = resolve_roots(cli.source_roots)?;
4✔
212

213
    let config = SamplerConfig {
3✔
214
        seed: cli.seed,
3✔
215
        split: cli.split,
3✔
216
        ..SamplerConfig::default()
3✔
217
    };
3✔
218

219
    let sources = build_sources(&roots);
3✔
220

221
    let mut inventories = Vec::new();
3✔
222
    for source in &sources {
3✔
223
        let recipes = if config.recipes.is_empty() {
3✔
224
            source.default_triplet_recipes()
3✔
225
        } else {
226
            config.recipes.clone()
×
227
        };
228
        let reported_records = source.reported_record_count(&config).map_err(|err| {
3✔
229
            format!(
1✔
230
                "source '{}' failed to report exact record count: {err}",
231
                source.id()
1✔
232
            )
233
        })?;
1✔
234
        inventories.push(SourceInventory {
2✔
235
            source_id: source.id().to_string(),
2✔
236
            reported_records,
2✔
237
            triplet_recipes: recipes,
2✔
238
        });
2✔
239
    }
240

241
    let mut per_source_split_counts: HashMap<(String, SplitLabel), u128> = HashMap::new();
2✔
242
    let mut split_record_counts: HashMap<SplitLabel, u128> = HashMap::new();
2✔
243

244
    for source in &inventories {
2✔
245
        let counts = split_counts_for_total(source.reported_records, cli.split);
2✔
246
        for (label, count) in counts {
6✔
247
            per_source_split_counts.insert((source.source_id.clone(), label), count);
6✔
248
            *split_record_counts.entry(label).or_insert(0) += count;
6✔
249
        }
6✔
250
    }
251

252
    let mut totals_by_split: HashMap<SplitLabel, CapacityTotals> = HashMap::new();
2✔
253
    let mut totals_by_source_and_split: HashMap<(String, SplitLabel), CapacityTotals> =
2✔
254
        HashMap::new();
2✔
255

256
    for split_label in [SplitLabel::Train, SplitLabel::Validation, SplitLabel::Test] {
6✔
257
        let mut totals = CapacityTotals::default();
6✔
258

259
        for source in &inventories {
6✔
260
            let source_split_records = per_source_split_counts
6✔
261
                .get(&(source.source_id.clone(), split_label))
6✔
262
                .copied()
6✔
263
                .unwrap_or(0);
6✔
264

6✔
265
            let triplet_recipes = &source.triplet_recipes;
6✔
266
            let text_recipes = resolve_text_recipes_for_source(&config, triplet_recipes);
6✔
267

6✔
268
            let capacity = estimate_source_split_capacity_from_counts(
6✔
269
                source_split_records,
6✔
270
                triplet_recipes,
6✔
271
                &text_recipes,
6✔
272
            );
6✔
273

6✔
274
            totals_by_source_and_split.insert((source.source_id.clone(), split_label), capacity);
6✔
275

6✔
276
            totals.triplets += capacity.triplets;
6✔
277
            totals.effective_triplets += capacity.effective_triplets;
6✔
278
            totals.pairs += capacity.pairs;
6✔
279
            totals.text_samples += capacity.text_samples;
6✔
280
        }
6✔
281

282
        totals_by_split.insert(split_label, totals);
6✔
283
    }
284

285
    let min_nonzero_records_by_split: HashMap<SplitLabel, u128> =
2✔
286
        [SplitLabel::Train, SplitLabel::Validation, SplitLabel::Test]
2✔
287
            .into_iter()
2✔
288
            .map(|split_label| {
6✔
289
                let min_nonzero = inventories
6✔
290
                    .iter()
6✔
291
                    .filter_map(|source| {
6✔
292
                        per_source_split_counts
6✔
293
                            .get(&(source.source_id.clone(), split_label))
6✔
294
                            .copied()
6✔
295
                    })
6✔
296
                    .filter(|&records| records > 0)
6✔
297
                    .min()
6✔
298
                    .unwrap_or(0);
6✔
299
                (split_label, min_nonzero)
6✔
300
            })
6✔
301
            .collect();
2✔
302

303
    let min_nonzero_records_all_splits = inventories
2✔
304
        .iter()
2✔
305
        .map(|source| source.reported_records)
2✔
306
        .filter(|&records| records > 0)
2✔
307
        .min()
2✔
308
        .unwrap_or(0);
2✔
309

310
    println!("=== capacity estimate (length-only) ===");
2✔
311
    println!("mode: metadata-only (no source.refresh calls)");
2✔
312
    println!("classification: heuristic approximation (not exact)");
2✔
313
    println!("split seed: {}", cli.seed);
2✔
314
    println!(
2✔
315
        "split ratios: train={:.4}, validation={:.4}, test={:.4}",
316
        cli.split.train, cli.split.validation, cli.split.test
317
    );
318
    println!();
2✔
319

320
    println!("[SOURCES]");
2✔
321
    for source in &inventories {
2✔
322
        println!(
2✔
323
            "  {} => reported records: {}",
2✔
324
            source.source_id,
2✔
325
            format_u128_with_commas(source.reported_records)
2✔
326
        );
2✔
327
    }
2✔
328
    println!();
2✔
329

330
    println!("[PER SOURCE BREAKDOWN]");
2✔
331
    for source in &inventories {
2✔
332
        println!("  {}", source.source_id);
2✔
333
        let mut source_grand = CapacityTotals::default();
2✔
334
        let mut source_total_records = 0u128;
2✔
335
        for split_label in [SplitLabel::Train, SplitLabel::Validation, SplitLabel::Test] {
6✔
336
            let split_records = per_source_split_counts
6✔
337
                .get(&(source.source_id.clone(), split_label))
6✔
338
                .copied()
6✔
339
                .unwrap_or(0);
6✔
340
            source_total_records = source_total_records.saturating_add(split_records);
6✔
341
            let split_longest_records = inventories
6✔
342
                .iter()
6✔
343
                .map(|candidate| {
6✔
344
                    per_source_split_counts
6✔
345
                        .get(&(candidate.source_id.clone(), split_label))
6✔
346
                        .copied()
6✔
347
                        .unwrap_or(0)
6✔
348
                })
6✔
349
                .max()
6✔
350
                .unwrap_or(0);
6✔
351
            let totals = totals_by_source_and_split
6✔
352
                .get(&(source.source_id.clone(), split_label))
6✔
353
                .copied()
6✔
354
                .unwrap_or_default();
6✔
355
            source_grand.triplets += totals.triplets;
6✔
356
            source_grand.effective_triplets += totals.effective_triplets;
6✔
357
            source_grand.pairs += totals.pairs;
6✔
358
            source_grand.text_samples += totals.text_samples;
6✔
359
            println!("    [{:?}]", split_label);
6✔
360
            println!("      records: {}", format_u128_with_commas(split_records));
6✔
361
            println!(
6✔
362
                "      triplet combinations: {}",
363
                format_u128_with_commas(totals.triplets)
6✔
364
            );
365
            println!(
6✔
366
                "      effective sampled triplets (p={}, k={}): {}",
367
                EFFECTIVE_POSITIVES_PER_ANCHOR,
368
                EFFECTIVE_NEGATIVES_PER_ANCHOR,
369
                format_u128_with_commas(totals.effective_triplets)
6✔
370
            );
371
            println!(
6✔
372
                "      pair combinations:    {}",
373
                format_u128_with_commas(totals.pairs)
6✔
374
            );
375
            println!(
6✔
376
                "      text samples:         {}",
377
                format_u128_with_commas(totals.text_samples)
6✔
378
            );
379
            println!(
6✔
380
                "      replay factor vs longest source: {}",
381
                format_replay_factor(split_longest_records, split_records)
6✔
382
            );
383
            println!(
6✔
384
                "      suggested proportional-size batch weight (0-1): {:.4}",
385
                suggested_balancing_weight(split_longest_records, split_records)
6✔
386
            );
387
            let split_smallest_nonzero = min_nonzero_records_by_split
6✔
388
                .get(&split_label)
6✔
389
                .copied()
6✔
390
                .unwrap_or(0);
6✔
391
            println!(
6✔
392
                "      suggested small-source-boost batch weight (0-1): {:.4}",
393
                suggested_oversampling_weight(split_smallest_nonzero, split_records)
6✔
394
            );
395
            println!();
6✔
396
        }
397
        let longest_source_total = inventories
2✔
398
            .iter()
2✔
399
            .map(|candidate| candidate.reported_records)
2✔
400
            .max()
2✔
401
            .unwrap_or(0);
2✔
402
        println!("    [ALL SPLITS FOR SOURCE]");
2✔
403
        println!(
2✔
404
            "      triplet combinations: {}",
405
            format_u128_with_commas(source_grand.triplets)
2✔
406
        );
407
        println!(
2✔
408
            "      effective sampled triplets (p={}, k={}): {}",
409
            EFFECTIVE_POSITIVES_PER_ANCHOR,
410
            EFFECTIVE_NEGATIVES_PER_ANCHOR,
411
            format_u128_with_commas(source_grand.effective_triplets)
2✔
412
        );
413
        println!(
2✔
414
            "      pair combinations:    {}",
415
            format_u128_with_commas(source_grand.pairs)
2✔
416
        );
417
        println!(
2✔
418
            "      text samples:         {}",
419
            format_u128_with_commas(source_grand.text_samples)
2✔
420
        );
421
        println!(
2✔
422
            "      replay factor vs longest source: {}",
423
            format_replay_factor(longest_source_total, source_total_records)
2✔
424
        );
425
        println!(
2✔
426
            "      suggested proportional-size batch weight (0-1): {:.4}",
427
            suggested_balancing_weight(longest_source_total, source_total_records)
2✔
428
        );
429
        println!(
2✔
430
            "      suggested small-source-boost batch weight (0-1): {:.4}",
431
            suggested_oversampling_weight(min_nonzero_records_all_splits, source_total_records)
2✔
432
        );
433
        println!();
2✔
434
    }
435

436
    let mut grand = CapacityTotals::default();
2✔
437
    for split_label in [SplitLabel::Train, SplitLabel::Validation, SplitLabel::Test] {
6✔
438
        let record_count = split_record_counts.get(&split_label).copied().unwrap_or(0);
6✔
439
        let totals = totals_by_split
6✔
440
            .get(&split_label)
6✔
441
            .copied()
6✔
442
            .unwrap_or_default();
6✔
443

6✔
444
        grand.triplets += totals.triplets;
6✔
445
        grand.effective_triplets += totals.effective_triplets;
6✔
446
        grand.pairs += totals.pairs;
6✔
447
        grand.text_samples += totals.text_samples;
6✔
448

6✔
449
        println!("[{:?}]", split_label);
6✔
450
        println!("  records: {}", format_u128_with_commas(record_count));
6✔
451
        println!(
6✔
452
            "  triplet combinations: {}",
6✔
453
            format_u128_with_commas(totals.triplets)
6✔
454
        );
6✔
455
        println!(
6✔
456
            "  effective sampled triplets (p={}, k={}): {}",
6✔
457
            EFFECTIVE_POSITIVES_PER_ANCHOR,
6✔
458
            EFFECTIVE_NEGATIVES_PER_ANCHOR,
6✔
459
            format_u128_with_commas(totals.effective_triplets)
6✔
460
        );
6✔
461
        println!(
6✔
462
            "  pair combinations:    {}",
6✔
463
            format_u128_with_commas(totals.pairs)
6✔
464
        );
6✔
465
        println!(
6✔
466
            "  text samples:         {}",
6✔
467
            format_u128_with_commas(totals.text_samples)
6✔
468
        );
6✔
469
        println!();
6✔
470
    }
6✔
471

472
    println!("[ALL SPLITS TOTAL]");
2✔
473
    println!(
2✔
474
        "  triplet combinations: {}",
475
        format_u128_with_commas(grand.triplets)
2✔
476
    );
477
    println!(
2✔
478
        "  effective sampled triplets (p={}, k={}): {}",
479
        EFFECTIVE_POSITIVES_PER_ANCHOR,
480
        EFFECTIVE_NEGATIVES_PER_ANCHOR,
481
        format_u128_with_commas(grand.effective_triplets)
2✔
482
    );
483
    println!(
2✔
484
        "  pair combinations:    {}",
485
        format_u128_with_commas(grand.pairs)
2✔
486
    );
487
    println!(
2✔
488
        "  text samples:         {}",
489
        format_u128_with_commas(grand.text_samples)
2✔
490
    );
491
    println!();
2✔
492
    println!(
2✔
493
        "Note: counts are heuristic, length-based estimates from source-reported totals and recipe structure. They are approximate, not exact, and assume anchor-positive pairs=records (one positive per anchor by default), negatives=source_records_in_split-1 (anchor excluded as its own negative), and at most one chunk/window realization per sample. In real-world chunked sampling, practical combinations are often higher, so treat this as a floor-like baseline."
494
    );
495
    println!();
2✔
496
    println!(
2✔
497
        "Effective sampled triplets apply a bounded training assumption: effective_triplets = records * p * k per triplet recipe, with defaults p={} positives per anchor and k={} negatives per anchor.",
498
        EFFECTIVE_POSITIVES_PER_ANCHOR, EFFECTIVE_NEGATIVES_PER_ANCHOR
499
    );
500
    println!();
2✔
501
    println!(
2✔
502
        "Oversample loops are not inferred from this static report. To measure true oversampling (how many times sampling loops through the combination space), use observed sampled draw counts from an actual run."
503
    );
504
    println!();
2✔
505
    println!(
2✔
506
        "Suggested proportional-size batch weight (0-1) is source/max_source by record count: 1.0 for the largest source in scope, smaller values for smaller sources."
507
    );
508
    println!();
2✔
509
    println!(
2✔
510
        "Suggested small-source-boost batch weight (0-1) is min_nonzero_source/source by record count: 1.0 for the smallest non-zero source in scope, smaller values for larger sources."
511
    );
512
    println!();
2✔
513
    println!(
2✔
514
        "When passed to next_*_batch_with_weights, higher weight means that source is sampled more often relative to lower-weight sources."
515
    );
516

517
    Ok(())
2✔
518
}
4✔
519

520
/// Run the multi-source demo CLI with injectable root resolution/source builders.
521
///
522
/// `build_sources` is construction-only. Source sampler configuration is owned
523
/// by sampler registration (`TripletSampler::register_source`).
524
pub fn run_multi_source_demo<R, Resolve, Build, I>(
18✔
525
    args_iter: I,
18✔
526
    resolve_roots: Resolve,
18✔
527
    build_sources: Build,
18✔
528
) -> Result<(), Box<dyn Error>>
18✔
529
where
18✔
530
    Resolve: FnOnce(Vec<String>) -> Result<R, Box<dyn Error>>,
18✔
531
    Build: FnOnce(&R) -> Vec<DynSource>,
18✔
532
    I: Iterator<Item = String>,
18✔
533
{
534
    init_example_tracing();
18✔
535

536
    let Some(cli) = parse_cli::<MultiSourceDemoCli, _>(
18✔
537
        std::iter::once("multi_source_demo".to_string()).chain(args_iter),
18✔
538
    )?
×
539
    else {
540
        return Ok(());
1✔
541
    };
542

543
    let roots = resolve_roots(cli.source_roots)?;
17✔
544

545
    let mut config = SamplerConfig::default();
16✔
546
    config.seed = cli.seed.unwrap_or(config.seed);
16✔
547
    config.batch_size = cli.batch_size;
16✔
548
    config.ingestion_max_records = cli.ingestion_max_records;
16✔
549
    config.chunking = Default::default();
16✔
550
    let selected_split = cli.split.map(Into::into).unwrap_or(SplitLabel::Train);
16✔
551
    config.split = SplitRatios::default();
16✔
552
    config.allowed_splits = vec![selected_split];
16✔
553
    let chunking = config.chunking.clone();
16✔
554
    let config_snapshot = MultiSourceDemoConfigSnapshot {
16✔
555
        seed: config.seed,
16✔
556
        batch_size: config.batch_size,
16✔
557
        ingestion_max_records: config.ingestion_max_records,
16✔
558
        split: selected_split,
16✔
559
        split_ratios: config.split,
16✔
560
        max_window_tokens: config.chunking.max_window_tokens,
16✔
561
        overlap_tokens: config.chunking.overlap_tokens.clone(),
16✔
562
        summary_fallback_tokens: config.chunking.summary_fallback_tokens,
16✔
563
    };
16✔
564

565
    let split_store_path = if let Some(path) = cli.split_store_path {
16✔
566
        path
15✔
567
    } else {
568
        managed_demo_split_store_path().map_err(|err| {
1✔
569
            Box::<dyn Error>::from(format!("failed to resolve demo split-store path: {err}"))
×
570
        })?
×
571
    };
572

573
    if cli.reset && split_store_path.exists() {
16✔
574
        std::fs::remove_file(&split_store_path).map_err(|err| {
2✔
575
            Box::<dyn Error>::from(format!(
1✔
576
                "failed to remove split store '{}': {err}",
1✔
577
                split_store_path.display()
1✔
578
            ))
1✔
579
        })?;
1✔
580
        println!("Reset: removed {}", split_store_path.display());
1✔
581
    }
14✔
582
    println!(
15✔
583
        "Persisting split assignments and epoch state to {}",
584
        split_store_path.display()
15✔
585
    );
586
    let sources = build_sources(&roots);
15✔
587
    let split_store = Arc::new(FileSplitStore::open(&split_store_path, config.split, 99)?);
15✔
588
    let sampler = TripletSampler::new(config, split_store.clone());
15✔
589
    for source in sources {
15✔
590
        sampler.register_source(source);
15✔
591
    }
15✔
592

593
    if cli.show_pair_samples {
15✔
594
        match sampler.next_pair_batch(selected_split) {
5✔
595
            Ok(pair_batch) => {
2✔
596
                if pair_batch.pairs.is_empty() {
2✔
597
                    println!("Pair sampling produced no results.");
×
598
                } else {
2✔
599
                    print_pair_batch(&chunking, &pair_batch, split_store.as_ref());
2✔
600
                }
2✔
601
                sampler.save_sampler_state(None)?;
2✔
602
            }
603
            Err(SamplerError::Exhausted(name)) => {
3✔
604
                eprintln!(
3✔
605
                    "Pair sampler exhausted recipe '{}'. Ensure both positive and negative examples exist.",
3✔
606
                    name
3✔
607
                );
3✔
608
            }
3✔
609
            Err(err) => return Err(err.into()),
×
610
        }
611
    } else if cli.show_text_samples {
10✔
612
        match sampler.next_text_batch(selected_split) {
3✔
613
            Ok(text_batch) => {
1✔
614
                if text_batch.samples.is_empty() {
1✔
615
                    println!(
×
616
                        "Text sampling produced no results. Ensure each source has eligible sections."
×
617
                    );
×
618
                } else {
1✔
619
                    print_text_batch(&chunking, &text_batch, split_store.as_ref());
1✔
620
                }
1✔
621
                sampler.save_sampler_state(None)?;
1✔
622
            }
623
            Err(SamplerError::Exhausted(name)) => {
2✔
624
                eprintln!(
2✔
625
                    "Text sampler exhausted selector '{}'. Ensure matching sections exist.",
2✔
626
                    name
2✔
627
                );
2✔
628
            }
2✔
629
            Err(err) => return Err(err.into()),
×
630
        }
631
    } else if cli.list_text_recipes {
7✔
632
        let recipes = sampler.text_recipes();
3✔
633
        if recipes.is_empty() {
3✔
634
            println!(
1✔
635
                "No text recipes registered. Ensure your sources expose triplet selectors or configure text_recipes explicitly."
1✔
636
            );
1✔
637
        } else {
2✔
638
            print_text_recipes(&recipes);
2✔
639
        }
2✔
640
    } else if let Some(batch_count) = cli.batches {
4✔
641
        print_demo_config(&config_snapshot);
1✔
642
        println!("=== benchmark: {} triplet batches ===", batch_count);
1✔
643

644
        // source_id -> Vec<(pos_jaccard, pos_byte_cosine, neg_jaccard, neg_byte_cosine)>
645
        #[cfg(feature = "extended-metrics")]
646
        let mut source_metrics: HashMap<String, Vec<(f32, f32, f32, f32)>> = HashMap::new();
1✔
647

648
        for i in 0..batch_count {
2✔
649
            let t0 = Instant::now();
2✔
650
            match sampler.next_triplet_batch(selected_split) {
2✔
651
                Ok(batch) => {
2✔
652
                    let elapsed = t0.elapsed();
2✔
653
                    let n = batch.triplets.len();
2✔
654
                    println!(
2✔
655
                        "batch {:>4}  triplets={:<4}  elapsed={:>8.2}ms  per_triplet={:.2}ms",
656
                        i + 1,
2✔
657
                        n,
658
                        elapsed.as_secs_f64() * 1000.0,
2✔
659
                        if n > 0 {
2✔
660
                            elapsed.as_secs_f64() * 1000.0 / n as f64
2✔
661
                        } else {
NEW
662
                            0.0
×
663
                        },
664
                    );
665
                    #[cfg(feature = "extended-metrics")]
666
                    {
667
                        use crate::metrics::lexical_similarity_scores;
668
                        for triplet in &batch.triplets {
8✔
669
                            let (pj, pc) = lexical_similarity_scores(
8✔
670
                                &triplet.anchor.text,
8✔
671
                                &triplet.positive.text,
8✔
672
                            );
8✔
673
                            let (nj, nc) = lexical_similarity_scores(
8✔
674
                                &triplet.anchor.text,
8✔
675
                                &triplet.negative.text,
8✔
676
                            );
8✔
677
                            let source = extract_source(&triplet.anchor.record_id);
8✔
678
                            source_metrics
8✔
679
                                .entry(source)
8✔
680
                                .or_default()
8✔
681
                                .push((pj, pc, nj, nc));
8✔
682
                        }
8✔
683
                    }
684
                }
NEW
685
                Err(SamplerError::Exhausted(name)) => {
×
NEW
686
                    println!(
×
687
                        "batch {:>4}  exhausted recipe '{}' — stopping early",
NEW
688
                        i + 1,
×
689
                        name
690
                    );
NEW
691
                    break;
×
692
                }
NEW
693
                Err(err) => return Err(err.into()),
×
694
            }
695
        }
696

697
        sampler.save_sampler_state(None)?;
1✔
698

699
        #[cfg(feature = "extended-metrics")]
700
        if !source_metrics.is_empty() {
1✔
701
            println!();
1✔
702
            print_metric_summary(&source_metrics);
1✔
703
        }
1✔
704

705
        #[cfg(all(feature = "extended-metrics", feature = "bm25-mining"))]
706
        {
707
            let (fallback, total) = sampler.bm25_fallback_stats();
1✔
708
            if total > 0 {
1✔
709
                let pct = fallback as f64 / total as f64 * 100.0;
1✔
710
                println!("bm25 fallback rate : {}/{} ({:.1}%)", fallback, total, pct);
1✔
711
            }
1✔
712
        }
713
    } else {
714
        match sampler.next_triplet_batch(selected_split) {
3✔
715
            Ok(triplet_batch) => {
×
716
                if triplet_batch.triplets.is_empty() {
×
717
                    println!(
×
718
                        "Triplet sampling produced no results. Ensure multiple records per source exist."
×
719
                    );
×
720
                } else {
×
721
                    print_triplet_batch(&chunking, &triplet_batch, split_store.as_ref());
×
722
                }
×
723
                sampler.save_sampler_state(None)?;
×
724
                #[cfg(all(feature = "extended-metrics", feature = "bm25-mining"))]
725
                {
NEW
726
                    let (fallback, total) = sampler.bm25_fallback_stats();
×
NEW
727
                    if total > 0 {
×
NEW
728
                        let pct = fallback as f64 / total as f64 * 100.0;
×
NEW
729
                        println!("bm25 fallback rate : {}/{} ({:.1}%)", fallback, total, pct);
×
NEW
730
                    }
×
731
                }
732
            }
733
            Err(SamplerError::Exhausted(name)) => {
3✔
734
                eprintln!(
3✔
735
                    "Triplet sampler exhausted recipe '{}'. Ensure both positive and negative examples exist.",
3✔
736
                    name
3✔
737
                );
3✔
738
            }
3✔
739
            Err(err) => return Err(err.into()),
×
740
        }
741
    }
742

743
    Ok(())
15✔
744
}
18✔
745

746
struct MultiSourceDemoConfigSnapshot {
747
    seed: u64,
748
    batch_size: usize,
749
    ingestion_max_records: usize,
750
    split: SplitLabel,
751
    split_ratios: SplitRatios,
752
    max_window_tokens: usize,
753
    overlap_tokens: Vec<usize>,
754
    summary_fallback_tokens: usize,
755
}
756

757
fn print_demo_config(cfg: &MultiSourceDemoConfigSnapshot) {
1✔
758
    let overlaps: Vec<String> = cfg.overlap_tokens.iter().map(|t| t.to_string()).collect();
1✔
759
    println!("=== sampler config ===");
1✔
760
    println!("seed                 : {}", cfg.seed);
1✔
761
    println!("batch_size           : {}", cfg.batch_size);
1✔
762
    println!("ingestion_max_records: {}", cfg.ingestion_max_records);
1✔
763
    println!("split                : {:?}", cfg.split);
1✔
764
    println!(
1✔
765
        "split_ratios         : train={:.2} val={:.2} test={:.2}",
766
        cfg.split_ratios.train, cfg.split_ratios.validation, cfg.split_ratios.test
767
    );
768
    println!("max_window_tokens    : {}", cfg.max_window_tokens);
1✔
769
    println!("overlap_tokens       : [{}]", overlaps.join(", "));
1✔
770
    println!(
1✔
771
        "summary_fallback     : {} tokens (0 = disabled)",
772
        cfg.summary_fallback_tokens
773
    );
774
    println!();
1✔
775
}
1✔
776

777
fn default_ingestion_max_records() -> usize {
1✔
778
    SamplerConfig::default().ingestion_max_records
1✔
779
}
1✔
780

781
/// Parses `raw` as a strictly positive `usize` for the command-line flag `flag`.
///
/// # Errors
/// Returns a message naming `flag` when `raw` is not a valid `usize`, or when
/// it parses to zero.
fn parse_positive_usize_flag(raw: &str, flag: &str) -> Result<usize, String> {
    match raw.parse::<usize>() {
        Err(_) => Err(format!(
            "Could not parse {} value '{}' as a positive integer",
            flag, raw
        )),
        Ok(0) => Err(format!("{} must be greater than zero", flag)),
        Ok(value) => Ok(value),
    }
}
45✔
793

794
fn parse_batch_size(raw: &str) -> Result<usize, String> {
22✔
795
    parse_positive_usize_flag(raw, "--batch-size")
22✔
796
}
22✔
797

798
fn parse_ingestion_max_records(raw: &str) -> Result<usize, String> {
21✔
799
    parse_positive_usize_flag(raw, "--ingestion-max-records")
21✔
800
}
21✔
801

802
fn parse_batch_count(raw: &str) -> Result<usize, String> {
2✔
803
    parse_positive_usize_flag(raw, "--batches")
2✔
804
}
2✔
805

806
/// Returns `source_baseline / max_baseline`, clamped to `[0.0, 1.0]`.
///
/// Yields `0.0` when either baseline is zero.
fn suggested_balancing_weight(max_baseline: u128, source_baseline: u128) -> f32 {
    match (max_baseline, source_baseline) {
        (0, _) | (_, 0) => 0.0,
        (largest, own) => {
            let share = own as f64 / largest as f64;
            share.clamp(0.0, 1.0) as f32
        }
    }
}
13✔
812

813
/// Returns `min_nonzero_baseline / source_baseline`, clamped to `[0.0, 1.0]`.
///
/// Yields `0.0` when either baseline is zero.
fn suggested_oversampling_weight(min_nonzero_baseline: u128, source_baseline: u128) -> f32 {
    match (min_nonzero_baseline, source_baseline) {
        (0, _) | (_, 0) => 0.0,
        (smallest, own) => {
            let inverse_share = smallest as f64 / own as f64;
            inverse_share.clamp(0.0, 1.0) as f32
        }
    }
}
13✔
819

820
fn parse_cli<T, I>(args: I) -> Result<Option<T>, Box<dyn Error>>
29✔
821
where
29✔
822
    T: Parser,
29✔
823
    I: IntoIterator,
29✔
824
    I::Item: Into<std::ffi::OsString> + Clone,
29✔
825
{
826
    match T::try_parse_from(args) {
29✔
827
        Ok(cli) => Ok(Some(cli)),
22✔
828
        Err(err) => match err.kind() {
7✔
829
            ErrorKind::DisplayHelp | ErrorKind::DisplayVersion => {
830
                err.print()?;
4✔
831
                Ok(None)
4✔
832
            }
833
            _ => Err(err.into()),
3✔
834
        },
835
    }
836
}
29✔
837

838
fn parse_split_ratios_arg(raw: &str) -> Result<SplitRatios, String> {
11✔
839
    let parts: Vec<&str> = raw.split(',').collect();
11✔
840
    if parts.len() != 3 {
11✔
841
        return Err("--split-ratios expects exactly 3 comma-separated values".to_string());
1✔
842
    }
10✔
843
    let train = parts[0]
10✔
844
        .trim()
10✔
845
        .parse::<f32>()
10✔
846
        .map_err(|_| format!("invalid train ratio '{}': must be a float", parts[0].trim()))?;
10✔
847
    let validation = parts[1].trim().parse::<f32>().map_err(|_| {
9✔
848
        format!(
1✔
849
            "invalid validation ratio '{}': must be a float",
850
            parts[1].trim()
1✔
851
        )
852
    })?;
1✔
853
    let test = parts[2]
8✔
854
        .trim()
8✔
855
        .parse::<f32>()
8✔
856
        .map_err(|_| format!("invalid test ratio '{}': must be a float", parts[2].trim()))?;
8✔
857
    let ratios = SplitRatios {
7✔
858
        train,
7✔
859
        validation,
7✔
860
        test,
7✔
861
    };
7✔
862
    let sum = ratios.train + ratios.validation + ratios.test;
7✔
863
    if (sum - 1.0).abs() > 1e-5 {
7✔
864
        return Err(format!(
1✔
865
            "split ratios must sum to 1.0, got {:.6} (train={}, validation={}, test={})",
1✔
866
            sum, ratios.train, ratios.validation, ratios.test
1✔
867
        ));
1✔
868
    }
6✔
869
    if ratios.train < 0.0 || ratios.validation < 0.0 || ratios.test < 0.0 {
6✔
870
        return Err("split ratios must be non-negative".to_string());
1✔
871
    }
5✔
872
    Ok(ratios)
5✔
873
}
11✔
874

875
fn print_triplet_batch(
1✔
876
    strategy: &ChunkingStrategy,
1✔
877
    batch: &TripletBatch,
1✔
878
    split_store: &impl SplitStore,
1✔
879
) {
1✔
880
    println!("=== triplet batch ===");
1✔
881
    for (idx, triplet) in batch.triplets.iter().enumerate() {
1✔
882
        println!("--- triplet #{} ---", idx);
1✔
883
        println!("recipe       : {}", triplet.recipe);
1✔
884
        println!("sample_weight: {:.4}", triplet.weight);
1✔
885
        if let Some(instr) = &triplet.instruction {
1✔
886
            println!("instruction shown to model:\n{}\n", instr);
1✔
887
        }
1✔
888
        #[cfg(feature = "extended-metrics")]
889
        let (pos_sim, neg_sim) = {
1✔
890
            use crate::metrics::lexical_similarity_scores;
891
            (
1✔
892
                Some(lexical_similarity_scores(
1✔
893
                    &triplet.anchor.text,
1✔
894
                    &triplet.positive.text,
1✔
895
                )),
1✔
896
                Some(lexical_similarity_scores(
1✔
897
                    &triplet.anchor.text,
1✔
898
                    &triplet.negative.text,
1✔
899
                )),
1✔
900
            )
1✔
901
        };
902
        #[cfg(not(feature = "extended-metrics"))]
903
        let (pos_sim, neg_sim): (Option<(f32, f32)>, Option<(f32, f32)>) = (None, None);
904
        print_chunk_block("ANCHOR", &triplet.anchor, strategy, split_store, None);
1✔
905
        print_chunk_block(
1✔
906
            "POSITIVE",
1✔
907
            &triplet.positive,
1✔
908
            strategy,
1✔
909
            split_store,
1✔
910
            pos_sim,
1✔
911
        );
912
        print_chunk_block(
1✔
913
            "NEGATIVE",
1✔
914
            &triplet.negative,
1✔
915
            strategy,
1✔
916
            split_store,
1✔
917
            neg_sim,
1✔
918
        );
919
    }
920
    print_source_summary(
1✔
921
        "triplet anchors",
1✔
922
        batch
1✔
923
            .triplets
1✔
924
            .iter()
1✔
925
            .map(|triplet| triplet.anchor.record_id.as_str()),
1✔
926
    );
927
    print_recipe_context_by_source(
1✔
928
        "triplet recipes by source",
1✔
929
        batch
1✔
930
            .triplets
1✔
931
            .iter()
1✔
932
            .map(|triplet| (triplet.anchor.record_id.as_str(), triplet.recipe.as_str())),
1✔
933
    );
934
}
1✔
935

936
fn print_text_batch(strategy: &ChunkingStrategy, batch: &TextBatch, split_store: &impl SplitStore) {
2✔
937
    println!("=== text batch ===");
2✔
938
    for (idx, sample) in batch.samples.iter().enumerate() {
5✔
939
        println!("--- sample #{} ---", idx);
5✔
940
        println!("recipe       : {}", sample.recipe);
5✔
941
        println!("sample_weight: {:.4}", sample.weight);
5✔
942
        if let Some(instr) = &sample.instruction {
5✔
943
            println!("instruction shown to model:\n{}\n", instr);
1✔
944
        }
4✔
945
        print_chunk_block("TEXT", &sample.chunk, strategy, split_store, None);
5✔
946
    }
947
    print_source_summary(
2✔
948
        "text samples",
2✔
949
        batch
2✔
950
            .samples
2✔
951
            .iter()
2✔
952
            .map(|sample| sample.chunk.record_id.as_str()),
5✔
953
    );
954
    print_recipe_context_by_source(
2✔
955
        "text recipes by source",
2✔
956
        batch
2✔
957
            .samples
2✔
958
            .iter()
2✔
959
            .map(|sample| (sample.chunk.record_id.as_str(), sample.recipe.as_str())),
5✔
960
    );
961
}
2✔
962

963
fn print_pair_batch(
3✔
964
    strategy: &ChunkingStrategy,
3✔
965
    batch: &SampleBatch,
3✔
966
    split_store: &impl SplitStore,
3✔
967
) {
3✔
968
    println!("=== pair batch ===");
3✔
969
    for (idx, pair) in batch.pairs.iter().enumerate() {
9✔
970
        println!("--- pair #{} ---", idx);
9✔
971
        println!("recipe       : {}", pair.recipe);
9✔
972
        println!("label        : {:?}", pair.label);
9✔
973
        if let Some(reason) = &pair.reason {
9✔
974
            println!("reason       : {}", reason);
5✔
975
        }
5✔
976
        print_chunk_block("ANCHOR", &pair.anchor, strategy, split_store, None);
9✔
977
        print_chunk_block("OTHER", &pair.positive, strategy, split_store, None);
9✔
978
    }
979
    print_source_summary(
3✔
980
        "pair anchors",
3✔
981
        batch
3✔
982
            .pairs
3✔
983
            .iter()
3✔
984
            .map(|pair| pair.anchor.record_id.as_str()),
9✔
985
    );
986
    print_recipe_context_by_source(
3✔
987
        "pair recipes by source",
3✔
988
        batch
3✔
989
            .pairs
3✔
990
            .iter()
3✔
991
            .map(|pair| (pair.anchor.record_id.as_str(), pair.recipe.as_str())),
9✔
992
    );
993
}
3✔
994

995
fn print_text_recipes(recipes: &[TextRecipe]) {
3✔
996
    println!("=== available text recipes ===");
3✔
997
    for recipe in recipes {
7✔
998
        println!(
7✔
999
            "- {} (weight: {:.3}) selector={:?}",
1000
            recipe.name, recipe.weight, recipe.selector
1001
        );
1002
        if let Some(instr) = &recipe.instruction {
7✔
1003
            println!("  instruction: {}", instr);
1✔
1004
        }
6✔
1005
    }
1006
}
3✔
1007

1008
#[cfg(feature = "extended-metrics")]
1009
/// Computes the arithmetic mean and the median of `vals`.
///
/// The slice is sorted in place as a side effect. For an even number of
/// values the median is the average of the two middle elements.
///
/// Returns `(0.0, 0.0)` for an empty slice instead of panicking on the
/// middle-index computation.
fn metric_mean_median(vals: &mut [f32]) -> (f32, f32) {
    if vals.is_empty() {
        // `len / 2 - 1` would underflow and the mean would be 0/0.
        return (0.0, 0.0);
    }

    let mean = vals.iter().sum::<f32>() / vals.len() as f32;

    // `total_cmp` is a total order over all f32 values, so the sort stays
    // well-defined even if a NaN is present.
    vals.sort_by(f32::total_cmp);

    let mid = vals.len() / 2;
    let median = if vals.len() % 2 == 1 {
        vals[mid]
    } else {
        (vals[mid - 1] + vals[mid]) / 2.0
    };
    (mean, median)
}
17✔
1019

1020
/// Prints the extended-metrics summary: a headline with the triplet and
/// source counts, then one table per metric (jaccard, byte-cos) showing the
/// positive and negative mean / median per source and the positive-minus-
/// negative gap. An `ALL` row over every entry is appended when there is
/// more than one source.
///
/// Each entry tuple is read as: indices 0 and 1 are the positive values,
/// indices 2 and 3 the negative values, for jaccard and byte-cos
/// respectively (this follows from the index pairs passed below).
#[cfg(feature = "extended-metrics")]
fn print_metric_summary(source_data: &HashMap<String, Vec<(f32, f32, f32, f32)>>) {
    /// Width of the dashed rule drawn inside each table.
    const RULE_WIDTH: usize = 83;

    /// Selects one component of an entry; any index above 2 selects the last.
    fn component(entry: &(f32, f32, f32, f32), idx: usize) -> f32 {
        match idx {
            0 => entry.0,
            1 => entry.1,
            2 => entry.2,
            _ => entry.3,
        }
    }

    // Returns [pos, neg] as (mean, median) pairs for one metric across entries.
    fn metric_pair(
        entries: &[(f32, f32, f32, f32)],
        pos_idx: usize,
        neg_idx: usize,
    ) -> [(f32, f32); 2] {
        let mut positives: Vec<f32> = entries
            .iter()
            .map(|entry| component(entry, pos_idx))
            .collect();
        let mut negatives: Vec<f32> = entries
            .iter()
            .map(|entry| component(entry, neg_idx))
            .collect();
        let positive_stats = metric_mean_median(&mut positives);
        let negative_stats = metric_mean_median(&mut negatives);
        [positive_stats, negative_stats]
    }

    /// Renders one table row: name, count, positive, negative, and gap columns.
    fn format_row(
        name: &str,
        count: usize,
        entries: &[(f32, f32, f32, f32)],
        pos_idx: usize,
        neg_idx: usize,
    ) -> String {
        let [(pos_mean, pos_median), (neg_mean, neg_median)] =
            metric_pair(entries, pos_idx, neg_idx);
        format!(
            "{:<24} {:>5}  {:.3} / {:.3}     {:.3} / {:.3}     {:+.3} / {:+.3}",
            name,
            count,
            pos_mean,
            pos_median,
            neg_mean,
            neg_median,
            pos_mean - neg_mean,
            pos_median - neg_median,
        )
    }

    let total: usize = source_data.values().map(|entries| entries.len()).sum();
    let n_sources = source_data.len();
    println!(
        "=== extended metrics summary ({} triplets, {} {}) ===",
        total,
        n_sources,
        if n_sources == 1 { "source" } else { "sources" }
    );

    // Sort source names so the table order is stable across runs.
    let mut sources: Vec<&String> = source_data.keys().collect();
    sources.sort();

    let print_section = |label: &str, pos_idx: usize, neg_idx: usize| {
        println!();
        println!("[{}]", label);
        println!(
            "{:<24} {:>5}  {:<16} {:<16} {:<16}",
            "source", "n", "positive", "negative", "gap (pos\u{2212}neg)"
        );
        println!(
            "{:<24} {:>5}  {:<16} {:<16} {:<16}",
            "", "", "mean / median", "mean / median", "mean / median"
        );
        println!("{}", "-".repeat(RULE_WIDTH));
        for source in &sources {
            let entries = &source_data[*source];
            let row = format_row(source.as_str(), entries.len(), entries, pos_idx, neg_idx);
            println!("{}", row);
        }
        if n_sources > 1 {
            let combined: Vec<(f32, f32, f32, f32)> =
                source_data.values().flatten().copied().collect();
            let row = format_row("ALL", total, &combined, pos_idx, neg_idx);
            println!("{}", "-".repeat(RULE_WIDTH));
            println!("{}", row);
        }
    };

    print_section("jaccard \u{2194} anchor", 0, 2);
    print_section("byte-cos \u{2194} anchor", 1, 3);
    println!();
}
2✔
1130

1131
/// Debug-oriented helper for rendering a short description of a chunk's view.
trait ChunkDebug {
    /// Returns a one-line, human-readable description of the view.
    fn view_name(&self) -> String;
}
1134

1135
impl ChunkDebug for RecordChunk {
1136
    fn view_name(&self) -> String {
26✔
1137
        match &self.view {
26✔
1138
            ChunkView::Window {
1139
                index,
24✔
1140
                span,
24✔
1141
                overlap,
24✔
1142
                start_ratio,
24✔
1143
            } => format!(
24✔
1144
                "window#index={} span={} overlap={} start_ratio={:.3} tokens={}",
1145
                index, span, overlap, start_ratio, self.tokens_estimate
1146
            ),
1147
            ChunkView::SummaryFallback { strategy, .. } => {
2✔
1148
                format!("summary:{} tokens={}", strategy, self.tokens_estimate)
2✔
1149
            }
1150
        }
1151
    }
26✔
1152
}
1153

1154
fn print_chunk_block(
26✔
1155
    title: &str,
26✔
1156
    chunk: &RecordChunk,
26✔
1157
    strategy: &ChunkingStrategy,
26✔
1158
    split_store: &impl SplitStore,
26✔
1159
    anchor_sim: Option<(f32, f32)>,
26✔
1160
) {
26✔
1161
    let chunk_weight = chunk_weight(strategy, chunk);
26✔
1162
    let split = split_store
26✔
1163
        .label_for(&chunk.record_id)
26✔
1164
        .map(|label| format!("{:?}", label))
26✔
1165
        .unwrap_or_else(|| "Unknown".to_string());
26✔
1166
    println!("--- {} ---", title);
26✔
1167
    println!("split        : {}", split);
26✔
1168
    println!("view         : {}", chunk.view_name());
26✔
1169
    println!("chunk_weight : {:.4}", chunk_weight);
26✔
1170
    println!("record_id    : {}", chunk.record_id);
26✔
1171
    println!("section_idx  : {}", chunk.section_idx);
26✔
1172
    println!("token_est    : {}", chunk.tokens_estimate);
26✔
1173
    if let Some((j, c)) = anchor_sim {
26✔
1174
        println!("jaccard(↔a)  : {:.4}  byte-cos(↔a): {:.4}", j, c);
2✔
1175
    }
24✔
1176
    println!("model_input (exact text sent to the model):");
26✔
1177
    println!(
26✔
1178
        "<<< BEGIN MODEL TEXT >>>\n{}\n<<< END MODEL TEXT >>>\n",
1179
        chunk.text
1180
    );
1181
}
26✔
1182

1183
fn print_source_summary<'a, I>(label: &str, ids: I)
7✔
1184
where
7✔
1185
    I: Iterator<Item = &'a str>,
7✔
1186
{
1187
    let mut counts: HashMap<SourceId, usize> = HashMap::new();
7✔
1188
    for id in ids {
15✔
1189
        let source = extract_source(id);
15✔
1190
        *counts.entry(source).or_insert(0) += 1;
15✔
1191
    }
15✔
1192
    if counts.is_empty() {
7✔
1193
        return;
1✔
1194
    }
6✔
1195
    let skew = source_skew(&counts);
6✔
1196
    let mut entries: Vec<(String, usize)> = counts.into_iter().collect();
6✔
1197
    entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
6✔
1198
    println!("--- {} by source ---", label);
6✔
1199
    if let Some(skew) = skew {
6✔
1200
        for entry in &skew.per_source {
6✔
1201
            println!(
6✔
1202
                "{}: count={} share={:.2}",
6✔
1203
                entry.source, entry.count, entry.share
6✔
1204
            );
6✔
1205
        }
6✔
1206
        println!(
6✔
1207
            "skew: sources={} total={} min={} max={} mean={:.2} ratio={:.2}",
1208
            skew.sources, skew.total, skew.min, skew.max, skew.mean, skew.ratio
1209
        );
1210
    } else {
1211
        for (source, count) in &entries {
×
UNCOV
1212
            println!("{source}: count={count}");
×
UNCOV
1213
        }
×
1214
    }
1215
}
7✔
1216

1217
fn print_recipe_context_by_source<'a, I>(label: &str, entries: I)
7✔
1218
where
7✔
1219
    I: Iterator<Item = (&'a str, &'a str)>,
7✔
1220
{
1221
    let mut counts: HashMap<SourceId, HashMap<String, usize>> = HashMap::new();
7✔
1222
    for (record_id, recipe) in entries {
15✔
1223
        let source = extract_source(record_id);
15✔
1224
        let entry = counts
15✔
1225
            .entry(source)
15✔
1226
            .or_default()
15✔
1227
            .entry(recipe.to_string())
15✔
1228
            .or_insert(0);
15✔
1229
        *entry += 1;
15✔
1230
    }
15✔
1231
    if counts.is_empty() {
7✔
1232
        return;
1✔
1233
    }
6✔
1234
    let mut sources: Vec<(SourceId, HashMap<String, usize>)> = counts.into_iter().collect();
6✔
1235
    sources.sort_by(|a, b| a.0.cmp(&b.0));
6✔
1236
    println!("--- {} ---", label);
6✔
1237
    for (source, recipes) in sources {
6✔
1238
        println!("{source}");
6✔
1239
        let mut entries: Vec<(String, usize)> = recipes.into_iter().collect();
6✔
1240
        entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
6✔
1241
        for (recipe, count) in entries {
7✔
1242
            println!("  - {recipe}={count}");
7✔
1243
        }
7✔
1244
    }
1245
}
7✔
1246

1247
fn extract_source(record_id: &str) -> SourceId {
40✔
1248
    record_id
40✔
1249
        .split_once("::")
40✔
1250
        .map(|(source, _)| source.to_string())
40✔
1251
        .unwrap_or_else(|| "unknown".to_string())
40✔
1252
}
40✔
1253

1254
#[cfg(test)]
1255
mod tests {
1256
    use super::*;
1257
    use crate::DataRecord;
1258
    use crate::DeterministicSplitStore;
1259
    use crate::data::{QualityScore, RecordSection, SectionRole};
1260
    use crate::source::{SourceCursor, SourceSnapshot};
1261
    use crate::utils::make_section;
1262
    use chrono::{TimeZone, Utc};
1263
    use tempfile::tempdir;
1264

1265
    /// Minimal in-memory `DataSource` test double for example app tests.
1266
    struct TestSource {
1267
        id: String,
1268
        count: Option<u128>,
1269
        recipes: Vec<TripletRecipe>,
1270
    }
1271

1272
    impl DataSource for TestSource {
1273
        fn id(&self) -> &str {
131✔
1274
            &self.id
131✔
1275
        }
131✔
1276

1277
        fn refresh(
30✔
1278
            &self,
30✔
1279
            _config: &SamplerConfig,
30✔
1280
            _cursor: Option<&SourceCursor>,
30✔
1281
            _limit: Option<usize>,
30✔
1282
        ) -> Result<SourceSnapshot, SamplerError> {
30✔
1283
            Ok(SourceSnapshot {
30✔
1284
                records: Vec::new(),
30✔
1285
                cursor: SourceCursor {
30✔
1286
                    last_seen: Utc::now(),
30✔
1287
                    revision: 0,
30✔
1288
                },
30✔
1289
            })
30✔
1290
        }
30✔
1291

1292
        fn reported_record_count(&self, _config: &SamplerConfig) -> Result<u128, SamplerError> {
2✔
1293
            self.count.ok_or_else(|| SamplerError::SourceInconsistent {
2✔
1294
                source_id: self.id.clone(),
1✔
1295
                details: "test source has no configured exact count".to_string(),
1✔
1296
            })
1✔
1297
        }
2✔
1298

1299
        fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
11✔
1300
            self.recipes.clone()
11✔
1301
        }
11✔
1302
    }
1303

1304
    struct ConfigRequiredSource {
1305
        id: String,
1306
        expected_seed: u64,
1307
    }
1308

1309
    impl DataSource for ConfigRequiredSource {
1310
        fn id(&self) -> &str {
1✔
1311
            &self.id
1✔
1312
        }
1✔
1313

1314
        fn refresh(
1✔
1315
            &self,
1✔
1316
            _config: &SamplerConfig,
1✔
1317
            _cursor: Option<&SourceCursor>,
1✔
1318
            _limit: Option<usize>,
1✔
1319
        ) -> Result<SourceSnapshot, SamplerError> {
1✔
1320
            Ok(SourceSnapshot {
1✔
1321
                records: Vec::new(),
1✔
1322
                cursor: SourceCursor {
1✔
1323
                    last_seen: Utc::now(),
1✔
1324
                    revision: 0,
1✔
1325
                },
1✔
1326
            })
1✔
1327
        }
1✔
1328

1329
        fn reported_record_count(&self, config: &SamplerConfig) -> Result<u128, SamplerError> {
2✔
1330
            if config.seed == self.expected_seed {
2✔
1331
                Ok(1)
1✔
1332
            } else {
1333
                Err(SamplerError::SourceInconsistent {
1✔
1334
                    source_id: self.id.clone(),
1✔
1335
                    details: format!(
1✔
1336
                        "expected sampler seed {} but got {}",
1✔
1337
                        self.expected_seed, config.seed
1✔
1338
                    ),
1✔
1339
                })
1✔
1340
            }
1341
        }
2✔
1342

1343
        fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
2✔
1344
            Vec::new()
2✔
1345
        }
2✔
1346
    }
1347

1348
    struct FixtureSource {
1349
        id: String,
1350
        records: Vec<DataRecord>,
1351
        recipes: Vec<TripletRecipe>,
1352
    }
1353

1354
    impl DataSource for FixtureSource {
1355
        fn id(&self) -> &str {
16✔
1356
            &self.id
16✔
1357
        }
16✔
1358

1359
        fn refresh(
3✔
1360
            &self,
3✔
1361
            _config: &SamplerConfig,
3✔
1362
            _cursor: Option<&SourceCursor>,
3✔
1363
            _limit: Option<usize>,
3✔
1364
        ) -> Result<SourceSnapshot, SamplerError> {
3✔
1365
            Ok(SourceSnapshot {
3✔
1366
                records: self.records.clone(),
3✔
1367
                cursor: SourceCursor {
3✔
1368
                    last_seen: Utc::now(),
3✔
1369
                    revision: 0,
3✔
1370
                },
3✔
1371
            })
3✔
1372
        }
3✔
1373

NEW
UNCOV
1374
        fn reported_record_count(&self, _config: &SamplerConfig) -> Result<u128, SamplerError> {
×
NEW
UNCOV
1375
            Ok(self.records.len() as u128)
×
NEW
UNCOV
1376
        }
×
1377

1378
        fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
2✔
1379
            self.recipes.clone()
2✔
1380
        }
2✔
1381
    }
1382

1383
    struct IngestionConfigSource {
1384
        expected_ingestion_max_records: usize,
1385
        records: Vec<DataRecord>,
1386
    }
1387

1388
    impl DataSource for IngestionConfigSource {
1389
        fn id(&self) -> &str {
6✔
1390
            "ingestion_config_source"
6✔
1391
        }
6✔
1392

1393
        fn refresh(
1✔
1394
            &self,
1✔
1395
            config: &SamplerConfig,
1✔
1396
            _cursor: Option<&SourceCursor>,
1✔
1397
            _limit: Option<usize>,
1✔
1398
        ) -> Result<SourceSnapshot, SamplerError> {
1✔
1399
            if config.ingestion_max_records != self.expected_ingestion_max_records {
1✔
NEW
1400
                return Err(SamplerError::SourceInconsistent {
×
NEW
1401
                    source_id: self.id().to_string(),
×
NEW
1402
                    details: format!(
×
NEW
1403
                        "expected ingestion_max_records {} but got {}",
×
NEW
1404
                        self.expected_ingestion_max_records, config.ingestion_max_records
×
NEW
1405
                    ),
×
NEW
1406
                });
×
1407
            }
1✔
1408
            Ok(SourceSnapshot {
1✔
1409
                records: self.records.clone(),
1✔
1410
                cursor: SourceCursor {
1✔
1411
                    last_seen: Utc::now(),
1✔
1412
                    revision: 0,
1✔
1413
                },
1✔
1414
            })
1✔
1415
        }
1✔
1416

NEW
1417
        fn reported_record_count(&self, _config: &SamplerConfig) -> Result<u128, SamplerError> {
×
NEW
1418
            Ok(self.records.len() as u128)
×
NEW
1419
        }
×
1420

1421
        fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
1✔
1422
            vec![default_recipe("ingestion_config_recipe")]
1✔
1423
        }
1✔
1424
    }
1425

1426
    fn fixture_record(
19✔
1427
        source: &str,
19✔
1428
        id_suffix: &str,
19✔
1429
        day: u32,
19✔
1430
        title: &str,
19✔
1431
        body: &str,
19✔
1432
    ) -> DataRecord {
19✔
1433
        let now = Utc.with_ymd_and_hms(2025, 1, day, 12, 0, 0).unwrap();
19✔
1434
        DataRecord {
19✔
1435
            id: format!("{source}::{id_suffix}"),
19✔
1436
            source: source.to_string(),
19✔
1437
            created_at: now,
19✔
1438
            updated_at: now,
19✔
1439
            quality: QualityScore { trust: 1.0 },
19✔
1440
            taxonomy: Vec::new(),
19✔
1441
            sections: vec![
19✔
1442
                make_section(SectionRole::Anchor, Some("title"), title),
19✔
1443
                make_section(SectionRole::Context, Some("body"), body),
19✔
1444
            ],
19✔
1445
            meta_prefix: None,
19✔
1446
        }
19✔
1447
    }
19✔
1448

1449
    fn default_recipe(name: &str) -> TripletRecipe {
13✔
1450
        TripletRecipe {
13✔
1451
            name: name.to_string().into(),
13✔
1452
            anchor: crate::config::Selector::Role(SectionRole::Anchor),
13✔
1453
            positive_selector: crate::config::Selector::Role(SectionRole::Context),
13✔
1454
            negative_selector: crate::config::Selector::Role(SectionRole::Context),
13✔
1455
            negative_strategy: crate::config::NegativeStrategy::WrongArticle,
13✔
1456
            weight: 1.0,
13✔
1457
            instruction: None,
13✔
1458
            allow_same_anchor_positive: false,
13✔
1459
        }
13✔
1460
    }
13✔
1461

1462
    #[test]
1463
    fn parse_helpers_validate_inputs() {
1✔
1464
        assert_eq!(parse_batch_size("2").unwrap(), 2);
1✔
1465
        assert!(parse_batch_size("0").is_err());
1✔
1466
        assert!(parse_batch_size("abc").is_err());
1✔
1467
        assert_eq!(parse_ingestion_max_records("16").unwrap(), 16);
1✔
1468
        assert!(parse_ingestion_max_records("0").is_err());
1✔
1469
        assert!(parse_batch_count("0").is_err());
1✔
1470

1471
        let split = parse_split_ratios_arg("0.8,0.1,0.1").unwrap();
1✔
1472
        assert!((split.train - 0.8).abs() < 1e-6);
1✔
1473
        assert!(parse_split_ratios_arg("0.8,0.1").is_err());
1✔
1474
        assert!(parse_split_ratios_arg("1.0,0.0,0.1").is_err());
1✔
1475
        assert!(parse_split_ratios_arg("-0.1,0.6,0.5").is_err());
1✔
1476
    }
1✔
1477

1478
    #[test]
1479
    fn suggested_balancing_weight_is_longest_normalized_and_bounded() {
1✔
1480
        assert!((suggested_balancing_weight(100, 100) - 1.0).abs() < 1e-6);
1✔
1481
        assert!((suggested_balancing_weight(400, 100) - 0.25).abs() < 1e-6);
1✔
1482
        assert!((suggested_balancing_weight(400, 400) - 1.0).abs() < 1e-6);
1✔
1483
        assert_eq!(suggested_balancing_weight(0, 100), 0.0);
1✔
1484
        assert_eq!(suggested_balancing_weight(100, 0), 0.0);
1✔
1485
    }
1✔
1486

1487
    #[test]
1488
    fn suggested_oversampling_weight_is_inverse_in_unit_interval() {
1✔
1489
        assert!((suggested_oversampling_weight(100, 100) - 1.0).abs() < 1e-6);
1✔
1490
        assert!((suggested_oversampling_weight(100, 400) - 0.25).abs() < 1e-6);
1✔
1491
        assert!((suggested_oversampling_weight(100, 1000) - 0.1).abs() < 1e-6);
1✔
1492
        assert_eq!(suggested_oversampling_weight(0, 100), 0.0);
1✔
1493
        assert_eq!(suggested_oversampling_weight(100, 0), 0.0);
1✔
1494
    }
1✔
1495

1496
    #[test]
1497
    fn parse_cli_handles_help_and_invalid_args() {
1✔
1498
        let help = parse_cli::<EstimateCapacityCli, _>(["estimate_capacity", "--help"]).unwrap();
1✔
1499
        assert!(help.is_none());
1✔
1500

1501
        let err = parse_cli::<EstimateCapacityCli, _>(["estimate_capacity", "--unknown"]);
1✔
1502
        assert!(err.is_err());
1✔
1503
    }
1✔
1504

1505
    #[test]
1506
    fn run_estimate_capacity_succeeds_with_reported_counts() {
1✔
1507
        let result = run_estimate_capacity(
1✔
1508
            std::iter::empty::<String>(),
1✔
1509
            |roots| {
1✔
1510
                assert!(roots.is_empty());
1✔
1511
                Ok(())
1✔
1512
            },
1✔
1513
            |_| {
1✔
1514
                vec![Box::new(TestSource {
1✔
1515
                    id: "source_a".into(),
1✔
1516
                    count: Some(12),
1✔
1517
                    recipes: vec![default_recipe("r1")],
1✔
1518
                }) as DynSource]
1✔
1519
            },
1✔
1520
        );
1521

1522
        assert!(result.is_ok());
1✔
1523
    }
1✔
1524

1525
    #[test]
1526
    fn run_estimate_capacity_errors_when_source_count_missing() {
1✔
1527
        let result = run_estimate_capacity(
1✔
1528
            std::iter::empty::<String>(),
1✔
1529
            |_| Ok(()),
1✔
1530
            |_| {
1✔
1531
                vec![Box::new(TestSource {
1✔
1532
                    id: "source_missing".into(),
1✔
1533
                    count: None,
1✔
1534
                    recipes: vec![default_recipe("r1")],
1✔
1535
                }) as DynSource]
1✔
1536
            },
1✔
1537
        );
1538

1539
        let err = result.unwrap_err().to_string();
1✔
1540
        assert!(err.contains("failed to report exact record count"));
1✔
1541
    }
1✔
1542

1543
    #[test]
1544
    fn run_estimate_capacity_propagates_root_resolution_error() {
1✔
1545
        let result = run_estimate_capacity(
1✔
1546
            std::iter::empty::<String>(),
1✔
1547
            |_| Err("root resolution failed".into()),
1✔
1548
            |_: &()| Vec::<DynSource>::new(),
×
1549
        );
1550

1551
        let err = result.unwrap_err().to_string();
1✔
1552
        assert!(err.contains("root resolution failed"));
1✔
1553
    }
1✔
1554

1555
    /// Runs capacity estimation against a `ConfigRequiredSource` expecting
    /// seed 99 and checks that the run succeeds.
    #[test]
    fn run_estimate_capacity_configures_sources_centrally_before_counting() {
        let outcome = run_estimate_capacity(
            std::iter::empty::<String>(),
            |_| Ok(()),
            |_| {
                let source = ConfigRequiredSource {
                    id: "requires_config".into(),
                    expected_seed: 99,
                };
                vec![Box::new(source) as DynSource]
            },
        );

        assert!(outcome.is_ok());
    }
1✔
1570

1571
    /// Exercises `ConfigRequiredSource` directly: refresh yields no records, a
    /// config with a different seed is rejected as `SourceInconsistent`, and
    /// the source has no default triplet recipes.
    #[test]
    fn config_required_source_refresh_and_seed_mismatch_are_exercised() {
        let source = ConfigRequiredSource {
            id: String::from("cfg-source"),
            expected_seed: 42,
        };

        let default_config = SamplerConfig::default();
        let snapshot = source.refresh(&default_config, None, None).unwrap();
        assert!(snapshot.records.is_empty());

        // Seed 7 deliberately differs from the source's expected seed of 42.
        let wrong_seed_config = SamplerConfig {
            seed: 7,
            ..SamplerConfig::default()
        };
        let count_result = source.reported_record_count(&wrong_seed_config);
        assert!(matches!(
            count_result,
            Err(SamplerError::SourceInconsistent { .. })
        ));

        let recipes = source.default_triplet_recipes();
        assert!(recipes.is_empty());
    }
1✔
1594

1595
    /// Runs the demo once per mode (pair batch, text recipes, and no mode
    /// flag) against a source that always yields the same single record, and
    /// expects every run to return `Ok`.
    #[test]
    fn run_multi_source_demo_exhausted_paths_return_ok() {
        struct OneRecordSource;

        impl DataSource for OneRecordSource {
            fn id(&self) -> &str {
                "one_record"
            }

            fn refresh(
                &self,
                _config: &SamplerConfig,
                _cursor: Option<&SourceCursor>,
                _limit: Option<usize>,
            ) -> Result<SourceSnapshot, SamplerError> {
                // Builds a section whose text is also its only sentence.
                fn section(role: SectionRole, heading: &str, body: &str) -> RecordSection {
                    RecordSection {
                        role,
                        heading: Some(heading.to_string()),
                        text: body.to_string(),
                        sentences: vec![body.to_string()],
                    }
                }

                let timestamp = Utc::now();
                let record = DataRecord {
                    id: "one_record::r1".to_string(),
                    source: "one_record".to_string(),
                    created_at: timestamp,
                    updated_at: timestamp,
                    quality: QualityScore { trust: 1.0 },
                    taxonomy: Vec::new(),
                    sections: vec![
                        section(SectionRole::Anchor, "title", "anchor"),
                        section(SectionRole::Context, "body", "context"),
                    ],
                    meta_prefix: None,
                };
                let cursor = SourceCursor {
                    last_seen: timestamp,
                    revision: 0,
                };

                Ok(SourceSnapshot {
                    records: vec![record],
                    cursor,
                })
            }

            fn reported_record_count(&self, _config: &SamplerConfig) -> Result<u128, SamplerError> {
                Ok(1)
            }

            fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
                vec![default_recipe("single_record_recipe")]
            }
        }

        // `None` is the run without any mode flag.
        for mode in [Some("--pair-batch"), Some("--text-recipes"), None] {
            let dir = tempdir().unwrap();
            let split_store_path = dir.path().join("split_store.bin");

            let mut args = vec![
                String::from("--split-store-path"),
                split_store_path.to_string_lossy().into_owned(),
            ];
            args.extend(mode.map(String::from));

            let outcome = run_multi_source_demo(
                args.into_iter(),
                |_| Ok(()),
                |_| vec![Box::new(OneRecordSource) as DynSource],
            );
            assert!(outcome.is_ok());
        }
    }
1✔
1670

1671
    /// Checks four `parse_cli` outcomes for `MultiSourceDemoCli`: `--help`
    /// yields `Ok(None)`, a zero `--batch-size` and a zero
    /// `--ingestion-max-records` are both rejected, and a bare invocation parses.
    #[test]
    fn parse_multi_source_cli_handles_help_and_batch_size_validation() {
        let help_outcome = parse_cli::<MultiSourceDemoCli, _>(["multi_source_demo", "--help"]);
        assert!(help_outcome.unwrap().is_none());

        let zero_batch_size =
            parse_cli::<MultiSourceDemoCli, _>(["multi_source_demo", "--batch-size", "0"]);
        assert!(zero_batch_size.is_err());

        let zero_ingestion_limit = parse_cli::<MultiSourceDemoCli, _>([
            "multi_source_demo",
            "--ingestion-max-records",
            "0",
        ]);
        assert!(zero_ingestion_limit.is_err());

        let bare_invocation = parse_cli::<MultiSourceDemoCli, _>(["multi_source_demo"]);
        assert!(bare_invocation.is_ok());
    }
1✔
1689

1690
    /// Passes `--ingestion-max-records 7` on the command line and runs the demo
    /// against an `IngestionConfigSource` that expects that same value.
    #[test]
    fn run_multi_source_demo_passes_ingestion_max_records_to_sources() {
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("ingestion_config_split_store.bin");
        let expected = 7;

        let args = vec![
            String::from("--pair-batch"),
            String::from("--ingestion-max-records"),
            expected.to_string(),
            String::from("--split-store-path"),
            split_store_path.to_string_lossy().into_owned(),
        ];

        let outcome = run_multi_source_demo(
            args.into_iter(),
            |_| Ok(()),
            |_| {
                // Eight fixture records, one per day 1..=8.
                let records = (1..=8)
                    .map(|day| {
                        let record_id = format!("r{day}");
                        let headline = format!("Config headline {day}");
                        let body = format!("Config body {day}");
                        fixture_record(
                            "ingestion_config_source",
                            &record_id,
                            day,
                            &headline,
                            &body,
                        )
                    })
                    .collect();

                vec![Box::new(IngestionConfigSource {
                    expected_ingestion_max_records: expected,
                    records,
                }) as DynSource]
            },
        );

        assert!(outcome.is_ok());
    }
1✔
1726

1727
    /// `--version` on a CLI that declares a version must parse to `Ok(None)`.
    #[test]
    fn parse_cli_handles_display_version_path() {
        // Minimal parser that exists only to give clap a version to display.
        #[derive(Debug, Parser)]
        #[command(name = "version_test", version = "1.0.0")]
        struct VersionCli {}

        let outcome = parse_cli::<VersionCli, _>(["version_test", "--version"]);
        assert!(outcome.unwrap().is_none());
    }
1✔
1736

1737
    /// `--list-text-recipes` with an explicit split-store path must succeed for
    /// a source that exposes one default recipe.
    #[test]
    fn run_multi_source_demo_list_text_recipes_path_succeeds() {
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("recipes_split_store.bin");
        let args = vec![
            "--list-text-recipes".to_string(),
            "--split-store-path".to_string(),
            split_store_path.to_string_lossy().to_string(),
        ];

        // Hand the arguments over by value, as the sibling tests do; the
        // vector is not needed afterwards, so no `mut` binding or `drain` is
        // required.
        let result = run_multi_source_demo(
            args.into_iter(),
            |_| Ok(()),
            |_| {
                vec![Box::new(TestSource {
                    id: "source_for_recipes".into(),
                    count: Some(10),
                    recipes: vec![default_recipe("recipe_a")],
                }) as DynSource]
            },
        );

        assert!(result.is_ok());
    }
1✔
1760

1761
    /// `--list-text-recipes` with an explicit split-store path must also
    /// succeed when the source supplies no recipes at all.
    #[test]
    fn run_multi_source_demo_list_text_recipes_uses_explicit_split_store_path() {
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("custom_split_store.bin");

        let cli_args = [
            String::from("--list-text-recipes"),
            String::from("--split-store-path"),
            split_store_path.to_string_lossy().into_owned(),
        ];

        let outcome = run_multi_source_demo(
            cli_args.into_iter(),
            |_| Ok(()),
            |_| {
                let source = TestSource {
                    id: "source_without_text_recipes".into(),
                    count: Some(1),
                    recipes: Vec::new(),
                };
                vec![Box::new(source) as DynSource]
            },
        );

        assert!(outcome.is_ok());
    }
1✔
1785

1786
    /// Each mode (pair batch, text recipes, no mode flag) must return `Ok` on
    /// the validation split when the source reports zero records.
    #[test]
    fn run_multi_source_demo_sampling_modes_handle_empty_sources() {
        // `None` is the run without any mode flag.
        for mode_flag in [Some("--pair-batch"), Some("--text-recipes"), None] {
            let dir = tempdir().unwrap();
            let split_store_path = dir.path().join("empty_sources_split_store.bin");

            // The mode flag (if any) goes first, followed by the shared options.
            let mut args: Vec<String> = mode_flag.into_iter().map(String::from).collect();
            args.extend([
                String::from("--split-store-path"),
                split_store_path.to_string_lossy().into_owned(),
                String::from("--split"),
                String::from("validation"),
            ]);

            let outcome = run_multi_source_demo(
                args.into_iter(),
                |_| Ok(()),
                |_| {
                    let source = TestSource {
                        id: "source_empty".into(),
                        count: Some(0),
                        recipes: vec![default_recipe("recipe_empty")],
                    };
                    vec![Box::new(source) as DynSource]
                },
            );

            assert!(outcome.is_ok());
        }
    }
1✔
1816

1817
    /// An error returned by the root-resolution callback must surface
    /// unchanged from `run_multi_source_demo`.
    #[test]
    fn run_multi_source_demo_propagates_root_resolution_error() {
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("root_resolution_error_store.bin");

        let args = vec![
            String::from("--split-store-path"),
            split_store_path.to_string_lossy().into_owned(),
        ];

        let outcome = run_multi_source_demo(
            args.into_iter(),
            |_| Err("demo root resolution failed".into()),
            // Only the resolver's failure is asserted; this builder supplies no sources.
            |_: &()| Vec::<DynSource>::new(),
        );

        let failure = outcome.unwrap_err();
        assert!(failure.to_string().contains("demo root resolution failed"));
    }
1✔
1834

1835
    /// Smoke-tests the triplet, pair, text-batch, and recipe printers with
    /// hand-built chunks, then checks `extract_source` on an id that contains
    /// `::` and on one that does not.
    #[test]
    fn print_helpers_and_extract_source_cover_paths() {
        let ratios = SplitRatios::default();
        let store = DeterministicSplitStore::new(ratios, 42).unwrap();
        let strategy = ChunkingStrategy::default();

        // Three chunks covering both `ChunkView` variants used by the printers.
        let anchor_chunk = RecordChunk {
            record_id: String::from("source_a::rec1"),
            section_idx: 0,
            view: ChunkView::Window {
                index: 1,
                overlap: 2,
                span: 12,
                start_ratio: 0.25,
            },
            text: String::from("anchor text"),
            tokens_estimate: 8,
            quality: crate::data::QualityScore { trust: 0.9 },
        };
        let positive_chunk = RecordChunk {
            record_id: String::from("source_a::rec2"),
            section_idx: 1,
            view: ChunkView::SummaryFallback {
                strategy: String::from("summary"),
                weight: 0.7,
            },
            text: String::from("positive text"),
            tokens_estimate: 6,
            quality: crate::data::QualityScore { trust: 0.8 },
        };
        let negative_chunk = RecordChunk {
            record_id: String::from("source_b::rec3"),
            section_idx: 2,
            view: ChunkView::Window {
                index: 0,
                overlap: 0,
                span: 16,
                start_ratio: 0.0,
            },
            text: String::from("negative text"),
            tokens_estimate: 7,
            quality: crate::data::QualityScore { trust: 0.5 },
        };

        let triplet = crate::SampleTriplet {
            recipe: String::from("triplet_recipe"),
            anchor: anchor_chunk.clone(),
            positive: positive_chunk.clone(),
            negative: negative_chunk.clone(),
            weight: 1.0,
            instruction: Some(String::from("triplet instruction")),
        };
        let triplet_batch = TripletBatch {
            triplets: vec![triplet],
        };
        print_triplet_batch(&strategy, &triplet_batch, &store);

        let pair = crate::SamplePair {
            recipe: String::from("pair_recipe"),
            anchor: anchor_chunk.clone(),
            positive: positive_chunk.clone(),
            weight: 1.0,
            instruction: None,
            label: crate::PairLabel::Positive,
            reason: Some(String::from("same topic")),
        };
        let pair_batch = SampleBatch { pairs: vec![pair] };
        print_pair_batch(&strategy, &pair_batch, &store);

        let text_sample = crate::TextSample {
            recipe: String::from("text_recipe"),
            chunk: negative_chunk,
            weight: 0.8,
            instruction: Some(String::from("text instruction")),
        };
        let text_batch = TextBatch {
            samples: vec![text_sample],
        };
        print_text_batch(&strategy, &text_batch, &store);

        let recipe = TextRecipe {
            name: "recipe_name".into(),
            selector: crate::config::Selector::Role(SectionRole::Context),
            instruction: Some("instruction".into()),
            weight: 1.0,
        };
        let recipes = vec![recipe];
        print_text_recipes(&recipes);

        assert_eq!(extract_source("source_a::record"), "source_a");
        assert_eq!(extract_source("record-without-delimiter"), "unknown");
    }
1✔
1925

1926
    /// Every `SplitArg` variant converts to the `SplitLabel` variant of the
    /// same name.
    // NOTE(review): despite the name, no version-parsing path is exercised here.
    #[test]
    fn split_arg_conversion_and_version_parse_paths_are_covered() {
        let train: SplitLabel = SplitArg::Train.into();
        assert!(matches!(train, SplitLabel::Train));

        let validation: SplitLabel = SplitArg::Validation.into();
        assert!(matches!(validation, SplitLabel::Validation));

        let test: SplitLabel = SplitArg::Test.into();
        assert!(matches!(test, SplitLabel::Test));
    }
1✔
1938

1939
    /// A non-numeric value in any of the three ratio positions must produce an
    /// error naming that position.
    #[test]
    fn parse_split_ratios_reports_per_field_parse_errors() {
        // (input with one unparsable field, fragment expected in the error)
        let cases = [
            ("x,0.1,0.9", "invalid train ratio"),
            ("0.1,y,0.8", "invalid validation ratio"),
            ("0.1,0.2,z", "invalid test ratio"),
        ];

        for (input, expected_fragment) in cases {
            let message = parse_split_ratios_arg(input).unwrap_err();
            assert!(message.contains(expected_fragment));
        }
    }
1✔
1957

1958
    /// Each mode (pair batch, text recipes, no mode flag) must return `Ok`
    /// when the only source has one record and no recipes.
    #[test]
    fn run_multi_source_demo_exhausted_paths_are_handled() {
        // An empty flag list is the run without any mode flag.
        let mode_flags: [&[&str]; 3] = [&["--pair-batch"], &["--text-recipes"], &[]];

        for flags in mode_flags {
            let dir = tempdir().unwrap();
            let split_store_path = dir.path().join("exhausted_split_store.bin");

            let mut args: Vec<String> = flags.iter().map(|flag| flag.to_string()).collect();
            args.push(String::from("--split-store-path"));
            args.push(split_store_path.to_string_lossy().into_owned());

            let outcome = run_multi_source_demo(
                args.into_iter(),
                |_| Ok(()),
                |_| {
                    let source = TestSource {
                        id: "source_without_recipes".into(),
                        count: Some(1),
                        recipes: Vec::new(),
                    };
                    vec![Box::new(source) as DynSource]
                },
            );

            assert!(outcome.is_ok());
        }
    }
1✔
1986

1987
    /// With `--reset`, a pre-existing split-store file holding stale bytes must
    /// not stop the run, and a non-empty file must exist at the same path
    /// afterwards.
    #[test]
    fn run_multi_source_demo_reset_recreates_split_store_and_samples() {
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("reset_split_store.bin");
        // Seed the path with stale content before the run.
        std::fs::write(&split_store_path, b"stale-data").unwrap();

        let cli_args = [
            String::from("--reset"),
            String::from("--pair-batch"),
            String::from("--split-store-path"),
            split_store_path.to_string_lossy().into_owned(),
        ];

        let outcome = run_multi_source_demo(
            cli_args.into_iter(),
            |_| Ok(()),
            |_| {
                let recipes = vec![default_recipe("fixture_recipe")];

                let mut records: Vec<DataRecord> = Vec::new();
                for day in 1..=8 {
                    let record_id = format!("r{day}");
                    let headline = format!("Fixture headline {day}");
                    let body = format!("Fixture body content for day {day}.");
                    records.push(fixture_record(
                        "fixture_source",
                        &record_id,
                        day,
                        &headline,
                        &body,
                    ));
                }

                let source = FixtureSource {
                    id: "fixture_source".into(),
                    records,
                    recipes,
                };
                vec![Box::new(source) as DynSource]
            },
        );

        assert!(outcome.is_ok());
        assert!(split_store_path.exists());
        let store_len = std::fs::metadata(&split_store_path).unwrap().len();
        assert!(store_len > 0);
    }
1✔
2029

2030
    /// `--batches 2` over a three-record fixture source must return `Ok` and
    /// leave a split-store file on disk.
    #[test]
    fn run_multi_source_demo_batches_mode_executes_multiple_batches() {
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("batches_split_store.bin");

        let cli_args = [
            String::from("--batches"),
            String::from("2"),
            String::from("--split-store-path"),
            split_store_path.to_string_lossy().into_owned(),
        ];

        let outcome = run_multi_source_demo(
            cli_args.into_iter(),
            |_| Ok(()),
            |_| {
                let recipes = vec![default_recipe("batch_recipe")];

                // (record id, day, headline, body) for each fixture record.
                let fixtures = [
                    (
                        "r1",
                        3,
                        "Inflation cools in latest report",
                        "Core inflation moderated compared with prior quarter.",
                    ),
                    (
                        "r2",
                        4,
                        "Labor market remains resilient",
                        "Job openings remain elevated despite slower growth.",
                    ),
                    (
                        "r3",
                        5,
                        "Manufacturing sentiment stabilizes",
                        "Survey data suggests output expectations are improving.",
                    ),
                ];
                let records: Vec<DataRecord> = fixtures
                    .into_iter()
                    .map(|(record_id, day, headline, body)| {
                        fixture_record("batch_source", record_id, day, headline, body)
                    })
                    .collect();

                let source = FixtureSource {
                    id: "batch_source".into(),
                    records,
                    recipes,
                };
                vec![Box::new(source) as DynSource]
            },
        );

        assert!(outcome.is_ok());
        assert!(split_store_path.exists());
    }
1✔
2079

2080
    /// The managed split-store path must end with the store filename, and its
    /// parent directory must end with the demo group path.
    #[test]
    fn managed_demo_split_store_path_resolves_under_cache_group() {
        let store_path = managed_demo_split_store_path().unwrap();
        assert!(store_path.ends_with(MULTI_SOURCE_DEMO_STORE_FILENAME));

        let expected_group = PathBuf::from(MULTI_SOURCE_DEMO_GROUP);
        let group_dir = store_path
            .parent()
            .expect("managed split-store path should have a parent");
        assert!(group_dir.ends_with(expected_group));
    }
1✔
2089

2090
    /// `--help` must return `Ok` without calling either callback; both panic
    /// if they are ever invoked.
    #[test]
    fn run_multi_source_demo_help_returns_ok_without_work() {
        let help_only = vec![String::from("--help")];

        let outcome = run_multi_source_demo(
            help_only.into_iter(),
            |_| -> Result<(), Box<dyn Error>> {
                panic!("help path should return before resolving roots")
            },
            |_: &()| -> Vec<DynSource> {
                panic!("help path should return before building sources")
            },
        );

        assert!(outcome.is_ok());
    }
1✔
2104

2105
    /// `--list-text-recipes` without `--split-store-path` must still succeed.
    // NOTE(review): presumably this falls back to the managed cache path, so
    // the test may touch the real cache location — confirm.
    #[test]
    fn run_multi_source_demo_uses_managed_split_store_path_when_not_provided() {
        let cli_args = vec![String::from("--list-text-recipes")];

        let outcome = run_multi_source_demo(
            cli_args.into_iter(),
            |_| Ok(()),
            |_| {
                let source = TestSource {
                    id: "managed_path_source".into(),
                    count: Some(2),
                    recipes: vec![default_recipe("managed_recipe")],
                };
                vec![Box::new(source) as DynSource]
            },
        );

        assert!(outcome.is_ok());
    }
1✔
2121

2122
    /// `--reset` must fail with "failed to remove split store" when the
    /// split-store path is a directory rather than a file.
    #[test]
    fn run_multi_source_demo_reset_errors_when_target_is_directory() {
        let dir = tempdir().unwrap();
        let split_store_path = dir.path().join("split_store_dir");
        // Put a directory where the split-store file is expected.
        std::fs::create_dir(&split_store_path).unwrap();

        let cli_args = vec![
            String::from("--reset"),
            String::from("--split-store-path"),
            split_store_path.to_string_lossy().into_owned(),
        ];

        let outcome = run_multi_source_demo(
            cli_args.into_iter(),
            |_| Ok(()),
            |_| Vec::<DynSource>::new(),
        );

        let failure = outcome.unwrap_err();
        assert!(failure.to_string().contains("failed to remove split store"));
    }
1✔
2142

2143
    /// Both summary printers must cope with an empty iterator.
    #[test]
    fn print_summary_helpers_accept_empty_iterators() {
        let no_sources = std::iter::empty::<&str>();
        print_source_summary("empty summary", no_sources);

        let no_recipe_context = std::iter::empty::<(&str, &str)>();
        print_recipe_context_by_source("empty recipe context", no_recipe_context);
    }
1✔
2148

2149
    /// For the four values 1, 4, 2, 3 (passed unsorted) both the mean and the
    /// median must equal 2.5.
    #[cfg(feature = "extended-metrics")]
    #[test]
    fn metric_mean_median_handles_even_length_inputs() {
        let mut samples = [1.0, 4.0, 2.0, 3.0];

        let (mean, median) = metric_mean_median(&mut samples);

        let mean_error = (mean - 2.5).abs();
        let median_error = (median - 2.5).abs();
        assert!(mean_error < 1e-6);
        assert!(median_error < 1e-6);
    }
1✔
2157

2158
    /// Smoke-tests `print_metric_summary` with two sources of two metric
    /// tuples each; the test asserts nothing beyond the call not panicking.
    #[cfg(feature = "extended-metrics")]
    #[test]
    fn print_metric_summary_includes_multi_source_aggregate() {
        let mut per_source = HashMap::new();
        per_source.insert(
            String::from("source_a"),
            vec![(0.9, 0.8, 0.2, 0.1), (0.8, 0.7, 0.3, 0.2)],
        );
        per_source.insert(
            String::from("source_b"),
            vec![(0.7, 0.6, 0.4, 0.3), (0.6, 0.5, 0.5, 0.4)],
        );

        print_metric_summary(&per_source);
    }
1✔
2174
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc