• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys_process / 15485406069

06 Jun 2025 07:39AM UTC coverage: 95.847% (+0.5%) from 95.307%
15485406069

push

github

ahobeost
Ci adjustments.

10 of 10 new or added lines in 4 files covered. (100.0%)

10 existing lines in 2 files now uncovered.

1177 of 1228 relevant lines covered (95.85%)

1.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.82
/pytrnsys_process/process/process_batch.py
1
import logging as _logging
2✔
2
import pathlib as _pl
2✔
3
import time as _time
2✔
4
import typing as _tp
2✔
5
from collections import abc as _abc
2✔
6
from concurrent import futures as _futures
2✔
7

8
import matplotlib.pyplot as _plt
2✔
9
import pandas as _pd
2✔
10

11
from pytrnsys_process import config as conf
2✔
12
from pytrnsys_process import log, util
2✔
13
from pytrnsys_process.process import data_structures as ds
2✔
14
from pytrnsys_process.process import process_sim as ps
2✔
15

16

17
class UnableToProcessSimulationError(Exception):
    """Signal that no processed Simulation could be produced for a folder.

    Raised by :func:`process_single_simulation` when batch processing did not
    yield a result for the requested simulation folder.
    """
19

20

21
# pylint: disable=too-many-locals
def _process_batch(
    sim_folders: list[_pl.Path],
    processing_scenario: _tp.Union[
        _abc.Callable[[ds.Simulation], None],
        _tp.Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    results_folder: _pl.Path,
    parallel: bool = False,
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Run the processing scenario(s) over a batch of simulation folders.

    Shared engine behind the public entry points: it processes every folder
    (sequentially or in a process pool), collects the successful results,
    records failures, concatenates scalar values and logs a summary.

    Parameters
    __________
        sim_folders:
            Simulation folders to process

        processing_scenario:
            A single callable, or a sequence of callables, applied to every
            simulation

        results_folder:
            Root folder of the simulations; stored on the returned
            SimulationsData and used to obtain the main logger

        parallel:
            If True, simulations are processed in a ProcessPoolExecutor;
            otherwise one after another in the current process

        max_workers:
            Worker limit forwarded to the ProcessPoolExecutor (parallel only)

    Returns
    _______
        SimulationsData holding every successfully processed simulation

    Note:
    _____
        This is an internal function that should not be called directly.
        Use process_single_simulation, process_whole_result_set, or
        process_whole_result_set_parallel instead.
        A simulation that raises is logged and counted as failed; it does not
        abort the rest of the batch.
    """
    started_at = _time.time()
    summary = ds.ProcessingResults()
    collected = ds.SimulationsData(
        path_to_simulations=results_folder.as_posix()
    )
    main_logger = log.get_main_logger(results_folder)

    if not parallel:
        for folder in sim_folders:
            try:
                main_logger.info("Processing simulation: %s", folder.name)
                outcome = _process_simulation(folder, processing_scenario)
                _handle_simulation_result(outcome, summary, collected)
            except Exception as exc:  # pylint: disable=broad-except
                _handle_simulation_error(exc, folder, summary, main_logger)
    else:
        with _futures.ProcessPoolExecutor(max_workers=max_workers) as pool:
            folder_by_future = {}
            for folder in sim_folders:
                main_logger.info(
                    "Submitting simulation folder for processing: %s",
                    folder.name,
                )
                # The reread flag is read here and sent along explicitly,
                # presumably so workers honour this process's setting.
                job = pool.submit(
                    _process_simulation,
                    folder,
                    processing_scenario,
                    conf.global_settings.reader.force_reread_prt,
                )
                folder_by_future[job] = folder

            for finished in _futures.as_completed(folder_by_future):
                try:
                    _handle_simulation_result(
                        finished.result(), summary, collected
                    )
                except Exception as exc:  # pylint: disable=broad-except
                    _handle_simulation_error(
                        exc, folder_by_future[finished], summary, main_logger
                    )

    collected = _concat_scalar(collected)
    _log_processing_results(summary, main_logger)

    elapsed = _time.time() - started_at
    mode_label = "Parallel" if parallel else "Total"
    main_logger.info(
        "%s execution time: %.2f seconds",
        mode_label,
        elapsed,
    )

    return collected
123

124

125
def _handle_simulation_result(
    result: tuple[ds.Simulation, list[str]],
    results: ds.ProcessingResults,
    simulations_data: ds.SimulationsData,
) -> None:
    """Record a successfully processed simulation.

    The simulation is stored in ``simulations_data.simulations`` under the
    name of its folder, the processed counter is incremented, and any failed
    scenario names are recorded under the same key.

    Parameters
    __________
        result: Tuple of (simulation, failed_scenarios) as returned by
            _process_simulation
        results: ProcessingResults to update
        simulations_data: SimulationsData to update
    """
    simulation, failed_scenarios = result
    # Derive the key before mutating any state, so that a bad
    # ``simulation.path`` does not leave the simulation counted as processed
    # when the caller then records it as an error.
    sim_name = _pl.Path(simulation.path).name

    results.processed_count += 1
    simulations_data.simulations[sim_name] = simulation
    if failed_scenarios:
        results.failed_scenarios[sim_name] = failed_scenarios
146

147

148
def _handle_simulation_error(
    error: Exception,
    sim_folder: _pl.Path,
    results: ds.ProcessingResults,
    main_logger: _logging.Logger,
) -> None:
    """Record a simulation that could not be processed and log why.

    Parameters
    __________
        error: The exception that occurred
        sim_folder: Path to the simulation folder
        results: ProcessingResults to update (error count and list of
            failed simulation names)
        main_logger: Logger that receives the error message and traceback
    """
    results.error_count += 1
    results.failed_simulations.append(sim_folder.name)
    main_logger.error(
        "Failed to process simulation in %s: %s",
        sim_folder,
        str(error),
        # Attach the traceback of *error* itself rather than whatever
        # exception happens to be active, so this is correct even when the
        # helper is called outside an ``except`` block.
        exc_info=error,
    )
170

171

172
def process_single_simulation(
    sim_folder: _pl.Path,
    processing_scenario: _tp.Union[
        _abc.Callable[[ds.Simulation], None],
        _tp.Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.Simulation:
    """Process a single simulation folder using the provided processing step/scenario.

    Parameters
    __________
        sim_folder: pathlib.Path
            Path to the simulation folder to process

        processing_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
            They should contain the processing logic for a simulation.
            Each callable should take a Simulation object as its only parameter and modify it in place.
            A callable that raises is logged and skipped; the remaining ones still run.

    Returns
    _______
        Simulation: :class:`pytrnsys_process.api.Simulation`

    Raises
    ______
        UnableToProcessSimulationError: If the simulation folder itself could
            not be processed (the underlying error is logged).

    Example
    _______
            >>> import pathlib as _pl
            >>> from pytrnsys_process import api
            ...
            >>> def processing_step_1(sim: api.Simulation):
            ...     # Process simulation data
            ...     pass
            >>> simulation = api.process_single_simulation(
            ...     _pl.Path("path/to/simulation"),
            ...     processing_step_1
            ... )
    """
    main_logger = log.get_main_logger(sim_folder)
    log.initialize_logs(sim_folder)
    main_logger.info("Starting processing of simulation %s", sim_folder)
    simulations_data = _process_batch(
        [sim_folder], processing_scenario, sim_folder.parent
    )
    try:
        return simulations_data.simulations[sim_folder.name]
    except KeyError as exc:
        # Processing errors are swallowed (and logged) by the batch helper;
        # a missing entry is how such a failure surfaces here.
        raise UnableToProcessSimulationError(
            f"Failed to process simulation in {sim_folder}"
        ) from exc
220

221

222
def process_whole_result_set(
    results_folder: _pl.Path,
    processing_scenario: _tp.Union[
        _abc.Callable[[ds.Simulation], None],
        _tp.Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.SimulationsData:
    """Process all simulation folders in a results directory sequentially.

    Processes each simulation folder found in the results directory one at a time,
    applying the provided processing step/scenario to each simulation.
    Folders are processed in sorted path order. The combined result is also
    pickled into ``results_folder`` so that ``do_comparison`` can reuse it.

    Using the default settings your structure should look like this:

    | results_folder
    |     ├─ sim-1
    |     ├─ sim-2
    |     ├─ sim-3
    |         ├─ temp
    |             ├─ your-printer-files.prt

    Parameters
    __________
        results_folder: pathlib.Path
            Path to the directory containing simulation folders.
            Each subfolder should contain a temp folder containing valid simulation data files.

        processing_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
            They should contain the processing logic for a simulation.
            Each callable should take a Simulation object as its only parameter and modify it in place.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
            - simulations: Dict mapping simulation folder names to the processed Simulation objects
            - scalar: DataFrame containing scalar/deck values from all simulations, keyed by simulation name

    Raises
    ______
        ValueError: If results_folder doesn't exist or is not a directory

    Note
    ____
        Failures of individual simulations or scenarios are logged and
        summarized, but not re-raised.

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    main_logger = log.get_main_logger(results_folder)
    log.initialize_logs(results_folder)
    main_logger.info(
        "Starting batch processing of simulations in %s", results_folder
    )

    # Path.iterdir() yields entries in arbitrary order; sorting makes the
    # processing order (and therefore the logs and the row order of the
    # concatenated scalar table) reproducible.
    sim_folders = sorted(
        sim_folder
        for sim_folder in results_folder.iterdir()
        if sim_folder.is_dir()
    )
    simulations_data = _process_batch(
        sim_folders, processing_scenario, results_folder
    )
    util.save_to_pickle(
        simulations_data,
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value,
    )

    return simulations_data
302

303

304
def process_whole_result_set_parallel(
    results_folder: _pl.Path,
    processing_scenario: _tp.Union[
        _abc.Callable[[ds.Simulation], None],
        _tp.Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Process all simulation folders in a results directory in parallel.

    Uses a ProcessPoolExecutor to process multiple simulations concurrently,
    applying the provided processing step/scenario to each simulation.
    Folders are submitted in sorted path order (they may complete in any
    order). The combined result is also pickled into ``results_folder`` so
    that ``do_comparison`` can reuse it.

    Using the default settings your structure should look like this:

    | results_folder
    |     ├─ sim-1
    |     ├─ sim-2
    |     ├─ sim-3
    |         ├─ temp
    |             ├─ your-printer-files.prt

    Parameters
    __________
        results_folder: pathlib.Path
            Path to the directory containing simulation folders.
            Each subfolder should contain a temp folder containing valid simulation data files.

        processing_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
            They should contain the processing logic for a simulation.
            Each callable should take a Simulation object as its only parameter and modify it in place.

        max_workers: int, default None
            Maximum number of worker processes to use.
            If None, defaults to the number of processors on the machine.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
            - simulations: Dict mapping simulation folder names to the processed Simulation objects
            - scalar: DataFrame containing scalar/deck values from all simulations, keyed by simulation name

    Raises
    ______
        ValueError: If results_folder doesn't exist or is not a directory

    Note
    ____
        Failures of individual simulations or scenarios are logged and
        summarized, but not re-raised.

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set_parallel(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    log.initialize_logs(results_folder)
    main_logger = log.get_main_logger(results_folder)
    main_logger.info(
        "Starting batch processing of simulations in %s with parallel execution",
        results_folder,
    )

    # Path.iterdir() yields entries in arbitrary order; sorting makes the
    # submission order reproducible.
    sim_folders = sorted(
        sim_folder
        for sim_folder in results_folder.iterdir()
        if sim_folder.is_dir()
    )
    simulations_data = _process_batch(
        sim_folders,
        processing_scenario,
        results_folder,
        parallel=True,
        max_workers=max_workers,
    )
    util.save_to_pickle(
        simulations_data,
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value,
    )

    return simulations_data
394

395

396
def do_comparison(
    comparison_scenario: _tp.Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
    simulations_data: _tp.Optional[ds.SimulationsData] = None,
    results_folder: _tp.Optional[_pl.Path] = None,
) -> ds.SimulationsData:
    """Execute comparison scenarios on processed simulation results.

    A comparison step that raises is logged and the remaining steps still
    run. All matplotlib figures are closed afterwards.

    Parameters
    __________
        comparison_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
            They should contain the comparison logic.
            Each callable should take a SimulationsData object as its only parameter and modify it in place.

        simulations_data: SimulationsData, optional
            SimulationsData object containing the processed
            simulations data to be compared.

        results_folder: pathlib.Path, optional
            Path to the directory containing simulation results.
            Used if simulations_data is not provided. If the folder holds the
            simulations-data pickle written by process_whole_result_set or
            process_whole_result_set_parallel, and force_reread_prt is not
            set, that pickle is loaded. Otherwise the folder is processed in
            parallel without any processing scenario.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
            The data the comparison steps were run on.

    Raises
    ______
        ValueError: If neither simulations_data nor results_folder is given

    Example
    __________
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def comparison_step(simulations_data: api.SimulationsData):
        ...     # Compare simulation results
        ...     pass
        ...
        >>> processed_results = api.process_whole_result_set(
        ...     _pl.Path("path/to/results"), []
        ... )
        >>> api.do_comparison(comparison_step, simulations_data=processed_results)
    """
    if simulations_data is None:
        if results_folder is None:
            raise ValueError(
                "Either simulations_data or results_folder must be provided to perform comparison"
            )
        path_to_simulations_data = (
            results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
        )
        if (
            path_to_simulations_data.exists()
            and not conf.global_settings.reader.force_reread_prt
        ):
            simulations_data = util.load_simulations_data_from_pickle(
                path_to_simulations_data
            )
            # Moving locations of files breaks the stored paths.
            # Since the pickle was found in results_folder, that folder is
            # the correct location now. The original path is kept for later
            # retrieval; the new one uses the same POSIX form that
            # _process_batch stores.
            simulations_data.path_to_simulations_original = (
                simulations_data.path_to_simulations
            )
            simulations_data.path_to_simulations = results_folder.as_posix()
        else:
            simulations_data = process_whole_result_set_parallel(
                results_folder, []
            )
    main_logger = log.get_main_logger(
        _pl.Path(simulations_data.path_to_simulations)
    )
    _process_comparisons(simulations_data, comparison_scenario, main_logger)
    _plt.close("all")

    return simulations_data
469

470

471
def _process_comparisons(
    simulations_data: ds.SimulationsData,
    comparison_scenario: _tp.Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
    main_logger: _logging.Logger,
):
    """Apply each comparison step to *simulations_data*.

    A single callable is treated as a one-step scenario. A step that raises
    is logged on *main_logger* with its traceback, and the remaining steps
    still run.
    """
    steps = (
        comparison_scenario
        if not callable(comparison_scenario)
        else [comparison_scenario]
    )
    for comparison_step in steps:
        try:
            comparison_step(simulations_data)
        except Exception as exc:  # pylint: disable=broad-except
            step_name = getattr(
                comparison_step, "__name__", str(comparison_step)
            )
            main_logger.error(
                "Scenario %s failed for comparison: %s ",
                step_name,
                str(exc),
                exc_info=True,
            )
495

496

497
def _concat_scalar(simulation_data: ds.SimulationsData) -> ds.SimulationsData:
    """Gather the per-simulation scalar tables into ``simulation_data.scalar``.

    Simulations whose ``scalar`` DataFrame is empty are skipped. The remaining
    frames are concatenated with the simulation names as the outer index
    level, then index level 1 (the first level of each frame's own index) is
    dropped. If no simulation has scalar values, ``simulation_data.scalar`` is
    left untouched. The object is modified in place and also returned.
    """
    names = []
    frames = []
    for name, sim in simulation_data.simulations.items():
        if sim.scalar.empty:
            continue
        names.append(name)
        frames.append(sim.scalar)

    if frames:
        combined = _pd.concat(frames, keys=names)
        simulation_data.scalar = combined.droplevel(1)
    return simulation_data
509

510

511
def _validate_folder(folder: _pl.Path) -> None:
    """Ensure *folder* exists and is a directory.

    Raises
    ______
        ValueError: If *folder* does not exist, or exists but is not a
            directory.
    """
    exists = folder.exists()
    if not exists:
        raise ValueError(f"Folder does not exist: {folder}")
    if folder.is_dir():
        return
    raise ValueError(f"Path is not a directory: {folder}")
516

517

518
def _process_simulation(
    sim_folder: _pl.Path,
    processing_scenarios: _tp.Union[
        _abc.Callable[[ds.Simulation], None],
        _tp.Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    force_reread_prt: _tp.Optional[bool] = None,
) -> tuple[ds.Simulation, list[str]]:
    """Load or build one Simulation and run the processing scenarios on it.

    Parameters
    __________
        sim_folder: Simulation folder to process
        processing_scenarios: A single callable or a sequence of callables,
            each taking the Simulation and modifying it in place. A scenario
            that raises is logged and recorded, and the rest still run.
        force_reread_prt: If True, always rebuild the simulation from the raw
            files instead of loading the cached simulation pickle. ``None``
            (default) falls back to
            ``conf.global_settings.reader.force_reread_prt``.

    Returns
    _______
        Tuple of (simulation, names of the scenarios that raised)
    """
    # Only fall back to the global setting when no value was passed: an
    # explicit False (e.g. forwarded from the parent process in parallel
    # mode) must not be replaced by the global value.
    if force_reread_prt is None:
        force_reread_prt = conf.global_settings.reader.force_reread_prt
    sim_logger = log.get_simulation_logger(sim_folder)
    sim_logger.info("Starting simulation processing")
    sim_pickle_file = sim_folder / conf.FileNames.SIMULATION_PICKLE_FILE.value
    simulation: ds.Simulation
    if sim_pickle_file.exists() and not force_reread_prt:
        sim_logger.info("Loading simulation from pickle file")
        simulation = util.load_simulation_from_pickle(
            sim_pickle_file, sim_logger
        )
    else:
        sim_logger.info("Processing simulation from raw files")
        sim_files = util.get_files([sim_folder])
        simulation = ps.process_sim(sim_files, sim_folder)
        # Only cache the simulation when raw files were actually found.
        if sim_files:
            util.save_to_pickle(simulation, sim_pickle_file, sim_logger)

    failed_scenarios = []

    # Convert single scenario to list for uniform handling
    scenarios = (
        [processing_scenarios]
        if callable(processing_scenarios)
        else processing_scenarios
    )

    for scenario in scenarios:
        # Resolve the name outside the try so the except clause never sees an
        # unbound or stale name.
        scenario_name = getattr(scenario, "__name__", str(scenario))
        try:
            sim_logger.info("Running scenario: %s", scenario_name)
            scenario(simulation)
            sim_logger.info(
                "Successfully completed scenario: %s", scenario_name
            )
        except Exception as e:  # pylint: disable=broad-except
            failed_scenarios.append(scenario_name)
            sim_logger.error(
                "Scenario %s failed: %s",
                scenario_name,
                str(e),
                exc_info=True,
            )

    if failed_scenarios:
        sim_logger.warning(
            "Simulation completed with %d failed scenarios",
            len(failed_scenarios),
        )
    else:
        sim_logger.info("Simulation completed successfully")

    _plt.close("all")
    return simulation, failed_scenarios
580

581

582
def _log_processing_results(
    results: ds.ProcessingResults, main_logger: _logging.Logger
) -> None:
    """Write a human-readable summary of a batch run to *main_logger*.

    Reports the processed and failed counts, lists simulations that failed
    outright (only when the error count is positive) and, per simulation,
    the scenarios that failed.
    """
    heavy_rule = "=" * 80
    main_logger.info(heavy_rule)
    main_logger.info("BATCH PROCESSING SUMMARY")
    main_logger.info("-" * 80)
    main_logger.info(
        "Total simulations processed: %d | Failed: %d",
        results.processed_count,
        results.error_count,
    )

    if results.error_count > 0:
        main_logger.warning(
            "Some simulations failed to process. Check the log for details."
        )
        main_logger.warning("Failed simulations:")
        for failed_sim in results.failed_simulations:
            main_logger.warning("  • %s", failed_sim)

    if results.failed_scenarios:
        main_logger.warning("Failed scenarios by simulation:")
        non_empty = (
            (sim_name, names)
            for sim_name, names in results.failed_scenarios.items()
            if names
        )
        for sim_name, scenario_names in non_empty:
            main_logger.warning("  • %s:", sim_name)
            for scenario_name in scenario_names:
                main_logger.warning("    - %s", scenario_name)
    main_logger.info(heavy_rule)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc