• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys_process / 13560850870

27 Feb 2025 07:08AM UTC coverage: 98.182% (+0.02%) from 98.165%
13560850870

push

github

ahobeost
CI adjustments

7 of 8 new or added lines in 3 files covered. (87.5%)

4 existing lines in 2 files now uncovered.

1188 of 1210 relevant lines covered (98.18%)

1.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.0
/pytrnsys_process/process_batch.py
1
import pathlib as _pl
2✔
2
import time as _time
2✔
3
from collections import abc as _abc
2✔
4
from concurrent.futures import ProcessPoolExecutor, as_completed
2✔
5
from typing import List, Optional, Sequence, Union
2✔
6

7
import matplotlib.pyplot as _plt
2✔
8
import pandas as _pd
2✔
9

10
from pytrnsys_process import constants as const
2✔
11
from pytrnsys_process import data_structures as ds
2✔
12
from pytrnsys_process import logger as log
2✔
13
from pytrnsys_process import settings as sett
2✔
14
from pytrnsys_process import utils
2✔
15
from pytrnsys_process.process_sim import process_sim as ps
2✔
16

17

18
class UnableToProcessSimulationError(Exception):
    """Signal that no processed Simulation could be produced for a folder.

    Raised by ``process_single_simulation`` when the batch run returns no
    result for the requested simulation folder.
    """
20

21

22
# pylint: disable=too-many-locals
def _process_batch(
    sim_folders: list[_pl.Path],
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    results_folder: _pl.Path,
    parallel: bool = False,
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Run the processing scenario(s) over a batch of simulation folders.

    Shared engine behind the public entry points. Each folder is handed to
    ``_process_simulation``. Successes are recorded through
    ``_handle_simulation_result`` and failures through
    ``_handle_simulation_error``, so one broken folder never aborts the
    batch. Afterwards, the per-simulation scalar frames are concatenated, a
    summary is logged, and the wall-clock duration is reported.

    Parameters
    __________
        sim_folders:
            Simulation folders to process

        processing_scenario:
            A single callable or a sequence of callables applied to every
            Simulation

        results_folder:
            Root folder of the batch; stored (POSIX form) as
            ``path_to_simulations`` on the returned object

        parallel:
            If True, folders are processed in a ProcessPoolExecutor;
            otherwise they are processed one after another in this process

        max_workers:
            Forwarded to ProcessPoolExecutor; unused when ``parallel`` is False


    Returns
    _______
        SimulationsData containing every successfully processed simulation
        and the concatenated scalar data

    Note:
    _____
        Internal helper; use process_single_simulation,
        process_whole_result_set or process_whole_result_set_parallel
        instead. In parallel mode, simulations are recorded in completion
        order. The scenario callables are also sent to worker processes, so
        they must be picklable.

    """
    t_start = _time.time()
    summary = ds.ProcessingResults()
    collected = ds.SimulationsData(
        path_to_simulations=results_folder.as_posix()
    )

    if not parallel:
        # Sequential path: process and record each folder in turn.
        for folder in sim_folders:
            try:
                log.main_logger.info("Processing simulation: %s", folder.name)
                outcome = _process_simulation(folder, processing_scenario)
                _handle_simulation_result(outcome, summary, collected)
            except Exception as err:  # pylint: disable=broad-except
                _handle_simulation_error(err, folder, summary)
    else:
        with ProcessPoolExecutor(max_workers=max_workers) as pool:
            # Remember which folder each future belongs to so that a failure
            # can be attributed to the right simulation.
            folder_of = {}
            for folder in sim_folders:
                log.main_logger.info(
                    "Submitting simulation folder for processing: %s",
                    folder.name,
                )
                pending = pool.submit(
                    _process_simulation, folder, processing_scenario
                )
                folder_of[pending] = folder

            for done in as_completed(folder_of):
                try:
                    # .result() re-raises any exception raised in the worker.
                    _handle_simulation_result(
                        done.result(), summary, collected
                    )
                except Exception as err:  # pylint: disable=broad-except
                    _handle_simulation_error(err, folder_of[done], summary)

    collected = _concat_scalar(collected)
    _log_processing_results(summary)

    elapsed = _time.time() - t_start
    mode_label = "Parallel" if parallel else "Total"
    log.main_logger.info(
        "%s execution time: %.2f seconds",
        mode_label,
        elapsed,
    )
    return collected
119

120

121
def _handle_simulation_result(
    result: tuple[ds.Simulation, List[str]],
    results: ds.ProcessingResults,
    simulations_data: ds.SimulationsData,
) -> None:
    """Record a successfully processed simulation.

    The simulation is stored under the name of its folder (the last
    component of ``simulation.path``). Any scenarios that failed for it are
    recorded under that same name.

    Parameters
    __________
        result: Tuple of (simulation, failed_scenarios) as returned by
            ``_process_simulation``
        results: ProcessingResults whose processed count is incremented and
            whose failed-scenario map is updated
        simulations_data: SimulationsData into which the simulation is stored
    """
    simulation, failed_scenarios = result
    results.processed_count += 1
    # Computed once and used for both maps so the keys always agree.
    sim_name = _pl.Path(simulation.path).name
    simulations_data.simulations[sim_name] = simulation
    if failed_scenarios:
        results.failed_scenarios[sim_name] = failed_scenarios
142

143

144
def _handle_simulation_error(
    error: Exception,
    sim_folder: _pl.Path,
    results: ds.ProcessingResults,
) -> None:
    """Record and log a simulation that could not be processed.

    Parameters
    __________
        error: The exception raised while processing the simulation
        sim_folder: Folder of the simulation that failed
        results: ProcessingResults whose error count and failed-simulation
            list are updated
    """
    results.error_count += 1
    failed_name = sim_folder.name
    results.failed_simulations.append(failed_name)
    # exc_info=True attaches the traceback of the exception currently being
    # handled by the caller's except block.
    log.main_logger.error(
        "Failed to process simulation in %s: %s", sim_folder, str(error), exc_info=True
    )
165

166

167
def process_single_simulation(
    sim_folder: _pl.Path,
    processing_scenarios: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.Simulation:
    """Process one simulation folder with the given processing scenario(s).

    Parameters
    __________
        sim_folder:
            Path to the simulation folder to process

        processing_scenarios:
            Single callable or sequence of callables that implement
            the processing logic for a simulation. Each callable should take a Simulation
            object as its only parameter.

    Returns
    _______
            Simulation object containing the processed data

    Raises
    ______
            UnableToProcessSimulationError: If processing produced no result
            for ``sim_folder``

    Example
    _______
            >>> import pathlib as _pl
            >>> from pytrnsys_process import api, data_structures
            ...
            >>> def processing_step_1(sim: data_structures.Simulation):
            ...     # Process simulation data
            ...     pass
            >>> results = api.process_single_simulation(
            ...     _pl.Path("path/to/simulation"),
            ...     processing_step_1
            ... )
    """
    log.initialize_logs()
    log.main_logger.info("Starting processing of simulation %s", sim_folder)

    # Reuse the batch engine with a one-element batch rooted at the parent.
    processed = _process_batch(
        [sim_folder], processing_scenarios, sim_folder.parent
    )

    try:
        simulation = processed.simulations[sim_folder.name]
    except KeyError as missing:
        # The batch engine swallows per-folder errors, so a missing entry is
        # the only signal that processing failed.
        raise UnableToProcessSimulationError(
            f"Failed to process simulation in {sim_folder}"
        ) from missing
    return simulation
215

216

217
def process_whole_result_set(
    results_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.SimulationsData:
    """Process all simulation folders in a results directory sequentially.

    Processes each simulation folder found directly in the results directory
    one at a time, in sorted path order, and applies the provided processing
    scenario(s) to each simulation. The combined result is also pickled into
    ``results_folder``.

    Parameters
    __________
        results_folder:
            Path to the directory containing simulation folders.
            Each subfolder should contain valid simulation data files.

        processing_scenario:
            Single callable or sequence of callables that implement
            the processing logic for each simulation. Each callable should take a
            Simulation object as its only parameter and modify it in place.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
            - simulations: Dict mapping simulation folder names to the
              processed Simulation objects
            - scalar: DataFrame concatenating the scalar/deck values of all
              simulations that have any, indexed by simulation name
            - path_to_simulations: POSIX form of ``results_folder``

    Raises
    ______
        ValueError: If results_folder doesn't exist or is not a directory
        Exception: Individual simulation failures are logged but not re-raised

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    log.initialize_logs()
    log.main_logger.info(
        "Starting batch processing of simulations in %s", results_folder
    )

    # Path.iterdir() yields entries in arbitrary order. Sorting makes the
    # processing order (and so the order of the results) reproducible.
    sim_folders = sorted(
        sim_folder
        for sim_folder in results_folder.iterdir()
        if sim_folder.is_dir()
    )
    simulations_data = _process_batch(
        sim_folders, processing_scenario, results_folder
    )
    utils.save_to_pickle(
        simulations_data,
        results_folder / const.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value,
    )

    return simulations_data
288

289

290
def process_whole_result_set_parallel(
    results_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Process all simulation folders in a results directory in parallel.

    Uses a ProcessPoolExecutor to process multiple simulations concurrently.
    Folders are submitted in sorted path order, and the combined result is
    pickled into ``results_folder``. The scenario callables are sent to
    worker processes, so they must be picklable (for example, module-level
    functions).

    Parameters
    __________
        results_folder:
            Path to the directory containing simulation folders.
            Each subfolder should contain valid simulation data files.
        processing_scenario:
            Single callable or sequence of callables that implement
            the processing logic for each simulation. Each callable should take a
            Simulation object as its only parameter.
        max_workers:
            Maximum number of worker processes to use. If None, defaults to
            the number of processors on the machine.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
            - simulations: Dict mapping simulation folder names to the
              processed Simulation objects
            - scalar: DataFrame concatenating the scalar/deck values of all
              simulations that have any, indexed by simulation name
            - path_to_simulations: POSIX form of ``results_folder``

    Raises
    ______
        ValueError: If results_folder doesn't exist or is not a directory
        Exception: Individual simulation failures are logged but not re-raised

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set_parallel(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    log.initialize_logs()
    log.main_logger.info(
        "Starting batch processing of simulations in %s with parallel execution",
        results_folder,
    )

    # Path.iterdir() yields entries in arbitrary order. Sorting makes the
    # submission order reproducible.
    sim_folders = sorted(
        sim_folder
        for sim_folder in results_folder.iterdir()
        if sim_folder.is_dir()
    )
    simulations_data = _process_batch(
        sim_folders,
        processing_scenario,
        results_folder,
        parallel=True,
        max_workers=max_workers,
    )
    utils.save_to_pickle(
        simulations_data,
        results_folder / const.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value,
    )

    return simulations_data
369

370

371
def do_comparison(
    comparison_scenario: Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
    simulations_data: Optional[ds.SimulationsData] = None,
    results_folder: Optional[_pl.Path] = None,
) -> None:
    """Execute comparison scenarios on processed simulation results.

    If ``simulations_data`` is not given, it is loaded from the pickle that
    ``process_whole_result_set``/``process_whole_result_set_parallel`` write
    into ``results_folder``. If that pickle does not exist yet, the folder
    is processed in parallel with no processing scenarios, which also
    creates the pickle. Failures of individual comparison steps are logged,
    not re-raised.

    Parameters
    __________
        comparison_scenario:
            Single callable or sequence of callables that implement
            the comparison logic. Each callable should take a SimulationsData
            object as its only parameter.
        simulations_data:
            Optional SimulationsData object containing the processed
            simulation data to be compared.
        results_folder:
            Optional Path to the directory containing simulation results.
            Used if simulations_data is not provided.

    Raises
    ______
        ValueError: If neither simulations_data nor results_folder is provided

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api, data_structures
        ...
        >>> def comparison_step(simulations_data: data_structures.SimulationsData):
        ...     # Compare simulation results
        ...     pass
        ...
        >>> processed_results = api.process_whole_result_set(
        ...     _pl.Path("path/to/results"), []
        ... )
        >>> api.do_comparison(comparison_step, simulations_data=processed_results)
    """
    if not simulations_data:
        if not results_folder:
            raise ValueError(
                "Either simulations_data or results_folder must be provided to perform comparison"
            )
        # Use the same file name the processing functions write to, so the
        # load path can never drift from the save path.
        path_to_simulations_data = (
            results_folder / const.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
        )
        if path_to_simulations_data.exists():
            simulations_data = utils.load_simulations_data_from_pickle(
                path_to_simulations_data
            )
        else:
            simulations_data = process_whole_result_set_parallel(
                results_folder, []
            )
    _process_comparisons(simulations_data, comparison_scenario)
    _plt.close("all")
420

421

422
def _process_comparisons(
    simulations_data: ds.SimulationsData,
    comparison_scenario: Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
):
    """Run each comparison step on ``simulations_data``.

    A single callable is treated as a one-element sequence. A step that
    raises is logged with its traceback, and the remaining steps still run.
    """
    steps = (
        comparison_scenario
        if not callable(comparison_scenario)
        else [comparison_scenario]
    )
    for comparison_step in steps:
        try:
            comparison_step(simulations_data)
        except Exception as exc:  # pylint: disable=broad-except
            # Fall back to str() for callables without __name__ (e.g. partials).
            step_label = getattr(
                comparison_step, "__name__", str(comparison_step)
            )
            log.main_logger.error(
                "Scenario %s failed for comparison: %s ",
                step_label,
                str(exc),
                exc_info=True,
            )
445

446

447
def _concat_scalar(simulation_data: ds.SimulationsData) -> ds.SimulationsData:
    """Combine the non-empty per-simulation scalar frames into one frame.

    The frames are concatenated with the simulation names as outer keys.
    The inner (original) index level is then dropped, so the result is
    indexed by simulation name. ``simulation_data.scalar`` is only assigned
    when at least one simulation has scalar data. The same object is
    returned.
    """
    non_empty = {}
    for name, sim in simulation_data.simulations.items():
        if not sim.scalar.empty:
            non_empty[name] = sim.scalar

    if non_empty:
        stacked = _pd.concat(
            list(non_empty.values()),
            keys=list(non_empty.keys()),
        )
        simulation_data.scalar = stacked.droplevel(1)
    return simulation_data
459

460

461
def _validate_folder(folder: _pl.Path) -> None:
2✔
462
    if not folder.exists():
2✔
463
        raise ValueError(f"Folder does not exist: {folder}")
×
464
    if not folder.is_dir():
2✔
465
        raise ValueError(f"Path is not a directory: {folder}")
×
466

467

468
def _process_simulation(
    sim_folder: _pl.Path,
    processing_scenarios: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> tuple[ds.Simulation, List[str]]:
    """Load or build one Simulation and run the processing scenario(s) on it.

    The simulation is loaded from its pickle file when that file exists and
    ``settings.reader.force_reread_prt`` is not set. Otherwise it is built
    from the raw files in ``sim_folder`` and pickled for later runs. Each
    scenario then runs in turn. A failing scenario is logged and recorded,
    but the remaining scenarios still run. All matplotlib figures are
    closed before returning.

    Returns
    _______
        Tuple of (simulation, names of the scenarios that raised)
    """
    sim_logger = log.get_simulation_logger(sim_folder)
    sim_logger.info("Starting simulation processing")
    pickle_path = sim_folder / const.FileNames.SIMULATION_PICKLE_FILE.value

    # Short-circuit: the settings flag is only consulted if a pickle exists.
    use_cached = (
        pickle_path.exists() and not sett.settings.reader.force_reread_prt
    )
    simulation: ds.Simulation
    if use_cached:
        sim_logger.info("Loading simulation from pickle file")
        simulation = utils.load_simulation_from_pickle(pickle_path, sim_logger)
    else:
        sim_logger.info("Processing simulation from raw files")
        raw_files = utils.get_files([sim_folder])
        simulation = ps.process_sim(raw_files, sim_folder)
        utils.save_to_pickle(simulation, pickle_path, sim_logger)

    # A single callable is treated as a one-element sequence.
    to_run = (
        processing_scenarios
        if not callable(processing_scenarios)
        else [processing_scenarios]
    )
    failed: list[str] = []
    for step in to_run:
        try:
            step_name = getattr(step, "__name__", str(step))
            sim_logger.info("Running scenario: %s", step_name)
            step(simulation)
            sim_logger.info("Successfully completed scenario: %s", step_name)
        except Exception as err:  # pylint: disable=broad-except
            failed.append(step_name)
            sim_logger.error(
                "Scenario %s failed: %s", step_name, str(err), exc_info=True
            )

    if not failed:
        sim_logger.info("Simulation completed successfully")
    else:
        sim_logger.warning(
            "Simulation completed with %d failed scenarios", len(failed)
        )

    _plt.close("all")
    return simulation, failed
526

527

528
def _log_processing_results(results: ds.ProcessingResults) -> None:
    """Log a framed summary of a batch run.

    The summary contains the processed/failed counts, the failed simulations
    (if any) and, per simulation, the scenarios that failed.
    """
    logger = log.main_logger
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    logger.info(heavy_rule)
    logger.info("BATCH PROCESSING SUMMARY")
    logger.info(light_rule)
    logger.info(
        "Total simulations processed: %d | Failed: %d",
        results.processed_count,
        results.error_count,
    )

    if results.error_count > 0:
        logger.warning(
            "Some simulations failed to process. Check the log for details."
        )
        logger.warning("Failed simulations:")
        for failed_sim in results.failed_simulations:
            logger.warning("  • %s", failed_sim)

    if results.failed_scenarios:
        logger.warning("Failed scenarios by simulation:")
        for sim_name, failed in results.failed_scenarios.items():
            if not failed:
                continue
            logger.warning("  • %s:", sim_name)
            for scenario_name in failed:
                logger.warning("    - %s", scenario_name)
    logger.info(heavy_rule)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc