• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys_process / 13546303843

26 Feb 2025 02:33PM UTC coverage: 98.165%. Remained the same
13546303843

push

github

ahobeost
Finished adjusting docstrings to numpy style

1177 of 1199 relevant lines covered (98.17%)

1.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.0
/pytrnsys_process/process_batch.py
1
import pathlib as _pl
2✔
2
import time as _time
2✔
3
from collections import abc as _abc
2✔
4
from concurrent.futures import ProcessPoolExecutor, as_completed
2✔
5
from typing import List, Optional, Sequence, Union
2✔
6

7
import matplotlib.pyplot as _plt
2✔
8
import pandas as _pd
2✔
9

10
from pytrnsys_process import constants as const
2✔
11
from pytrnsys_process import data_structures as ds
2✔
12
from pytrnsys_process import logger as log
2✔
13
from pytrnsys_process import settings as sett
2✔
14
from pytrnsys_process import utils
2✔
15
from pytrnsys_process.process_sim import process_sim as ps
2✔
16

17

18
class UnableToProcessSimulationError(Exception):
    """Signal that a requested simulation could not be processed.

    ``process_single_simulation`` raises this when the simulation it was asked
    for is missing from the processed results.
    """
20

21

22
# pylint: disable=too-many-locals
23
def _process_batch(
        sim_folders: list[_pl.Path],
        processing_scenario: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
        results_folder: _pl.Path,
        parallel: bool = False,
        max_workers: int | None = None,
) -> ds.SimulationsData:
    """Common processing logic for both sequential and parallel batch processing.

    Every folder in ``sim_folders`` is handed to ``_process_simulation``. A
    simulation that raises is logged and recorded as failed; the exception is
    not propagated, so one broken simulation does not abort the batch.

    Parameters
    ----------
    sim_folders:
        Simulation folders to process.

    processing_scenario:
        Processing scenario(s) to apply to each simulation.

    results_folder:
        Root folder containing all simulations. Its POSIX path is stored in
        the returned ``SimulationsData``.

    parallel:
        If True, simulations are submitted to a ``ProcessPoolExecutor``;
        otherwise they are processed one after another in this process.

    max_workers:
        Maximum number of worker processes. Only used when ``parallel`` is
        True.

    Returns
    -------
    ds.SimulationsData
        The successfully processed simulations, after ``_concat_scalar`` has
        been applied.

    Notes
    -----
    This is an internal function that should not be called directly.
    Use process_single_simulation, process_whole_result_set, or
    process_whole_result_set_parallel instead.
    """
    # perf_counter is monotonic, so the measured duration cannot be distorted
    # by system clock adjustments the way time.time() can.
    start_time = _time.perf_counter()
    results = ds.ProcessingResults()
    simulations_data = ds.SimulationsData(path_to_simulations=results_folder.as_posix())

    if parallel:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Map each future back to its folder so a failure can be attributed.
            tasks = {}
            for sim_folder in sim_folders:
                log.main_logger.info(
                    "Submitting simulation folder for processing: %s",
                    sim_folder.name,
                )
                future = executor.submit(
                    _process_simulation, sim_folder, processing_scenario
                )
                tasks[future] = sim_folder

            # Results are collected in completion order, not submission order.
            for future in as_completed(tasks):
                try:
                    _handle_simulation_result(
                        future.result(), results, simulations_data
                    )
                except Exception as e:  # pylint: disable=broad-except
                    _handle_simulation_error(e, tasks[future], results)
    else:
        for sim_folder in sim_folders:
            try:
                log.main_logger.info(
                    "Processing simulation: %s", sim_folder.name
                )
                result = _process_simulation(sim_folder, processing_scenario)
                _handle_simulation_result(result, results, simulations_data)
            except Exception as e:  # pylint: disable=broad-except
                _handle_simulation_error(e, sim_folder, results)

    simulations_data = _concat_scalar(simulations_data)
    _log_processing_results(results)

    execution_time = _time.perf_counter() - start_time
    log.main_logger.info(
        "%s execution time: %.2f seconds",
        "Parallel" if parallel else "Total",
        execution_time,
    )

    return simulations_data
2✔
117

118

119
def _handle_simulation_result(
        result: tuple[ds.Simulation, List[str]],
        results: ds.ProcessingResults,
        simulations_data: ds.SimulationsData,
) -> None:
    """Record the outcome of a simulation that was processed without raising.

    Parameters
    ----------
    result:
        Tuple of (simulation, failed_scenarios) as returned by
        ``_process_simulation``.

    results:
        ProcessingResults to update: the processed counter is incremented and
        any failed scenarios are stored under the simulation's name.

    simulations_data:
        SimulationsData to update: the simulation is stored under its name.
    """
    simulation, failed_scenarios = result
    # The simulation is keyed by the last component of its path.
    sim_name = _pl.Path(simulation.path).name

    results.processed_count += 1
    simulations_data.simulations[sim_name] = simulation
    if failed_scenarios:
        results.failed_scenarios[sim_name] = failed_scenarios
2✔
138

139

140
def _handle_simulation_error(
        error: Exception,
        sim_folder: _pl.Path,
        results: ds.ProcessingResults,
) -> None:
    """Record and log a simulation whose processing raised.

    Parameters
    ----------
    error:
        The exception that occurred.

    sim_folder:
        Path to the simulation folder that failed.

    results:
        ProcessingResults to update: the error counter is incremented and the
        folder name is appended to the failed simulations.
    """
    results.failed_simulations.append(sim_folder.name)
    results.error_count += 1

    message = "Failed to process simulation in %s: %s"
    log.main_logger.error(message, sim_folder, str(error), exc_info=True)
161

162

163
def process_single_simulation(
        sim_folder: _pl.Path,
        processing_scenarios: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
) -> ds.Simulation:
    """Process one simulation folder with the given processing scenario(s).

    Parameters
    ----------
    sim_folder:
        Path to the simulation folder to process.

    processing_scenarios:
        Single callable or sequence of callables that implement the
        processing logic. Each callable takes a Simulation object as its
        only parameter.

    Returns
    -------
    ds.Simulation
        The processed simulation.

    Raises
    ------
    UnableToProcessSimulationError
        If the simulation is missing from the processed results.

    Examples
    --------
    >>> import pathlib as _pl
    >>> from pytrnsys_process import api, data_structures
    ...
    >>> def processing_step_1(sim: data_structures.Simulation):
    ...     # Process simulation data
    ...     pass
    >>> results = api.process_single_simulation(
    ...     _pl.Path("path/to/simulation"),
    ...     processing_step_1
    ... )
    """
    log.initialize_logs()
    log.main_logger.info("Starting processing of simulation %s", sim_folder)

    # Run the shared batch logic on a one-element batch rooted at the parent folder.
    batch_data = _process_batch(
        [sim_folder], processing_scenarios, sim_folder.parent
    )

    try:
        simulation = batch_data.simulations[sim_folder.name]
    except KeyError as exc:
        raise UnableToProcessSimulationError(
            f"Failed to process simulation in {sim_folder}"
        ) from exc
    return simulation
211

212

213
def process_whole_result_set(
        results_folder: _pl.Path,
        processing_scenario: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
) -> ds.SimulationsData:
    """Process all simulation folders in a results directory sequentially.

    Every sub-directory of ``results_folder`` is treated as a simulation
    folder and processed one at a time. The collected data is saved as a
    pickle file inside ``results_folder`` before it is returned.

    Parameters
    ----------
    results_folder:
        Path to the directory containing the simulation folders.

    processing_scenario:
        Single callable or sequence of callables that implement the
        processing logic for each simulation. Each callable takes a
        Simulation object as its only parameter.

    Returns
    -------
    ds.SimulationsData
        Data of the processed simulations.

    Raises
    ------
    ValueError
        If results_folder doesn't exist or is not a directory.
        Individual simulation failures are logged but not re-raised.

    Examples
    --------
    >>> import pathlib as _pl
    >>> from pytrnsys_process import api
    ...
    >>> def processing_step_1(sim):
    ...     # Process simulation data
    ...     pass
    >>> def processing_step_2(sim):
    ...     # Process simulation data
    ...     pass
    >>> results = api.process_whole_result_set(
    ...     _pl.Path("path/to/results"),
    ...     [processing_step_1, processing_step_2]
    ... )
    """
    _validate_folder(results_folder)
    log.initialize_logs()
    log.main_logger.info(
        "Starting batch processing of simulations in %s", results_folder
    )

    # Only directories count as simulation folders; loose files are ignored.
    sim_folders = []
    for entry in results_folder.iterdir():
        if entry.is_dir():
            sim_folders.append(entry)

    simulations_data = _process_batch(
        sim_folders, processing_scenario, results_folder
    )

    pickle_path = results_folder / const.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
    utils.save_to_pickle(simulations_data, pickle_path)

    return simulations_data
2✔
284

285

286
def process_whole_result_set_parallel(
        results_folder: _pl.Path,
        processing_scenario: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
        max_workers: int | None = None,
) -> ds.SimulationsData:
    """Process all simulation folders in a results directory in parallel.

    Every sub-directory of ``results_folder`` is treated as a simulation
    folder and processed through a ProcessPoolExecutor. The collected data is
    saved as a pickle file inside ``results_folder`` before it is returned.

    Parameters
    ----------
    results_folder:
        Path to the directory containing the simulation folders.

    processing_scenario:
        Single callable or sequence of callables that implement the
        processing logic for each simulation. Each callable takes a
        Simulation object as its only parameter.

    max_workers:
        Maximum number of worker processes to use. It is passed on to
        ProcessPoolExecutor unchanged; None leaves the choice to the executor.

    Returns
    -------
    ds.SimulationsData
        Data of the processed simulations.

    Raises
    ------
    ValueError
        If results_folder doesn't exist or is not a directory.
        Individual simulation failures are logged but not re-raised.

    Examples
    --------
    >>> import pathlib as _pl
    >>> from pytrnsys_process import api
    ...
    >>> def processing_step_1(sim):
    ...     # Process simulation data
    ...     pass
    >>> def processing_step_2(sim):
    ...     # Process simulation data
    ...     pass
    >>> results = api.process_whole_result_set_parallel(
    ...     _pl.Path("path/to/results"),
    ...     [processing_step_1, processing_step_2]
    ... )
    """
    _validate_folder(results_folder)
    log.initialize_logs()
    log.main_logger.info(
        "Starting batch processing of simulations in %s with parallel execution",
        results_folder,
    )

    # Only directories count as simulation folders; loose files are ignored.
    sim_folders = [
        candidate
        for candidate in results_folder.iterdir()
        if candidate.is_dir()
    ]

    simulations_data = _process_batch(
        sim_folders,
        processing_scenario,
        results_folder,
        parallel=True,
        max_workers=max_workers,
    )

    pickle_path = results_folder / const.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
    utils.save_to_pickle(simulations_data, pickle_path)

    return simulations_data
2✔
365

366

367
def do_comparison(
        comparison_scenario: Union[
            _abc.Callable[[ds.SimulationsData], None],
            _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
        ],
        simulations_data: Optional[ds.SimulationsData] = None,
        results_folder: Optional[_pl.Path] = None,
) -> None:
    """Execute comparison scenarios on processed simulation results.

    If ``simulations_data`` is not given, it is obtained from
    ``results_folder``: the simulations-data pickle in that folder is loaded
    when it exists, otherwise the whole result set is processed in parallel
    with an empty list of processing steps. All matplotlib figures are closed
    once the comparison steps have run.

    Parameters
    ----------
    comparison_scenario:
        Single callable or sequence of callables that implement the
        comparison logic. Each callable takes a SimulationsData object as
        its only parameter.

    simulations_data:
        Optional SimulationsData object containing the processed simulation
        data to be compared.

    results_folder:
        Optional Path to the directory containing simulation results.
        Used if simulations_data is not provided.

    Raises
    ------
    ValueError
        If neither simulations_data nor results_folder is provided.

    Examples
    --------
    >>> from pytrnsys_process import api
    ...
    >>> def comparison_step(simulations_data: ds.SimulationsData):
    ...     # Compare simulation results
    ...     pass
    ...
    >>> api.do_comparison(comparison_step, simulations_data=processed_results)
    """
    # Explicit None checks: an argument counts as "not provided" only when it
    # is actually None, regardless of how the object evaluates as a boolean.
    if simulations_data is None:
        if results_folder is None:
            raise ValueError("Either simulations_data or results_folder must be provided to perform comparison")
        # Use the same file-name constant the batch-processing functions save
        # under, so the lookup cannot drift from what was written.
        path_to_simulations_data = (
            results_folder / const.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
        )
        if path_to_simulations_data.exists():
            # NOTE(review): presumably unpickles the file; only use results
            # folders from a trusted source.
            simulations_data = utils.load_simulations_data_from_pickle(path_to_simulations_data)
        else:
            simulations_data = process_whole_result_set_parallel(results_folder, [])
    _process_comparisons(simulations_data, comparison_scenario)
    _plt.close("all")
2✔
410

411

412

413
def _process_comparisons(
        simulations_data: ds.SimulationsData,
        comparison_scenario: Union[
            _abc.Callable[[ds.SimulationsData], None],
            _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
        ],
):
    """Run each comparison step on the simulations data.

    A single callable is treated as a one-step scenario. A step that raises
    is logged to the main logger and the remaining steps still run.

    Parameters
    ----------
    simulations_data:
        Data passed to every comparison step.

    comparison_scenario:
        Single callable or sequence of callables to execute.
    """
    steps = comparison_scenario
    if callable(steps):
        steps = [steps]

    for comparison_step in steps:
        try:
            comparison_step(simulations_data)
        except Exception as error:  # pylint: disable=broad-except
            step_name = getattr(
                comparison_step, "__name__", str(comparison_step)
            )
            log.main_logger.error(
                "Scenario %s failed for comparison: %s ",
                step_name,
                str(error),
                exc_info=True,
            )
436

437

438
def _concat_scalar(simulation_data: ds.SimulationsData) -> ds.SimulationsData:
    """Combine the non-empty per-simulation scalar frames into one frame.

    The frames are concatenated with the simulation names as keys and the
    second index level is dropped afterwards. If no simulation has scalar
    data, ``simulation_data.scalar`` is left untouched.

    Parameters
    ----------
    simulation_data:
        Object whose ``simulations`` are read and whose ``scalar`` attribute
        is set in place.

    Returns
    -------
    ds.SimulationsData
        The same object that was passed in.
    """
    sim_names = []
    scalar_frames = []
    for sim_name, sim in simulation_data.simulations.items():
        if sim.scalar.empty:
            continue
        sim_names.append(sim_name)
        scalar_frames.append(sim.scalar)

    if scalar_frames:
        combined = _pd.concat(scalar_frames, keys=sim_names)
        simulation_data.scalar = combined.droplevel(1)
    return simulation_data
2✔
450

451

452
def _validate_folder(folder: _pl.Path) -> None:
    """Ensure that ``folder`` is an existing directory.

    Parameters
    ----------
    folder:
        Path to check.

    Raises
    ------
    ValueError
        If the path does not exist, or exists but is not a directory.
    """
    if folder.is_dir():
        return
    # Not a directory: tell a missing path apart from an existing non-directory.
    if folder.exists():
        raise ValueError(f"Path is not a directory: {folder}")
    raise ValueError(f"Folder does not exist: {folder}")
×
457

458

459
def _process_simulation(
        sim_folder: _pl.Path,
        processing_scenarios: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
) -> tuple[ds.Simulation, List[str]]:
    """Load or build one simulation and run the processing scenarios on it.

    The simulation is loaded from its pickle file when that file exists and
    the ``force_reread_prt`` reader setting is off; otherwise it is built from
    the files in ``sim_folder`` and saved to the pickle file. Each scenario
    then runs in turn; a scenario that raises is logged and recorded, and the
    remaining scenarios still run. All matplotlib figures are closed at the end.

    Parameters
    ----------
    sim_folder:
        Folder of the simulation to process.

    processing_scenarios:
        Single callable or sequence of callables, each called with the
        simulation.

    Returns
    -------
    tuple[ds.Simulation, List[str]]
        The simulation and the names of the scenarios that raised.
    """
    sim_logger = log.get_simulation_logger(sim_folder)
    sim_logger.info("Starting simulation processing")

    sim_pickle_file = sim_folder / const.FileNames.SIMULATION_PICKLE_FILE.value
    use_pickle = (
        sim_pickle_file.exists()
        and not sett.settings.reader.force_reread_prt
    )
    if use_pickle:
        sim_logger.info("Loading simulation from pickle file")
        simulation = utils.load_simulation_from_pickle(
            sim_pickle_file, sim_logger
        )
    else:
        sim_logger.info("Processing simulation from raw files")
        sim_files = utils.get_files([sim_folder])
        simulation = ps.process_sim(sim_files, sim_folder)
        utils.save_to_pickle(simulation, sim_pickle_file, sim_logger)

    # A single callable is handled as a one-element scenario list.
    if callable(processing_scenarios):
        steps = [processing_scenarios]
    else:
        steps = processing_scenarios

    failed_scenarios = []
    for step in steps:
        try:
            step_name = getattr(step, "__name__", str(step))
            sim_logger.info("Running scenario: %s", step_name)
            step(simulation)
            sim_logger.info(
                "Successfully completed scenario: %s", step_name
            )
        except Exception as error:  # pylint: disable=broad-except
            failed_scenarios.append(step_name)
            sim_logger.error(
                "Scenario %s failed: %s",
                step_name,
                str(error),
                exc_info=True,
            )

    if not failed_scenarios:
        sim_logger.info("Simulation completed successfully")
    else:
        sim_logger.warning(
            "Simulation completed with %d failed scenarios",
            len(failed_scenarios),
        )

    _plt.close("all")
    return simulation, failed_scenarios
2✔
517

518

519
def _log_processing_results(results: ds.ProcessingResults) -> None:
    """Write the batch processing summary to the main logger.

    The summary lists the processed and failed counts, then the failed
    simulations (if any), then the failed scenarios per simulation (if any).

    Parameters
    ----------
    results:
        Counters and failure records collected during the batch.
    """
    main_logger = log.main_logger
    heavy_rule = "=" * 80

    main_logger.info(heavy_rule)
    main_logger.info("BATCH PROCESSING SUMMARY")
    main_logger.info("-" * 80)
    main_logger.info(
        "Total simulations processed: %d | Failed: %d",
        results.processed_count,
        results.error_count,
    )

    if results.error_count > 0:
        main_logger.warning(
            "Some simulations failed to process. Check the log for details."
        )
        main_logger.warning("Failed simulations:")
        for failed_sim in results.failed_simulations:
            main_logger.warning("  • %s", failed_sim)

    if results.failed_scenarios:
        main_logger.warning("Failed scenarios by simulation:")
        for sim_name, scenario_names in results.failed_scenarios.items():
            # Skip simulations whose failure list is empty.
            if not scenario_names:
                continue
            main_logger.warning("  • %s:", sim_name)
            for scenario_name in scenario_names:
                main_logger.warning("    - %s", scenario_name)
    main_logger.info(heavy_rule)
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc