• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys_process / 13763570692

10 Mar 2025 11:29AM UTC coverage: 95.379%. Remained the same
13763570692

push

github

web-flow
Merge pull request #79 from SPF-OST/71-update-and-add-docstrings

disabled test and checks for main branch

1094 of 1147 relevant lines covered (95.38%)

1.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.61
/pytrnsys_process/process/process_batch.py
1
import logging as _logging
2✔
2
import pathlib as _pl
2✔
3
import time as _time
2✔
4
from collections import abc as _abc
2✔
5
from concurrent.futures import ProcessPoolExecutor, as_completed
2✔
6
from typing import List, Optional, Sequence, Union
2✔
7

8
import matplotlib.pyplot as _plt
2✔
9
import pandas as _pd
2✔
10

11
from pytrnsys_process import config as conf
2✔
12
from pytrnsys_process import log, util
2✔
13
from pytrnsys_process.process import data_structures as ds
2✔
14
from pytrnsys_process.process import process_sim as ps
2✔
15

16

17
class UnableToProcessSimulationError(Exception):
    """Signal that a simulation could not be processed into a usable result."""
19

20

21
# pylint: disable=too-many-locals
22
def _process_batch(
    sim_folders: list[_pl.Path],
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    results_folder: _pl.Path,
    parallel: bool = False,
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Common processing logic for both sequential and parallel batch processing.

    Every folder in ``sim_folders`` is handed to ``_process_simulation``.
    Successful results are collected via ``_handle_simulation_result``; any
    exception raised while processing a folder is recorded via
    ``_handle_simulation_error`` and does not stop the batch. Afterwards the
    scalar data of all simulations is concatenated, a summary is logged and
    the elapsed time is reported.

    Parameters
    __________
        sim_folders:
            List of simulation folders to process

        processing_scenario:
            Processing scenario(s) to apply to each simulation

        results_folder:
            Folder used to obtain the main logger; its POSIX path is stored
            as ``path_to_simulations`` on the returned object

        parallel:
            Whether to process simulations in parallel using a
            ProcessPoolExecutor

        max_workers:
            Maximum number of worker processes for parallel execution.
            Only used when ``parallel`` is True.


    Returns
    _______
        SimulationsData containing the processed simulations and the
        concatenated scalar data

    Note:
    _____
        This is an internal function that should not be called directly.
        Use process_single_simulation, process_whole_result_set, or
        process_whole_result_set_parallel instead.


    """
    # perf_counter is monotonic, so the reported duration cannot be distorted
    # by adjustments of the system clock while the batch is running.
    start_time = _time.perf_counter()
    results = ds.ProcessingResults()
    simulations_data = ds.SimulationsData(
        path_to_simulations=results_folder.as_posix()
    )

    main_logger = log.get_main_logger(results_folder)

    if parallel:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Map each future back to its folder so that a failure can be
            # attributed to the right simulation.
            tasks = {}
            for sim_folder in sim_folders:
                main_logger.info(
                    "Submitting simulation folder for processing: %s",
                    sim_folder.name,
                )
                tasks[
                    executor.submit(
                        _process_simulation, sim_folder, processing_scenario
                    )
                ] = sim_folder

            # Results are handled in completion order, not submission order.
            for future in as_completed(tasks):
                try:
                    # future.result() re-raises any exception from the worker.
                    _handle_simulation_result(
                        future.result(), results, simulations_data
                    )
                except Exception as e:  # pylint: disable=broad-except
                    _handle_simulation_error(
                        e, tasks[future], results, main_logger
                    )
    else:
        for sim_folder in sim_folders:
            try:
                main_logger.info("Processing simulation: %s", sim_folder.name)
                result = _process_simulation(sim_folder, processing_scenario)
                _handle_simulation_result(result, results, simulations_data)
            except Exception as e:  # pylint: disable=broad-except
                _handle_simulation_error(e, sim_folder, results, main_logger)

    simulations_data = _concat_scalar(simulations_data)
    _log_processing_results(results, main_logger)

    execution_time = _time.perf_counter() - start_time
    main_logger.info(
        "%s execution time: %.2f seconds",
        "Parallel" if parallel else "Total",
        execution_time,
    )

    return simulations_data
2✔
120

121

122
def _handle_simulation_result(
    result: tuple[ds.Simulation, List[str]],
    results: ds.ProcessingResults,
    simulations_data: ds.SimulationsData,
) -> None:
    """Record the outcome of a processed simulation.

    Parameters
    __________
        result:
            Tuple of (simulation, failed_scenarios)

        results:
            ProcessingResults to update: ``processed_count`` is incremented
            and, if any scenarios failed, they are stored in
            ``failed_scenarios`` under the simulation's folder name

        simulations_data:
            SimulationsData to update: the simulation is stored in
            ``simulations`` under its folder name
    """
    simulation, failed_scenarios = result
    results.processed_count += 1

    # The last component of the simulation path is the key for both mappings.
    sim_name = _pl.Path(simulation.path).name
    simulations_data.simulations[sim_name] = simulation
    if failed_scenarios:
        results.failed_scenarios[sim_name] = failed_scenarios
143

144

145
def _handle_simulation_error(
    error: Exception,
    sim_folder: _pl.Path,
    results: ds.ProcessingResults,
    main_logger: _logging.Logger,
) -> None:
    """Record and log a simulation whose processing raised an exception.

    Parameters
    __________
        error:
            The exception that occurred

        sim_folder:
            Path to the simulation folder; its name is added to
            ``results.failed_simulations``

        results:
            ProcessingResults whose ``error_count`` is incremented

        main_logger:
            Logger that receives the error message
    """
    results.error_count += 1
    results.failed_simulations.append(sim_folder.name)

    message = "Failed to process simulation in %s: %s"
    main_logger.error(message, sim_folder, str(error), exc_info=True)
167

168

169
def process_single_simulation(
    sim_folder: _pl.Path,
    processing_scenarios: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.Simulation:
    """Process one simulation folder with the given processing scenario(s).

    Parameters
    __________
        sim_folder:
            Path to the simulation folder to process

        processing_scenarios:
            Single callable or sequence of callables that implement
            the processing logic for a simulation. Each callable should take a Simulation
            object as its only parameter.

    Returns
    _______
            Simulation object containing the processed data

    Raises
    ______
        UnableToProcessSimulationError: If the simulation is missing from the
            processed results

    Example
    _______
            >>> import pathlib as _pl
            >>> from pytrnsys_process import api
            ...
            >>> def processing_step_1(sim: api.Simulation):
            ...     # Process simulation data
            ...     pass
            >>> results = api.process_single_simulation(
            ...     _pl.Path("path/to/simulation"),
            ...     processing_step_1
            ... )
    """
    main_logger = log.get_main_logger(sim_folder)
    log.initialize_logs(sim_folder)
    main_logger.info("Starting processing of simulation %s", sim_folder)

    # A single simulation is processed as a batch of one; the parent folder
    # plays the role of the results folder.
    batch_result = _process_batch(
        [sim_folder], processing_scenarios, sim_folder.parent
    )

    try:
        return batch_result.simulations[sim_folder.name]
    except KeyError as exc:
        # A failed simulation is never stored in the batch result.
        raise UnableToProcessSimulationError(
            f"Failed to process simulation in {sim_folder}"
        ) from exc
218

219

220
def process_whole_result_set(
    results_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.SimulationsData:
    """Process all simulation folders in a results directory sequentially.

    Every direct subdirectory of ``results_folder`` is treated as a simulation
    folder and processed one at a time with the provided processing
    scenario(s). The collected result is also saved as a pickle file inside
    ``results_folder``.

    Parameters
    __________
        results_folder:
            Path to the directory containing simulation folders.
            Each subfolder should contain valid simulation data files.

        processing_scenario:
            Single callable or sequence of callables that implement
            the processing logic for each simulation. Each callable should take a
            Simulation object as its only parameter and modify it in place.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
            - simulations: Dict mapping simulation folder names to the processed simulations
            - scalar: DataFrame containing the concatenated scalar values of all simulations

    Raises
    ______
        ValueError: If results_folder doesn't exist or is not a directory

    Note
    ____
        Individual simulation failures are logged but not re-raised.

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    main_logger = log.get_main_logger(results_folder)
    log.initialize_logs(results_folder)
    main_logger.info(
        "Starting batch processing of simulations in %s", results_folder
    )

    # Only directories are simulation candidates; loose files are ignored.
    sim_folders = [
        entry for entry in results_folder.iterdir() if entry.is_dir()
    ]
    simulations_data = _process_batch(
        sim_folders, processing_scenario, results_folder
    )

    pickle_path = (
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
    )
    util.save_to_pickle(simulations_data, pickle_path)

    return simulations_data
2✔
292

293

294
def process_whole_result_set_parallel(
    results_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Process all simulation folders in a results directory in parallel.

    Every direct subdirectory of ``results_folder`` is treated as a simulation
    folder. The batch is run with parallel execution enabled, and the
    collected result is also saved as a pickle file inside ``results_folder``.

    Parameters
    __________
        results_folder:
            Path to the directory containing simulation folders.
            Each subfolder should contain valid simulation data files.
        processing_scenario:
            Single callable or sequence of callables that implement
            the processing logic for each simulation. Each callable should take a
            Simulation object as its only parameter.
        max_workers:
            Maximum number of worker processes to use. If None, the
            default of the underlying process pool is used.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
            - simulations: Dict mapping simulation folder names to the processed simulations
            - scalar: DataFrame containing the concatenated scalar values of all simulations

    Raises
    ______
        ValueError: If results_folder doesn't exist or is not a directory

    Note
    ____
        Individual simulation failures are logged but not re-raised.

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set_parallel(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    log.initialize_logs(results_folder)
    main_logger = log.get_main_logger(results_folder)
    main_logger.info(
        "Starting batch processing of simulations in %s with parallel execution",
        results_folder,
    )

    # Only directories are simulation candidates; loose files are ignored.
    sim_folders = [
        entry for entry in results_folder.iterdir() if entry.is_dir()
    ]
    simulations_data = _process_batch(
        sim_folders,
        processing_scenario,
        results_folder,
        parallel=True,
        max_workers=max_workers,
    )

    pickle_path = (
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
    )
    util.save_to_pickle(simulations_data, pickle_path)

    return simulations_data
2✔
374

375

376
def do_comparison(
    comparison_scenario: Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
    simulations_data: Optional[ds.SimulationsData] = None,
    results_folder: Optional[_pl.Path] = None,
) -> None:
    """Execute comparison scenarios on processed simulation results.

    If ``simulations_data`` is not given, it is obtained from
    ``results_folder``: an existing simulations-data pickle file in that
    folder is loaded, otherwise the whole result set is processed in parallel
    without any processing scenario.

    Parameters
    __________
        comparison_scenario:
            Single callable or sequence of callables that implement
            the comparison logic. Each callable should take a SimulationsData
            object as its only parameter.
        simulations_data:
            Optional SimulationsData object containing the processed
            simulation data to be compared.
        results_folder:
            Optional Path to the directory containing simulation results.
            Used if simulations_data is not provided.

    Raises
    ______
        ValueError: If neither simulations_data nor results_folder is provided

    Example
    __________
        >>> from pytrnsys_process import api
        ...
        >>> def comparison_step(simulations_data: api.SimulationsData):
        ...     # Compare simulation results
        ...     pass
        ...
        >>> api.do_comparison(comparison_step, simulations_data=processed_results)
    """
    # Test for None explicitly: "not provided" must not depend on the
    # truthiness of the objects that are passed in.
    if simulations_data is None:
        if results_folder is None:
            raise ValueError(
                "Either simulations_data or results_folder must be provided to perform comparison"
            )
        path_to_simulations_data = (
            results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
        )
        if path_to_simulations_data.exists():
            # NOTE(review): presumably unpickles the file - only use results
            # folders from trusted sources.
            simulations_data = util.load_simulations_data_from_pickle(
                path_to_simulations_data
            )
        else:
            # Nothing cached yet: process the raw results first, with no
            # per-simulation scenarios.
            simulations_data = process_whole_result_set_parallel(
                results_folder, []
            )
    main_logger = log.get_main_logger(
        _pl.Path(simulations_data.path_to_simulations)
    )
    _process_comparisons(simulations_data, comparison_scenario, main_logger)
    # Close all matplotlib figures, including any the comparison steps opened.
    _plt.close("all")
2✔
430

431

432
def _process_comparisons(
    simulations_data: ds.SimulationsData,
    comparison_scenario: Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
    main_logger: _logging.Logger,
):
    """Run each comparison step on ``simulations_data``.

    A step that raises is logged as an error on ``main_logger`` and the
    remaining steps are still executed.
    """
    # Accept a bare callable as well as a sequence of callables.
    if callable(comparison_scenario):
        steps = [comparison_scenario]
    else:
        steps = comparison_scenario

    for step in steps:
        try:
            step(simulations_data)
        except Exception as error:  # pylint: disable=broad-except
            main_logger.error(
                "Scenario %s failed for comparison: %s ",
                getattr(step, "__name__", str(step)),
                str(error),
                exc_info=True,
            )
456

457

458
def _concat_scalar(simulation_data: ds.SimulationsData) -> ds.SimulationsData:
    """Combine the scalar data of all simulations into ``simulation_data.scalar``.

    Simulations with empty scalar data are skipped. The combined frame is
    indexed by simulation name. If no simulation has scalar data,
    ``simulation_data`` is returned untouched.
    """
    sim_names = []
    scalar_frames = []
    for sim_name, sim in simulation_data.simulations.items():
        if sim.scalar.empty:
            continue
        sim_names.append(sim_name)
        scalar_frames.append(sim.scalar)

    if not scalar_frames:
        return simulation_data

    # concat with keys adds the simulation name as the outer index level;
    # dropping level 1 removes the original per-frame index.
    stacked = _pd.concat(scalar_frames, keys=sim_names)
    simulation_data.scalar = stacked.droplevel(1)
    return simulation_data
2✔
470

471

472
def _validate_folder(folder: _pl.Path) -> None:
    """Ensure that ``folder`` is an existing directory.

    Raises
    ______
        ValueError: If ``folder`` does not exist or is not a directory
    """
    if folder.is_dir():
        return
    # Not a directory: say whether the path is missing or is something else.
    if folder.exists():
        raise ValueError(f"Path is not a directory: {folder}")
    raise ValueError(f"Folder does not exist: {folder}")
×
477

478

479
def _process_simulation(
    sim_folder: _pl.Path,
    processing_scenarios: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> tuple[ds.Simulation, List[str]]:
    """Load or build one simulation and run the processing scenario(s) on it.

    The simulation is loaded from its pickle file when that file exists and
    re-reading is not forced via the global reader settings; otherwise it is
    built from the files in ``sim_folder`` and saved to the pickle file.
    A scenario that raises is logged and recorded by name, and the remaining
    scenarios are still run.

    Returns
    _______
        Tuple of (simulation, names of the scenarios that failed)
    """
    sim_logger = log.get_simulation_logger(sim_folder)
    sim_logger.info("Starting simulation processing")
    sim_pickle_file = sim_folder / conf.FileNames.SIMULATION_PICKLE_FILE.value

    read_raw_files = (
        not sim_pickle_file.exists()
        or conf.global_settings.reader.force_reread_prt
    )
    if read_raw_files:
        sim_logger.info("Processing simulation from raw files")
        sim_files = util.get_files([sim_folder])
        simulation = ps.process_sim(sim_files, sim_folder)
        util.save_to_pickle(simulation, sim_pickle_file, sim_logger)
    else:
        sim_logger.info("Loading simulation from pickle file")
        simulation = util.load_simulation_from_pickle(
            sim_pickle_file, sim_logger
        )

    # Accept a bare callable as well as a sequence of callables.
    if callable(processing_scenarios):
        scenarios = [processing_scenarios]
    else:
        scenarios = processing_scenarios

    failed_names = []
    for scenario in scenarios:
        try:
            scenario_name = getattr(scenario, "__name__", str(scenario))
            sim_logger.info("Running scenario: %s", scenario_name)
            scenario(simulation)
            sim_logger.info(
                "Successfully completed scenario: %s", scenario_name
            )
        except Exception as error:  # pylint: disable=broad-except
            failed_names.append(scenario_name)
            sim_logger.error(
                "Scenario %s failed: %s",
                scenario_name,
                str(error),
                exc_info=True,
            )

    if not failed_names:
        sim_logger.info("Simulation completed successfully")
    else:
        sim_logger.warning(
            "Simulation completed with %d failed scenarios",
            len(failed_names),
        )

    # Close all matplotlib figures, including any the scenarios opened.
    _plt.close("all")
    return simulation, failed_names
2✔
540

541

542
def _log_processing_results(
    results: ds.ProcessingResults, main_logger: _logging.Logger
) -> None:
    """Write a summary of the batch run to ``main_logger``.

    The summary has the processed and failed counts, then the names of failed
    simulations (if any), then the failed scenarios grouped by simulation
    (if any).
    """
    frame_line = "=" * 80

    main_logger.info(frame_line)
    main_logger.info("BATCH PROCESSING SUMMARY")
    main_logger.info("-" * 80)
    main_logger.info(
        "Total simulations processed: %d | Failed: %d",
        results.processed_count,
        results.error_count,
    )

    if results.error_count > 0:
        main_logger.warning(
            "Some simulations failed to process. Check the log for details."
        )
        main_logger.warning("Failed simulations:")
        for failed_name in results.failed_simulations:
            main_logger.warning("  • %s", failed_name)

    if results.failed_scenarios:
        main_logger.warning("Failed scenarios by simulation:")
        for sim_name, scenario_names in results.failed_scenarios.items():
            # Skip simulations whose list of failed scenarios is empty.
            if not scenario_names:
                continue
            main_logger.warning("  • %s:", sim_name)
            for scenario_name in scenario_names:
                main_logger.warning("    - %s", scenario_name)

    main_logger.info(frame_line)
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc