• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys_process / 13769979091

10 Mar 2025 04:38PM UTC coverage: 95.383% (+0.004%) from 95.379%
13769979091

push

github

sebastian-swob
support for force re-read of raw files in do_comparison
doc improvements

2 of 2 new or added lines in 2 files covered. (100.0%)

9 existing lines in 2 files now uncovered.

1095 of 1148 relevant lines covered (95.38%)

1.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.61
/pytrnsys_process/process/process_batch.py
1
import logging as _logging
2✔
2
import pathlib as _pl
2✔
3
import time as _time
2✔
4
from collections import abc as _abc
2✔
5
from concurrent.futures import ProcessPoolExecutor, as_completed
2✔
6
from typing import List, Optional, Sequence, Union
2✔
7

8
import matplotlib.pyplot as _plt
2✔
9
import pandas as _pd
2✔
10

11
from pytrnsys_process import config as conf
2✔
12
from pytrnsys_process import log, util
2✔
13
from pytrnsys_process.process import data_structures as ds
2✔
14
from pytrnsys_process.process import process_sim as ps
2✔
15

16

17
class UnableToProcessSimulationError(Exception):
    """Signal that processing a simulation yielded no usable result.

    ``process_single_simulation`` raises this error when the requested
    simulation is missing from the processed results.
    """
19

20

21
# pylint: disable=too-many-locals
22
def _process_batch(
    sim_folders: list[_pl.Path],
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    results_folder: _pl.Path,
    parallel: bool = False,
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Run the batch workflow shared by the sequential and parallel entry points.

    Every folder is handed to ``_process_simulation``. Successful results are
    collected through ``_handle_simulation_result``; any exception is recorded
    through ``_handle_simulation_error`` so that a single failing simulation
    does not abort the batch. Afterwards the scalar data of all simulations is
    concatenated, a summary is logged and the elapsed time is reported.

    Parameters
    __________
        sim_folders:
            Simulation folders to process

        processing_scenario:
            Single callable or sequence of callables applied to each simulation

        results_folder:
            Folder used to obtain the main logger; its POSIX path is stored as
            ``path_to_simulations`` on the returned object

        parallel:
            If True the simulations are submitted to a ProcessPoolExecutor,
            otherwise they are processed one after the other

        max_workers:
            Forwarded to the ProcessPoolExecutor; only used when ``parallel``
            is True

    Returns
    _______
        SimulationsData holding the successfully processed simulations

    Note:
    _____
        This is an internal function that should not be called directly.
        Use process_single_simulation, process_whole_result_set, or
        process_whole_result_set_parallel instead.
    """
    started_at = _time.time()
    outcome = ds.ProcessingResults()
    collected = ds.SimulationsData(
        path_to_simulations=results_folder.as_posix()
    )
    main_logger = log.get_main_logger(results_folder)

    if not parallel:
        for folder in sim_folders:
            try:
                main_logger.info("Processing simulation: %s", folder.name)
                processed = _process_simulation(folder, processing_scenario)
                _handle_simulation_result(processed, outcome, collected)
            except Exception as exc:  # pylint: disable=broad-except
                _handle_simulation_error(exc, folder, outcome, main_logger)
    else:
        with ProcessPoolExecutor(max_workers=max_workers) as pool:
            # Remember which folder belongs to which future so that a failure
            # can be attributed to the right simulation.
            folder_by_future = {}
            for folder in sim_folders:
                main_logger.info(
                    "Submitting simulation folder for processing: %s",
                    folder.name,
                )
                pending = pool.submit(
                    _process_simulation, folder, processing_scenario
                )
                folder_by_future[pending] = folder

            for finished in as_completed(folder_by_future):
                try:
                    _handle_simulation_result(
                        finished.result(), outcome, collected
                    )
                except Exception as exc:  # pylint: disable=broad-except
                    _handle_simulation_error(
                        exc, folder_by_future[finished], outcome, main_logger
                    )

    collected = _concat_scalar(collected)
    _log_processing_results(outcome, main_logger)

    elapsed = _time.time() - started_at
    mode_label = "Parallel" if parallel else "Total"
    main_logger.info("%s execution time: %.2f seconds", mode_label, elapsed)

    return collected
2✔
120

121

122
def _handle_simulation_result(
    result: tuple[ds.Simulation, List[str]],
    results: ds.ProcessingResults,
    simulations_data: ds.SimulationsData,
) -> None:
    """Record the outcome of a successfully processed simulation.

    The simulation is stored under the final component of ``simulation.path``.
    Scenarios that failed for this simulation, if any, are recorded under the
    same name.

    Parameters
    __________
        result: Tuple of (simulation, failed_scenarios)
        results: ProcessingResults to update
        simulations_data: SimulationsData to update
    """
    simulation, failed_scenarios = result
    # Derive the key once, before touching any state, so that an unusable
    # path cannot leave the counters half-updated.
    sim_name = _pl.Path(simulation.path).name

    results.processed_count += 1
    simulations_data.simulations[sim_name] = simulation
    if failed_scenarios:
        results.failed_scenarios[sim_name] = failed_scenarios
143

144

145
def _handle_simulation_error(
    error: Exception,
    sim_folder: _pl.Path,
    results: ds.ProcessingResults,
    main_logger: _logging.Logger,
) -> None:
    """Record and log a simulation that could not be processed.

    Parameters
    __________
        error: The exception that occurred
        sim_folder: Path to the simulation folder
        results: ProcessingResults to update
        main_logger: Logger receiving the error message and traceback
    """
    results.error_count += 1
    results.failed_simulations.append(sim_folder.name)
    main_logger.error(
        "Failed to process simulation in %s: %s",
        sim_folder,
        str(error),
        # Pass the exception itself: ``exc_info=True`` only finds a traceback
        # while an ``except`` block is active in the caller.
        exc_info=error,
    )
167

168

169
def process_single_simulation(
    sim_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.Simulation:
    """Process one simulation folder with the given processing step/scenario.

    The folder is processed as a batch of one, using the parent of
    ``sim_folder`` as the results folder.

    Parameters
    __________
        sim_folder: pathlib.Path
            Path to the simulation folder to process

        processing_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
            They should contain the processing logic for a simulation.
            Each callable should take a Simulation object as its only parameter and modify it in place.

    Returns
    _______
        Simulation: :class:`pytrnsys_process.api.Simulation`

    Raises
    ______
        UnableToProcessSimulationError: If the simulation is missing from the
            processed results

    Example
    _______
            >>> import pathlib as _pl
            >>> from pytrnsys_process import api
            ...
            >>> def processing_step_1(sim: api.Simulation):
            ...     # Process simulation data
            ...     pass
            >>> results = api.process_single_simulation(
            ...     _pl.Path("path/to/simulation"),
            ...     processing_step_1
            ... )
    """
    main_logger = log.get_main_logger(sim_folder)
    log.initialize_logs(sim_folder)
    main_logger.info("Starting processing of simulation %s", sim_folder)

    processed = _process_batch(
        [sim_folder], processing_scenario, sim_folder.parent
    )

    try:
        simulation = processed.simulations[sim_folder.name]
    except KeyError as exc:
        # The batch swallows per-simulation failures, so a missing entry is
        # the only sign that processing did not succeed.
        raise UnableToProcessSimulationError(
            f"Failed to process simulation in {sim_folder}"
        ) from exc
    return simulation
217

218

219
def process_whole_result_set(
    results_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.SimulationsData:
    """Process every simulation folder of a results directory, one at a time.

    Each sub-directory of ``results_folder`` is treated as a simulation and
    passed through the provided processing step/scenario. The collected data
    is additionally written to a pickle file inside ``results_folder``.

    Using the default settings your structure should look like this:

    | results_folder
    |     ├─ sim-1
    |     ├─ sim-2
    |     ├─ sim-3
    |         ├─ temp
    |             ├─ your-printer-files.prt

    Parameters
    __________
        results_folder pathlib.Path:
            Path to the directory containing simulation folders.
            Each subfolder should contain a temp folder containing valid simulation data files.

        processing_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
            They should contain the processing logic for a simulation.
            Each callable should take a Simulation object as its only parameter and modify it in place.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`

    Raises
    ______
        ValueError: If results_folder doesn't exist or is not a directory
        Exception: Individual simulation failures are logged but not re-raised

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    main_logger = log.get_main_logger(results_folder)
    log.initialize_logs(results_folder)
    main_logger.info(
        "Starting batch processing of simulations in %s", results_folder
    )

    # Only directories count as simulations; loose files are ignored.
    candidate_folders = [
        entry for entry in results_folder.iterdir() if entry.is_dir()
    ]
    processed = _process_batch(
        candidate_folders, processing_scenario, results_folder
    )

    pickle_target = (
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
    )
    util.save_to_pickle(processed, pickle_target)

    return processed
2✔
299

300

301
def process_whole_result_set_parallel(
    results_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Process every simulation folder of a results directory in parallel.

    Each sub-directory of ``results_folder`` is treated as a simulation and
    submitted to a ProcessPoolExecutor, which applies the provided processing
    step/scenario. The collected data is additionally written to a pickle
    file inside ``results_folder``.

    Using the default settings your structure should look like this:

    | results_folder
    |     ├─ sim-1
    |     ├─ sim-2
    |     ├─ sim-3
    |         ├─ temp
    |             ├─ your-printer-files.prt

    Parameters
    __________
        results_folder pathlib.Path:
            Path to the directory containing simulation folders.
            Each subfolder should contain a temp folder containing valid simulation data files.

        processing_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
            They should contain the processing logic for a simulation.
            Each callable should take a Simulation object as its only parameter and modify it in place.

        max_workers int, default None:
            Maximum number of worker processes to use.
            If None, defaults to the number of processors on the machine.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`

    Raises
    _______
        ValueError: If results_folder doesn't exist or is not a directory
        Exception: Individual simulation failures are logged but not re-raised

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set_parallel(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    log.initialize_logs(results_folder)
    main_logger = log.get_main_logger(results_folder)
    main_logger.info(
        "Starting batch processing of simulations in %s with parallel execution",
        results_folder,
    )

    # Only directories count as simulations; loose files are ignored.
    candidate_folders = [
        entry for entry in results_folder.iterdir() if entry.is_dir()
    ]
    processed = _process_batch(
        candidate_folders,
        processing_scenario,
        results_folder,
        parallel=True,
        max_workers=max_workers,
    )

    pickle_target = (
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
    )
    util.save_to_pickle(processed, pickle_target)

    return processed
2✔
391

392

393
def do_comparison(
    comparison_scenario: Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
    simulations_data: Optional[ds.SimulationsData] = None,
    results_folder: Optional[_pl.Path] = None,
) -> None:
    """Execute comparison scenarios on processed simulation results.

    If ``simulations_data`` is not provided, the data is obtained from
    ``results_folder``: an existing simulations-data pickle file is loaded,
    unless it is missing or the ``force_reread_prt`` reader setting is
    enabled, in which case the whole result set is processed again in
    parallel (without any processing steps).

    Parameters
    __________
        comparison_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
            They should contain the comparison logic.
            Each callable should take a SimulationsData object as its only parameter and modify it in place.

        simulations_data: SimulationsData, optional
            SimulationsData object containing the processed
            simulations data to be compared.

        results_folder: pathlib.Path, optional
            Path to the directory containing simulation results.
            Used if simulations_data is not provided.

    Raises
    ______
        ValueError: If neither simulations_data nor results_folder is provided

    Example
    __________
        >>> from pytrnsys_process import api
        ...
        >>> def comparison_step(simulations_data: api.SimulationsData):
        ...     # Compare simulation results
        ...     pass
        ...
        >>> api.do_comparison(comparison_step, simulations_data=processed_results)
    """
    # Compare against None explicitly: an argument that was provided must be
    # used even if the object happens to evaluate as falsy.
    if simulations_data is None:
        if results_folder is None:
            raise ValueError(
                "Either simulations_data or results_folder must be provided to perform comparison"
            )
        path_to_simulations_data = (
            results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
        )
        reuse_pickle = (
            path_to_simulations_data.exists()
            and not conf.global_settings.reader.force_reread_prt
        )
        if reuse_pickle:
            # NOTE(review): the helper name indicates that a pickle file is
            # loaded here; only point this at results folders you trust.
            simulations_data = util.load_simulations_data_from_pickle(
                path_to_simulations_data
            )
        else:
            simulations_data = process_whole_result_set_parallel(
                results_folder, []
            )

    main_logger = log.get_main_logger(
        _pl.Path(simulations_data.path_to_simulations)
    )
    _process_comparisons(simulations_data, comparison_scenario, main_logger)
    # Close all matplotlib figures once the comparison steps have run.
    _plt.close("all")
2✔
451

452

453
def _process_comparisons(
    simulations_data: ds.SimulationsData,
    comparison_scenario: Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
    main_logger: _logging.Logger,
):
    """Run every comparison step, logging failures instead of raising them.

    Parameters
    __________
        simulations_data: Object handed to each comparison step
        comparison_scenario: Single callable or sequence of callables to run
        main_logger: Logger receiving an error entry for each failing step
    """
    # Normalise a single callable to a one-element list.
    if callable(comparison_scenario):
        steps = [comparison_scenario]
    else:
        steps = comparison_scenario

    for comparison_step in steps:
        try:
            comparison_step(simulations_data)
        except Exception as exc:  # pylint: disable=broad-except
            step_label = getattr(
                comparison_step, "__name__", str(comparison_step)
            )
            main_logger.error(
                "Scenario %s failed for comparison: %s ",
                step_label,
                str(exc),
                exc_info=True,
            )
477

478

479
def _concat_scalar(simulation_data: ds.SimulationsData) -> ds.SimulationsData:
    """Combine the non-empty scalar frames of all simulations into one frame.

    The combined frame is indexed by simulation name and stored on
    ``simulation_data.scalar``. If every simulation has an empty scalar frame,
    ``simulation_data`` is returned unchanged.
    """
    non_empty = [
        (sim_name, sim.scalar)
        for sim_name, sim in simulation_data.simulations.items()
        if not sim.scalar.empty
    ]
    if not non_empty:
        return simulation_data

    names = [sim_name for sim_name, _ in non_empty]
    frames = [frame for _, frame in non_empty]
    combined = _pd.concat(frames, keys=names)
    # Drop the original row index so only the simulation name remains.
    simulation_data.scalar = combined.droplevel(1)
    return simulation_data
2✔
491

492

493
def _validate_folder(folder: _pl.Path) -> None:
    """Ensure that ``folder`` is an existing directory.

    Raises
    ______
        ValueError: If the path does not exist or is not a directory
    """
    if folder.is_dir():
        return
    # Distinguish "missing" from "exists but is not a directory".
    if not folder.exists():
        raise ValueError(f"Folder does not exist: {folder}")
    raise ValueError(f"Path is not a directory: {folder}")
×
498

499

500
def _process_simulation(
    sim_folder: _pl.Path,
    processing_scenarios: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> tuple[ds.Simulation, List[str]]:
    """Load or build one simulation and run the processing scenarios on it.

    An existing simulation pickle file is reused unless the
    ``force_reread_prt`` reader setting is enabled; otherwise the simulation
    is built from the files in ``sim_folder`` and pickled. A failing scenario
    is logged and recorded, and the remaining scenarios still run.

    Parameters
    __________
        sim_folder: Folder of the simulation to process
        processing_scenarios: Single callable or sequence of callables, each
            called with the simulation

    Returns
    _______
        Tuple of the simulation and the names of the scenarios that failed
    """
    sim_logger = log.get_simulation_logger(sim_folder)
    sim_logger.info("Starting simulation processing")
    sim_pickle_file = sim_folder / conf.FileNames.SIMULATION_PICKLE_FILE.value

    must_read_raw_files = (
        not sim_pickle_file.exists()
        or conf.global_settings.reader.force_reread_prt
    )

    simulation: ds.Simulation
    if must_read_raw_files:
        sim_logger.info("Processing simulation from raw files")
        sim_files = util.get_files([sim_folder])
        simulation = ps.process_sim(sim_files, sim_folder)
        util.save_to_pickle(simulation, sim_pickle_file, sim_logger)
    else:
        sim_logger.info("Loading simulation from pickle file")
        simulation = util.load_simulation_from_pickle(
            sim_pickle_file, sim_logger
        )

    # Normalise a single callable to a one-element list.
    if callable(processing_scenarios):
        steps = [processing_scenarios]
    else:
        steps = processing_scenarios

    failed_scenarios = []
    for step in steps:
        try:
            step_name = getattr(step, "__name__", str(step))
            sim_logger.info("Running scenario: %s", step_name)
            step(simulation)
            sim_logger.info("Successfully completed scenario: %s", step_name)
        except Exception as exc:  # pylint: disable=broad-except
            failed_scenarios.append(step_name)
            sim_logger.error(
                "Scenario %s failed: %s",
                step_name,
                str(exc),
                exc_info=True,
            )

    if not failed_scenarios:
        sim_logger.info("Simulation completed successfully")
    else:
        sim_logger.warning(
            "Simulation completed with %d failed scenarios",
            len(failed_scenarios),
        )

    # Close all matplotlib figures before returning.
    _plt.close("all")
    return simulation, failed_scenarios
2✔
561

562

563
def _log_processing_results(
    results: ds.ProcessingResults, main_logger: _logging.Logger
) -> None:
    """Write the batch summary to the main logger.

    The summary lists the processed and failed counts, then, as warnings, the
    simulations that failed and the failed scenarios of each simulation.
    """
    frame_line = "=" * 80

    main_logger.info(frame_line)
    main_logger.info("BATCH PROCESSING SUMMARY")
    main_logger.info("-" * 80)
    main_logger.info(
        "Total simulations processed: %d | Failed: %d",
        results.processed_count,
        results.error_count,
    )

    if results.error_count > 0:
        main_logger.warning(
            "Some simulations failed to process. Check the log for details."
        )
        main_logger.warning("Failed simulations:")
        for failed_sim in results.failed_simulations:
            main_logger.warning("  • %s", failed_sim)

    if results.failed_scenarios:
        main_logger.warning("Failed scenarios by simulation:")
        for sim_name, scenario_names in results.failed_scenarios.items():
            if not scenario_names:
                continue
            main_logger.warning("  • %s:", sim_name)
            for scenario_name in scenario_names:
                main_logger.warning("    - %s", scenario_name)

    main_logger.info(frame_line)
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc