• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys_process / 13545305374

26 Feb 2025 01:43PM UTC coverage: 98.165% (+0.001%) from 98.164%
13545305374

push

github

sebastian-swob
Merge remote-tracking branch 'origin/61-improve-docs-and-add-a-tutorial' into 61-improve-docs-and-add-a-tutorial

1177 of 1199 relevant lines covered (98.17%)

1.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.0
/pytrnsys_process/process_batch.py
1
import pathlib as _pl
2✔
2
import time as _time
2✔
3
from collections import abc as _abc
2✔
4
from concurrent.futures import ProcessPoolExecutor, as_completed
2✔
5
from typing import List, Optional, Sequence, Union
2✔
6

7
import matplotlib.pyplot as _plt
2✔
8
import pandas as _pd
2✔
9

10
from pytrnsys_process import constants as const
2✔
11
from pytrnsys_process import data_structures as ds
2✔
12
from pytrnsys_process import logger as log
2✔
13
from pytrnsys_process import settings as sett
2✔
14
from pytrnsys_process import utils
2✔
15
from pytrnsys_process.process_sim import process_sim as ps
2✔
16

17

18
class UnableToProcessSimulationError(Exception):
    """Signal that processing a simulation produced no usable result.

    Raised by ``process_single_simulation`` when the requested simulation is
    missing from the processed data.
    """
20

21

22
# pylint: disable=too-many-locals
23
def _process_batch(
        sim_folders: list[_pl.Path],
        processing_scenario: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
        results_folder: _pl.Path,
        parallel: bool = False,
        max_workers: int | None = None,
) -> ds.SimulationsData:
    """Run the processing scenario(s) over a batch of simulation folders.

    Shared engine behind the sequential and the parallel entry points. Every
    folder is handed to ``_process_simulation``; successful results are
    collected, failures are logged and counted without aborting the batch.
    Afterwards the per-simulation scalar data is concatenated, a summary is
    logged and the elapsed wall-clock time is reported.

    Parameters
    __________
        sim_folders:
            Simulation folders to process.

        processing_scenario:
            Single callable or sequence of callables applied to each simulation.

        results_folder:
            Folder whose POSIX path is stored as ``path_to_simulations`` on the
            returned object.

        parallel:
            If True, folders are submitted to a ``ProcessPoolExecutor``;
            otherwise they are processed one after another in this process.

        max_workers:
            Forwarded to ``ProcessPoolExecutor``; only used when ``parallel``
            is True.

    Returns
    _______
        SimulationsData holding the successfully processed simulations.

    Note:
    _____
        Internal helper. Use process_single_simulation,
        process_whole_result_set or process_whole_result_set_parallel instead.
    """
    started_at = _time.time()
    outcome = ds.ProcessingResults()
    collected = ds.SimulationsData(
        path_to_simulations=results_folder.as_posix()
    )

    if not parallel:
        for folder in sim_folders:
            try:
                log.main_logger.info(
                    "Processing simulation: %s", folder.name
                )
                _handle_simulation_result(
                    _process_simulation(folder, processing_scenario),
                    outcome,
                    collected,
                )
            except Exception as error:  # pylint: disable=broad-except
                # One broken simulation must not stop the rest of the batch.
                _handle_simulation_error(error, folder, outcome)
    else:
        with ProcessPoolExecutor(max_workers=max_workers) as pool:
            # Remember which folder each future belongs to so that a failure
            # can be attributed to the right simulation.
            folder_by_future = {}
            for folder in sim_folders:
                log.main_logger.info(
                    "Submitting simulation folder for processing: %s",
                    folder.name,
                )
                pending = pool.submit(
                    _process_simulation, folder, processing_scenario
                )
                folder_by_future[pending] = folder

            for finished in as_completed(folder_by_future):
                try:
                    # .result() re-raises whatever the worker raised.
                    _handle_simulation_result(
                        finished.result(), outcome, collected
                    )
                except Exception as error:  # pylint: disable=broad-except
                    _handle_simulation_error(
                        error, folder_by_future[finished], outcome
                    )

    collected = _concat_scalar(collected)
    _log_processing_results(outcome)

    elapsed = _time.time() - started_at
    log.main_logger.info(
        "%s execution time: %.2f seconds",
        "Parallel" if parallel else "Total",
        elapsed,
    )

    return collected
117

118

119
def _handle_simulation_result(
        result: tuple[ds.Simulation, List[str]],
        results: ds.ProcessingResults,
        simulations_data: ds.SimulationsData,
) -> None:
    """Record one successfully processed simulation.

    The simulation is stored under the last component of its ``path``; if any
    scenarios failed for it, their names are stored under the same key.

    Parameters
    __________
        result: Tuple of (simulation, failed_scenarios)
        results: ProcessingResults whose counters and failure map are updated
        simulations_data: SimulationsData receiving the simulation
    """
    processed_simulation, scenario_failures = result
    results.processed_count += 1

    sim_name = _pl.Path(processed_simulation.path).name
    simulations_data.simulations[sim_name] = processed_simulation

    if not scenario_failures:
        return
    results.failed_scenarios[sim_name] = scenario_failures
138

139

140
def _handle_simulation_error(
        error: Exception,
        sim_folder: _pl.Path,
        results: ds.ProcessingResults,
) -> None:
    """Record and log a simulation that could not be processed.

    Parameters
    __________
        error: The exception raised while processing the simulation
        sim_folder: Folder of the simulation that failed
        results: ProcessingResults whose error bookkeeping is updated
    """
    results.error_count += 1
    failed_name = sim_folder.name
    results.failed_simulations.append(failed_name)

    # exc_info=True asks the logger to attach the active traceback.
    reason = str(error)
    log.main_logger.error(
        "Failed to process simulation in %s: %s",
        sim_folder,
        reason,
        exc_info=True,
    )
161

162

163
def process_single_simulation(
        sim_folder: _pl.Path,
        processing_scenarios: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
) -> ds.Simulation:
    """Process exactly one simulation folder.

    Logging is initialised, the folder is run through the batch machinery as
    a one-element batch (with the folder's parent as results folder), and the
    resulting simulation is looked up by folder name.

    Parameters
    __________
        sim_folder:
            Path to the simulation folder to process

        processing_scenarios:
            Single callable or sequence of callables, each taking a
            Simulation object as its only parameter.

    Returns
    _______
            Simulation object containing the processed data

    Raises
    _______
            UnableToProcessSimulationError: If the simulation is absent from
            the batch result, i.e. processing it failed.

    Example
    _______

            >>> import pathlib as _pl
            >>> from pytrnsys_process import api, data_structures
            ...
            >>> def processing_step_1(sim: data_structures.Simulation):
            ...     # Process simulation data
            ...     pass
            >>> results = api.process_single_simulation(
            ...     _pl.Path("path/to/simulation"),
            ...     processing_step_1
            ... )
    """
    log.initialize_logs()
    log.main_logger.info("Starting processing of simulation %s", sim_folder)

    batch = _process_batch(
        [sim_folder], processing_scenarios, sim_folder.parent
    )

    try:
        processed = batch.simulations[sim_folder.name]
    except KeyError as exc:
        # The batch logs failures and continues, so a missing entry is the
        # only sign here that processing went wrong.
        raise UnableToProcessSimulationError(
            f"Failed to process simulation in {sim_folder}"
        ) from exc
    return processed
212

213

214
def process_whole_result_set(
        results_folder: _pl.Path,
        processing_scenario: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
) -> ds.SimulationsData:
    """Process every simulation folder in a results directory, one by one.

    Each immediate subdirectory of ``results_folder`` is treated as a
    simulation folder. After processing, the collected data is also pickled
    into ``results_folder``.

    Parameters
    __________
        results_folder:
            Path to the directory containing simulation folders.

        processing_scenario:
            Single callable or sequence of callables, each taking a
            Simulation object as its only parameter.

    Returns
    _______
        SimulationsData with the successfully processed simulations (keyed by
        folder name) and their concatenated scalar data.

    Raises
    _______
        ValueError: If results_folder doesn't exist or is not a directory.
        Failures of individual simulations are logged, not re-raised.

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    log.initialize_logs()
    log.main_logger.info(
        "Starting batch processing of simulations in %s", results_folder
    )

    # Only directories count as simulations; loose files are skipped.
    candidate_dirs = []
    for entry in results_folder.iterdir():
        if entry.is_dir():
            candidate_dirs.append(entry)

    processed = _process_batch(
        candidate_dirs, processing_scenario, results_folder
    )

    pickle_path = (
        results_folder / const.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
    )
    utils.save_to_pickle(processed, pickle_path)

    return processed
286

287

288
def process_whole_result_set_parallel(
        results_folder: _pl.Path,
        processing_scenario: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
        max_workers: int | None = None,
) -> ds.SimulationsData:
    """Process every simulation folder in a results directory in parallel.

    Each immediate subdirectory of ``results_folder`` is submitted to a
    ``ProcessPoolExecutor``. After processing, the collected data is also
    pickled into ``results_folder``.

    Parameters
    __________
        results_folder:
            Path to the directory containing simulation folders.
        processing_scenario:
            Single callable or sequence of callables, each taking a
            Simulation object as its only parameter. They are handed to
            worker processes, so they presumably need to be picklable
            (e.g. module-level functions).
        max_workers:
            Maximum number of worker processes. ``None`` leaves the choice to
            ``ProcessPoolExecutor``.

    Returns
    _______
        SimulationsData with the successfully processed simulations (keyed by
        folder name) and their concatenated scalar data.

    Raises
    _______
        ValueError: If results_folder doesn't exist or is not a directory.
        Failures of individual simulations are logged, not re-raised.

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set_parallel(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    log.initialize_logs()
    log.main_logger.info(
        "Starting batch processing of simulations in %s with parallel execution",
        results_folder,
    )

    # Only directories count as simulations; loose files are skipped.
    candidate_dirs = []
    for entry in results_folder.iterdir():
        if entry.is_dir():
            candidate_dirs.append(entry)

    processed = _process_batch(
        candidate_dirs,
        processing_scenario,
        results_folder,
        parallel=True,
        max_workers=max_workers,
    )

    pickle_path = (
        results_folder / const.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
    )
    utils.save_to_pickle(processed, pickle_path)

    return processed
367

368

369
def do_comparison(
        comparison_scenario: Union[
            _abc.Callable[[ds.SimulationsData], None],
            _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
        ],
        simulations_data: Optional[ds.SimulationsData] = None,
        results_folder: Optional[_pl.Path] = None,
) -> None:
    """Execute comparison scenarios on processed simulation results.

    If ``simulations_data`` is not supplied, it is obtained from
    ``results_folder``: a previously saved simulations-data pickle is loaded
    when present, otherwise the whole result set is processed in parallel
    with no processing scenarios. All matplotlib figures are closed at the
    end.

    Parameters
    __________
        comparison_scenario:
            Single callable or sequence of callables that implement
            the comparison logic. Each callable should take a SimulationsData
            object as its only parameter.
        simulations_data:
            Optional SimulationsData object containing the processed
            simulation data to be compared.
        results_folder:
            Optional Path to the directory containing simulation results.
            Used if simulations_data is not provided.

    Raises
    _______
        ValueError: If neither simulations_data nor results_folder is given.
        Failures of individual comparison steps are logged, not re-raised.

    Example
    __________
        >>> from pytrnsys_process import api
        ...
        >>> def comparison_step(simulations_data: ds.SimulationsData):
        ...     # Compare simulation results
        ...     pass
        ...
        >>> api.do_comparison(comparison_step, simulations_data=processed_results)
    """
    if not simulations_data:
        if not results_folder:
            raise ValueError("Either simulations_data or results_folder must be provided to perform comparison")

        # Look for the cache under the same constant the batch functions use
        # when saving, so reader and writers cannot drift apart.
        path_to_simulations_data = (
            results_folder
            / const.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
        )
        if path_to_simulations_data.exists():
            # NOTE(review): this loads a pickle found in results_folder; only
            # point it at folders whose contents you trust.
            simulations_data = utils.load_simulations_data_from_pickle(
                path_to_simulations_data
            )
        else:
            # No cache yet: build the data without any per-simulation steps.
            simulations_data = process_whole_result_set_parallel(
                results_folder, []
            )

    _process_comparisons(simulations_data, comparison_scenario)
    _plt.close("all")
412

413

414

415
def _process_comparisons(
        simulations_data: ds.SimulationsData,
        comparison_scenario: Union[
            _abc.Callable[[ds.SimulationsData], None],
            _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
        ],
):
    """Run each comparison step, logging failures instead of raising.

    Parameters
    __________
        simulations_data: Data handed to every comparison step
        comparison_scenario: Single callable or sequence of callables
    """
    # Normalise a lone callable to a one-element list.
    if callable(comparison_scenario):
        steps = [comparison_scenario]
    else:
        steps = comparison_scenario

    for compare in steps:
        try:
            compare(simulations_data)
        except Exception as error:  # pylint: disable=broad-except
            # A failing step is reported and the remaining steps still run.
            failed_name = getattr(compare, "__name__", str(compare))
            log.main_logger.error(
                "Scenario %s failed for comparison: %s ",
                failed_name,
                str(error),
                exc_info=True,
            )
438

439

440
def _concat_scalar(simulation_data: ds.SimulationsData) -> ds.SimulationsData:
    """Combine the non-empty per-simulation scalar frames into one frame.

    The frames are concatenated with the simulation names as outer keys and
    the second index level is then dropped. ``simulation_data.scalar`` is
    left untouched when no simulation has scalar data. The same object that
    was passed in is returned.
    """
    names = []
    frames = []
    for sim_name, sim in simulation_data.simulations.items():
        if sim.scalar.empty:
            continue
        names.append(sim_name)
        frames.append(sim.scalar)

    if frames:
        stacked = _pd.concat(frames, keys=names)
        simulation_data.scalar = stacked.droplevel(1)
    return simulation_data
452

453

454
def _validate_folder(folder: _pl.Path) -> None:
2✔
455
    if not folder.exists():
2✔
456
        raise ValueError(f"Folder does not exist: {folder}")
×
457
    if not folder.is_dir():
2✔
458
        raise ValueError(f"Path is not a directory: {folder}")
×
459

460

461
def _process_simulation(
        sim_folder: _pl.Path,
        processing_scenarios: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
) -> tuple[ds.Simulation, List[str]]:
    """Load one simulation and run the processing scenario(s) on it.

    The simulation is taken from its pickle file when that file exists and
    the ``force_reread_prt`` reader setting is off; otherwise it is built
    from the files in ``sim_folder`` and pickled. Each scenario is then
    called with the simulation; a failing scenario is logged and recorded,
    and the remaining scenarios still run. All matplotlib figures are closed
    before returning.

    Parameters
    __________
        sim_folder: Folder of the simulation to process
        processing_scenarios: Single callable or sequence of callables

    Returns
    _______
        Tuple of (simulation, names of the scenarios that raised)
    """
    sim_logger = log.get_simulation_logger(sim_folder)
    sim_logger.info("Starting simulation processing")

    pickle_path = sim_folder / const.FileNames.SIMULATION_PICKLE_FILE.value
    reuse_pickle = (
        pickle_path.exists() and not sett.settings.reader.force_reread_prt
    )

    simulation: ds.Simulation
    if not reuse_pickle:
        sim_logger.info("Processing simulation from raw files")
        simulation = ps.process_sim(utils.get_files([sim_folder]), sim_folder)
        utils.save_to_pickle(simulation, pickle_path, sim_logger)
    else:
        sim_logger.info("Loading simulation from pickle file")
        simulation = utils.load_simulation_from_pickle(
            pickle_path, sim_logger
        )

    # Normalise a lone callable to a one-element list.
    if callable(processing_scenarios):
        steps = [processing_scenarios]
    else:
        steps = processing_scenarios

    failures = []
    for step in steps:
        try:
            step_name = getattr(step, "__name__", str(step))
            sim_logger.info("Running scenario: %s", step_name)
            step(simulation)
            sim_logger.info(
                "Successfully completed scenario: %s", step_name
            )
        except Exception as error:  # pylint: disable=broad-except
            failures.append(step_name)
            sim_logger.error(
                "Scenario %s failed: %s",
                step_name,
                str(error),
                exc_info=True,
            )

    if not failures:
        sim_logger.info("Simulation completed successfully")
    else:
        sim_logger.warning(
            "Simulation completed with %d failed scenarios",
            len(failures),
        )

    _plt.close("all")
    return simulation, failures
519

520

521
def _log_processing_results(results: ds.ProcessingResults) -> None:
    """Write the batch summary to the main logger.

    Reports the processed and failed counts, then lists the failed
    simulations (if any) and the failed scenarios per simulation (if any),
    framed by separator lines.
    """
    summary_log = log.main_logger
    frame = "=" * 80

    summary_log.info(frame)
    summary_log.info("BATCH PROCESSING SUMMARY")
    summary_log.info("-" * 80)
    summary_log.info(
        "Total simulations processed: %d | Failed: %d",
        results.processed_count,
        results.error_count,
    )

    if results.error_count > 0:
        summary_log.warning(
            "Some simulations failed to process. Check the log for details."
        )
        summary_log.warning("Failed simulations:")
        for failed_name in results.failed_simulations:
            summary_log.warning("  • %s", failed_name)

    if results.failed_scenarios:
        summary_log.warning("Failed scenarios by simulation:")
        for sim_name, scenario_names in results.failed_scenarios.items():
            if not scenario_names:
                continue
            summary_log.warning("  • %s:", sim_name)
            for scenario_name in scenario_names:
                summary_log.warning("    - %s", scenario_name)

    summary_log.info(frame)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc