• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys_process / 13394719474

18 Feb 2025 03:55PM UTC coverage: 96.919% (-1.0%) from 97.93%
13394719474

push

github

sebastian-swob
increased positional arguments to 7

1164 of 1201 relevant lines covered (96.92%)

1.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.26
/pytrnsys_process/process_batch.py
1
import pathlib as _pl
2✔
2
import time as _time
2✔
3
from collections import abc as _abc
2✔
4
from concurrent.futures import ProcessPoolExecutor, as_completed
2✔
5
from typing import List, Sequence, Union
2✔
6

7
import matplotlib.pyplot as _plt
2✔
8
import pandas as _pd
2✔
9

10
from pytrnsys_process import constants as const
2✔
11
from pytrnsys_process import data_structures as ds
2✔
12
from pytrnsys_process import logger as log
2✔
13
from pytrnsys_process import settings as sett
2✔
14
from pytrnsys_process import utils
2✔
15
from pytrnsys_process.process_sim import process_sim as ps
2✔
16

17

18
class UnableToProcessSimulationError(Exception):
    """Signal that no processed Simulation could be produced for a folder.

    Raised by the public processing API when a simulation folder went through
    the batch machinery but no resulting Simulation object is available.
    """
20

21

22
# pylint: disable=too-many-locals
def _process_batch(
        sim_folders: list[_pl.Path],
        processing_scenario: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
        results_folder: _pl.Path,
        parallel: bool = False,
        max_workers: int | None = None,
) -> ds.SimulationsData:
    """Common processing logic for both sequential and parallel batch processing.

    Each folder in ``sim_folders`` is handed to ``_process_simulation``, either
    one after another in the current process or through a
    ``ProcessPoolExecutor``. The outcomes are collected into a single
    ``SimulationsData``. A simulation that raises is logged and counted as an
    error, and it never aborts the rest of the batch.

    Args:
        sim_folders: Simulation folders to process.
        processing_scenario: Processing scenario(s) to apply to each simulation.
        results_folder: Root folder containing all simulations; stored as
            ``path_to_simulations`` on the returned object.
        parallel: Whether to process simulations in worker processes.
        max_workers: Maximum number of worker processes for parallel
            execution. Only used when ``parallel`` is True; ``None`` lets
            ``ProcessPoolExecutor`` pick its default.

    Returns:
        SimulationsData containing the successfully processed simulations and
        their concatenated scalar values.

    Note:
        This is an internal function that should not be called directly.
        Use process_single_simulation, process_whole_result_set, or
        process_whole_result_set_parallel instead.
    """
    # perf_counter is monotonic, so the reported duration cannot be skewed by
    # system clock adjustments (e.g. NTP) the way time.time() can be.
    start_time = _time.perf_counter()
    results = ds.ProcessingResults()
    simulations_data = ds.SimulationsData(path_to_simulations=results_folder)

    if parallel:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Map each future back to its folder so a failure can be
            # attributed to the right simulation.
            tasks = {}
            for sim_folder in sim_folders:
                log.main_logger.info(
                    "Submitting simulation folder for processing: %s",
                    sim_folder.name,
                )
                future = executor.submit(
                    _process_simulation, sim_folder, processing_scenario
                )
                tasks[future] = sim_folder

            for future in as_completed(tasks):
                try:
                    _handle_simulation_result(
                        future.result(), results, simulations_data
                    )
                except Exception as e:  # pylint: disable=broad-except
                    _handle_simulation_error(e, tasks[future], results)
    else:
        for sim_folder in sim_folders:
            try:
                log.main_logger.info(
                    "Processing simulation: %s", sim_folder.name
                )
                result = _process_simulation(sim_folder, processing_scenario)
                _handle_simulation_result(result, results, simulations_data)
            except Exception as e:  # pylint: disable=broad-except
                _handle_simulation_error(e, sim_folder, results)

    simulations_data = _concat_scalar(simulations_data)
    _log_processing_results(results)

    execution_time = _time.perf_counter() - start_time
    log.main_logger.info(
        "%s execution time: %.2f seconds",
        "Parallel" if parallel else "Total",
        execution_time,
    )

    return simulations_data
102

103

104
def _handle_simulation_result(
        result: tuple[ds.Simulation, List[str]],
        results: ds.ProcessingResults,
        simulations_data: ds.SimulationsData,
) -> None:
    """Record a simulation that was processed without raising.

    The simulation is stored in ``simulations_data.simulations`` under its
    folder name and counted in ``results.processed_count``. Scenarios that
    failed for it (if any) are recorded in ``results.failed_scenarios``.

    Args:
        result: Tuple of (simulation, failed_scenarios) as returned by
            ``_process_simulation``.
        results: ProcessingResults to update in place.
        simulations_data: SimulationsData to update in place.
    """
    simulation, failed_scenarios = result
    sim_name = simulation.path.name
    results.processed_count += 1
    simulations_data.simulations[sim_name] = simulation
    # Only simulations with at least one failed scenario get an entry.
    if failed_scenarios:
        results.failed_scenarios[sim_name] = failed_scenarios
122

123

124
def _handle_simulation_error(
    error: Exception,
    sim_folder: _pl.Path,
    results: ds.ProcessingResults,
) -> None:
    """Count a simulation that raised during processing and log the failure.

    Intended to be called from inside an ``except`` block, because the log
    call passes ``exc_info=True`` to attach the currently handled traceback.

    Args:
        error: Exception raised while processing ``sim_folder``.
        sim_folder: Folder of the failed simulation; its name is appended to
            ``results.failed_simulations``.
        results: ProcessingResults updated in place.
    """
    results.error_count += 1
    failed_name = sim_folder.name
    results.failed_simulations.append(failed_name)

    error_text = str(error)
    log.main_logger.error(
        "Failed to process simulation in %s: %s",
        sim_folder,
        error_text,
        exc_info=True,
    )
144

145

146
def process_single_simulation(
        sim_folder: _pl.Path,
        processing_scenarios: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
) -> ds.Simulation:
    """Process a single simulation folder using the provided processing scenario(s).

    The folder is run through the same batch machinery as the whole-result-set
    functions, with its parent directory acting as the results folder.

    Args:
        sim_folder: Path to the simulation folder to process.
        processing_scenarios: Single callable or sequence of callables that
            implement the processing logic for a simulation. Each callable
            should take a Simulation object as its only parameter.

    Returns:
        Simulation object containing the processed data.

    Raises:
        UnableToProcessSimulationError: If the simulation could not be
            processed. The underlying error has already been logged by the
            batch processing and is chained as ``__cause__``'s context.
            Failures of individual scenarios do not raise; they are logged.

    Example:
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> simulation = api.process_single_simulation(
        ...     _pl.Path("path/to/simulation"),
        ...     processing_step_1,
        ... )
    """
    log.initialize_logs()
    log.main_logger.info("Starting processing of simulation %s", sim_folder)
    simulations_data = _process_batch(
        [sim_folder], processing_scenarios, sim_folder.parent
    )
    try:
        return simulations_data.simulations[sim_folder.name]
    except KeyError as exc:
        # A missing entry means _process_batch caught (and logged) an error
        # for this folder instead of storing a result.
        raise UnableToProcessSimulationError(
            f"Failed to process simulation in {sim_folder}"
        ) from exc
189

190

191
def process_whole_result_set(
        results_folder: _pl.Path,
        processing_scenario: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
) -> ds.SimulationsData:
    """Process all simulation folders in a results directory sequentially.

    Every direct subdirectory of ``results_folder`` is treated as one
    simulation. The subdirectories are processed one at a time, in sorted
    path order, in the current process, and the provided processing
    scenario(s) are applied to each one. The combined result is also pickled to
    ``results_folder / const.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value``.

    Args:
        results_folder: Path to the directory containing simulation folders.
            Each subfolder should contain valid simulation data files.
        processing_scenario: Single callable or sequence of callables that
            implement the processing logic for each simulation. Each callable
            should take a Simulation object as its only parameter and modify
            it in place.

    Returns:
        SimulationsData containing:
            - simulations: Dict mapping simulation folder names to the
              processed Simulation objects (simulations that failed are
              omitted)
            - scalar: DataFrame of the scalar values of all simulations,
              keyed by simulation name

    Raises:
        ValueError: If results_folder doesn't exist or is not a directory.

    Failures of individual simulations or scenarios are logged and summarised,
    not raised.

    Example:
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> def comparison_step_1(simulations_data):
        ...     # Compare simulations
        ...     pass
        >>> results = api.process_whole_result_set(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2],
        ... )
        >>> api.do_comparison(results, comparison_step_1)
    """
    _validate_folder(results_folder)
    log.initialize_logs()
    log.main_logger.info(
        "Starting batch processing of simulations in %s", results_folder
    )

    # iterdir() order is filesystem-dependent; sorting makes the processing
    # order (and therefore the result ordering) reproducible.
    sim_folders = sorted(
        sim_folder
        for sim_folder in results_folder.iterdir()
        if sim_folder.is_dir()
    )
    simulations_data = _process_batch(
        sim_folders, processing_scenario, results_folder
    )
    utils.save_to_pickle(
        simulations_data,
        results_folder / const.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value,
    )

    return simulations_data
257

258

259
def process_whole_result_set_parallel(
        results_folder: _pl.Path,
        processing_scenario: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
        max_workers: int | None = None,
) -> ds.SimulationsData:
    """Process all simulation folders in a results directory in parallel.

    Every direct subdirectory of ``results_folder`` is treated as one
    simulation. The subdirectories are submitted, in sorted path order, to a
    ``ProcessPoolExecutor`` so that several simulations are processed
    concurrently. The combined result is also pickled to
    ``results_folder / const.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value``.

    The scenarios are sent to worker processes, so they must be picklable.
    For example, use module-level functions rather than lambdas or nested
    functions.

    Args:
        results_folder: Path to the directory containing simulation folders.
            Each subfolder should contain valid simulation data files.
        processing_scenario: Single callable or sequence of callables that
            implement the processing logic for each simulation. Each callable
            should take a Simulation object as its only parameter.
        max_workers: Maximum number of worker processes to use. If None,
            ``ProcessPoolExecutor`` chooses its default, which is based on
            the number of available CPUs.

    Returns:
        SimulationsData containing:
            - simulations: Dict mapping simulation folder names to the
              processed Simulation objects, in completion order (simulations
              that failed are omitted)
            - scalar: DataFrame of the scalar values of all simulations,
              keyed by simulation name

    Raises:
        ValueError: If results_folder doesn't exist or is not a directory.

    Failures of individual simulations or scenarios are logged and summarised,
    not raised.

    Example:
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> def comparison_step_1(simulations_data):
        ...     # Compare simulations
        ...     pass
        >>> results = api.process_whole_result_set_parallel(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2],
        ... )
        >>> api.do_comparison(results, comparison_step_1)
    """
    _validate_folder(results_folder)
    log.initialize_logs()
    log.main_logger.info(
        "Starting batch processing of simulations in %s with parallel execution",
        results_folder,
    )

    # iterdir() order is filesystem-dependent; sorting makes the submission
    # order reproducible.
    sim_folders = sorted(
        sim_folder
        for sim_folder in results_folder.iterdir()
        if sim_folder.is_dir()
    )
    simulations_data = _process_batch(
        sim_folders,
        processing_scenario,
        results_folder,
        parallel=True,
        max_workers=max_workers,
    )
    utils.save_to_pickle(
        simulations_data,
        results_folder / const.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value,
    )

    return simulations_data
331

332

333
def do_comparison(
        simulations_data: ds.SimulationsData,
        comparison_scenario: Union[
            _abc.Callable[[ds.SimulationsData], None],
            _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
        ],
) -> None:
    """Execute comparison scenarios on processed simulation results.

    Each comparison step is run on ``simulations_data``. If a step fails, the
    failure is logged and the remaining steps still run. Any other error
    during the comparison is also logged rather than raised. All open
    matplotlib figures are closed afterwards, whether or not the comparison
    succeeded.

    Args:
        simulations_data: SimulationsData holding the processed simulations to
            compare (e.g. as returned by process_whole_result_set).
        comparison_scenario: Single callable or sequence of callables that
            implement the comparison logic. Each callable should take a
            SimulationsData object as its only parameter.

    Example:
        >>> from pytrnsys_process import api
        >>> def comparison_step(simulations_data):
        ...     # Compare simulation results
        ...     pass
        >>> api.do_comparison(simulations_data, comparison_step)
    """
    try:
        _process_comparisons(simulations_data, comparison_scenario)
    except Exception:  # pylint: disable=broad-except
        log.main_logger.error(
            "Failed to do comparison",
            exc_info=True,
        )
    finally:
        # Close figures even when the comparison failed part-way, so they do
        # not accumulate across calls.
        _plt.close("all")
366

367

368
def _process_comparisons(
        simulations_data: ds.SimulationsData,
        comparison_scenario: Union[
            _abc.Callable[[ds.SimulationsData], None],
            _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
        ],
):
    """Run each comparison step on ``simulations_data`` and isolate failures.

    A single callable is treated as a one-element sequence of steps. If a step
    raises, the error is logged with its traceback and the next step still
    runs.
    """
    if callable(comparison_scenario):
        steps = (comparison_scenario,)
    else:
        steps = comparison_scenario

    for comparison_step in steps:
        try:
            comparison_step(simulations_data)
        except Exception as err:  # pylint: disable=broad-except
            step_name = getattr(
                comparison_step, "__name__", str(comparison_step)
            )
            log.main_logger.error(
                "Scenario %s failed for comparison: %s ",
                step_name,
                str(err),
                exc_info=True,
            )
391

392

393
def _concat_scalar(simulation_data: ds.SimulationsData) -> ds.SimulationsData:
    """Stack every simulation's scalar values into ``simulation_data.scalar``.

    Simulations whose ``scalar`` DataFrame is empty are skipped. The frames
    are concatenated with the simulation names as keys, and then index
    level 1 is dropped. Assuming each scalar frame has a single-level index,
    this leaves the rows indexed by simulation name. If no simulation has
    scalar values, ``simulation_data.scalar`` is left untouched. The object is
    modified in place and also returned.
    """
    names = []
    frames = []
    for name, sim in simulation_data.simulations.items():
        if sim.scalar.empty:
            continue
        names.append(name)
        frames.append(sim.scalar)

    if frames:
        stacked = _pd.concat(frames, keys=names)
        simulation_data.scalar = stacked.droplevel(1)
    return simulation_data
405

406

407
def _validate_folder(folder: _pl.Path) -> None:
2✔
408
    if not folder.exists():
2✔
409
        raise ValueError(f"Folder does not exist: {folder}")
×
410
    if not folder.is_dir():
2✔
411
        raise ValueError(f"Path is not a directory: {folder}")
×
412

413

414
def _process_simulation(
        sim_folder: _pl.Path,
        processing_scenarios: Union[
            _abc.Callable[[ds.Simulation], None],
            Sequence[_abc.Callable[[ds.Simulation], None]],
        ],
) -> tuple[ds.Simulation, List[str]]:
    """Load or build one simulation and run every processing scenario on it.

    If the per-simulation pickle file exists and
    ``sett.settings.reader.force_reread_prt`` is not set, the simulation is
    loaded from that pickle. Otherwise it is built from the raw files in
    ``sim_folder`` and then pickled for next time. Each scenario then runs in
    turn. A failing scenario is logged and recorded, and the remaining
    scenarios still run.

    Returns:
        Tuple of (simulation, names of the scenarios that raised).
    """
    sim_logger = log.get_simulation_logger(sim_folder)
    sim_logger.info("Starting simulation processing")
    pickle_path = sim_folder / const.FileNames.SIMULATION_PICKLE_FILE.value

    simulation: ds.Simulation
    if not pickle_path.exists() or sett.settings.reader.force_reread_prt:
        sim_logger.info("Processing simulation from raw files")
        raw_files = utils.get_files([sim_folder])
        simulation = ps.process_sim(raw_files, sim_folder)
        utils.save_to_pickle(simulation, pickle_path, sim_logger)
    else:
        sim_logger.info("Loading simulation from pickle file")
        simulation = utils.load_simulation_from_pickle(pickle_path, sim_logger)

    # A lone callable is treated as a one-step scenario list.
    if callable(processing_scenarios):
        steps = [processing_scenarios]
    else:
        steps = processing_scenarios

    failures: List[str] = []
    for step in steps:
        try:
            step_name = getattr(step, "__name__", str(step))
            sim_logger.info("Running scenario: %s", step_name)
            step(simulation)
            sim_logger.info("Successfully completed scenario: %s", step_name)
        except Exception as exc:  # pylint: disable=broad-except
            failures.append(step_name)
            sim_logger.error(
                "Scenario %s failed: %s",
                step_name,
                str(exc),
                exc_info=True,
            )

    if not failures:
        sim_logger.info("Simulation completed successfully")
    else:
        sim_logger.warning(
            "Simulation completed with %d failed scenarios",
            len(failures),
        )

    # Scenarios may have created figures; release them before returning.
    _plt.close("all")
    return simulation, failures
472

473

474
def _log_processing_results(results: ds.ProcessingResults) -> None:
    """Write a human-readable batch summary to the main logger.

    Logs the processed and failed counts at info level. Then, as warnings, it
    lists the simulations that raised and, per simulation, the scenarios that
    failed.
    """
    logger = log.main_logger
    heavy_rule = "=" * 80

    logger.info(heavy_rule)
    logger.info("BATCH PROCESSING SUMMARY")
    logger.info("-" * 80)
    logger.info(
        "Total simulations processed: %d | Failed: %d",
        results.processed_count,
        results.error_count,
    )

    if results.error_count > 0:
        logger.warning(
            "Some simulations failed to process. Check the log for details."
        )
        logger.warning("Failed simulations:")
        for failed_sim in results.failed_simulations:
            logger.warning("  • %s", failed_sim)

    if results.failed_scenarios:
        logger.warning("Failed scenarios by simulation:")
        for sim_name, scenario_names in results.failed_scenarios.items():
            if not scenario_names:
                continue
            logger.warning("  • %s:", sim_name)
            for scenario_name in scenario_names:
                logger.warning("    - %s", scenario_name)

    logger.info(heavy_rule)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc