• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys_process / 13763327914

10 Mar 2025 11:16AM UTC coverage: 95.379% (-2.1%) from 97.522%
13763327914

push

github

web-flow
Merge pull request #77 from SPF-OST/76-bug-two-types-of-step-files-one-does-not-get-read-in-leading-to-the-simulation-failing-entirely

76 bug two types of step files one does not get read in leading to the simulation failing entirely

28 of 35 new or added lines in 6 files covered. (80.0%)

19 existing lines in 2 files now uncovered.

1094 of 1147 relevant lines covered (95.38%)

1.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.61
/pytrnsys_process/process/process_batch.py
1
import logging as _logging
2✔
2
import pathlib as _pl
2✔
3
import time as _time
2✔
4
from collections import abc as _abc
2✔
5
from concurrent.futures import ProcessPoolExecutor, as_completed
2✔
6
from typing import List, Optional, Sequence, Union
2✔
7

8
import matplotlib.pyplot as _plt
2✔
9
import pandas as _pd
2✔
10

11
from pytrnsys_process import config as conf
2✔
12
from pytrnsys_process import log, util
2✔
13
from pytrnsys_process.process import data_structures as ds
2✔
14
from pytrnsys_process.process import process_sim as ps
2✔
15

16

17
class UnableToProcessSimulationError(Exception):
    """Signal that processing a simulation did not yield a result.

    ``process_single_simulation`` raises this when the requested simulation
    is missing from the processed results.
    """
19

20

21
# pylint: disable=too-many-locals
22
def _process_batch(
    sim_folders: list[_pl.Path],
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    results_folder: _pl.Path,
    parallel: bool = False,
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Common processing logic for both sequential and parallel batch processing.

    This internal function implements the core processing logic used by both sequential
    and parallel processing modes. It processes every simulation folder, collects
    the results, concatenates the scalar values and logs a summary.

    Parameters
    __________
        sim_folders:
            List of simulation folders to process

        processing_scenario:
            Processing scenario(s) to apply to each simulation

        results_folder:
            Folder used to obtain the main logger; its POSIX path is stored
            as ``path_to_simulations`` in the returned SimulationsData

        parallel:
            Whether to process simulations in parallel

        max_workers:
            Maximum number of worker processes for parallel execution


    Returns
    _______
        SimulationsData containing the processed simulation results and metadata

    Note:
    _____
        This is an internal function that should not be called directly.
        Use process_single_simulation, process_whole_result_set, or
        process_whole_result_set_parallel instead.

        A failure of an individual simulation is logged and counted; it does
        not abort the batch.

    """
    # perf_counter is monotonic, so the measured duration cannot be distorted
    # by adjustments of the system clock while the batch is running.
    start_time = _time.perf_counter()
    results = ds.ProcessingResults()
    simulations_data = ds.SimulationsData(
        path_to_simulations=results_folder.as_posix()
    )

    main_logger = log.get_main_logger(results_folder)

    if parallel:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Map each future back to its folder so failures can be attributed.
            tasks = {}
            for sim_folder in sim_folders:
                main_logger.info(
                    "Submitting simulation folder for processing: %s",
                    sim_folder.name,
                )
                tasks[
                    executor.submit(
                        _process_simulation, sim_folder, processing_scenario
                    )
                ] = sim_folder

            for future in as_completed(tasks):
                try:
                    # future.result() re-raises any exception of the worker.
                    _handle_simulation_result(
                        future.result(), results, simulations_data
                    )
                except Exception as e:  # pylint: disable=broad-except
                    _handle_simulation_error(
                        e, tasks[future], results, main_logger
                    )
    else:
        for sim_folder in sim_folders:
            try:
                main_logger.info("Processing simulation: %s", sim_folder.name)
                result = _process_simulation(sim_folder, processing_scenario)
                _handle_simulation_result(result, results, simulations_data)
            except Exception as e:  # pylint: disable=broad-except
                _handle_simulation_error(e, sim_folder, results, main_logger)

    simulations_data = _concat_scalar(simulations_data)
    _log_processing_results(results, main_logger)

    execution_time = _time.perf_counter() - start_time
    main_logger.info(
        "%s execution time: %.2f seconds",
        "Parallel" if parallel else "Total",
        execution_time,
    )

    return simulations_data
2✔
120

121

122
def _handle_simulation_result(
    result: tuple[ds.Simulation, List[str]],
    results: ds.ProcessingResults,
    simulations_data: ds.SimulationsData,
) -> None:
    """Record a processed simulation in the batch bookkeeping.

    The simulation is stored under the last component of ``simulation.path``.

    Parameters
    __________
        result: Tuple of (simulation, failed_scenarios)
        results: ProcessingResults to update
        simulations_data: SimulationsData to update
    """
    simulation, failed_scenarios = result
    results.processed_count += 1
    # Derive the key once; it is used for both mappings below.
    sim_name = _pl.Path(simulation.path).name
    simulations_data.simulations[sim_name] = simulation
    if failed_scenarios:
        results.failed_scenarios[sim_name] = failed_scenarios
143

144

145
def _handle_simulation_error(
    error: Exception,
    sim_folder: _pl.Path,
    results: ds.ProcessingResults,
    main_logger: _logging.Logger,
) -> None:
    """Record a simulation whose processing raised an exception.

    Parameters
    __________
        error: The exception that occurred
        sim_folder: Path to the simulation folder
        results: ProcessingResults to update
        main_logger: Logger the failure is reported to
    """
    results.error_count += 1
    failed_name = sim_folder.name
    results.failed_simulations.append(failed_name)
    message = "Failed to process simulation in %s: %s"
    main_logger.error(message, sim_folder, str(error), exc_info=True)
167

168

169
def process_single_simulation(
    sim_folder: _pl.Path,
    processing_scenarios: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.Simulation:
    """Run the given processing scenario(s) on one simulation folder.

    Parameters
    __________
        sim_folder:
            Path to the simulation folder to process

        processing_scenarios:
            Single callable or sequence of callables that implement
            the processing logic for a simulation. Each callable should take a Simulation
            object as its only parameter.

    Returns
    _______
            Simulation object containing the processed data

    Raises
    ______
            UnableToProcessSimulationError: If the simulation is missing from
            the processed results

    Example
    _______
            >>> import pathlib as _pl
            >>> from pytrnsys_process import api
            ...
            >>> def processing_step_1(sim: api.Simulation):
            ...     # Process simulation data
            ...     pass
            >>> results = api.process_single_simulation(
            ...     _pl.Path("path/to/simulation"),
            ...     processing_step_1
            ... )
    """
    main_logger = log.get_main_logger(sim_folder)
    log.initialize_logs(sim_folder)
    main_logger.info("Starting processing of simulation %s", sim_folder)

    # The batch machinery works on a list of folders; hand it a list of one.
    simulations_data = _process_batch(
        [sim_folder], processing_scenarios, sim_folder.parent
    )

    try:
        simulation = simulations_data.simulations[sim_folder.name]
    except KeyError as exc:
        # The batch logs failures instead of raising, so a missing entry is
        # the only sign that processing did not succeed.
        raise UnableToProcessSimulationError(
            f"Failed to process simulation in {sim_folder}"
        ) from exc
    else:
        return simulation
218

219

220
def process_whole_result_set(
    results_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.SimulationsData:
    """Sequentially process every simulation folder of a results directory.

    Each sub-directory of ``results_folder`` is treated as one simulation and
    processed one at a time with the provided processing scenario(s). The
    collected data is saved as a pickle file inside ``results_folder``.

    Parameters
    __________
        results_folder:
            Path to the directory containing simulation folders.
            Each subfolder should contain valid simulation data files.

        processing_scenario:
            Single callable or sequence of callables that implement
            the processing logic for each simulation. Each callable should take a
            Simulation object as its only parameter and modify it in place.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
            - simulations: Mapping of simulation folder name to the processed simulation
            - scalar: DataFrame containing the scalar values of all simulations

    Raises
    ______
        ValueError: If results_folder doesn't exist or is not a directory
        Exception: Individual simulation failures are logged but not re-raised

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    main_logger = log.get_main_logger(results_folder)
    log.initialize_logs(results_folder)
    main_logger.info(
        "Starting batch processing of simulations in %s", results_folder
    )

    # Only directories are simulations; loose files in the folder are skipped.
    sim_folders = [
        entry for entry in results_folder.iterdir() if entry.is_dir()
    ]
    simulations_data = _process_batch(
        sim_folders, processing_scenario, results_folder
    )

    pickle_path = (
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
    )
    util.save_to_pickle(simulations_data, pickle_path)

    return simulations_data
2✔
292

293

294
def process_whole_result_set_parallel(
    results_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Process every simulation folder of a results directory in parallel.

    Each sub-directory of ``results_folder`` is treated as one simulation.
    The simulations are distributed over a ProcessPoolExecutor and the
    collected data is saved as a pickle file inside ``results_folder``.

    Parameters
    __________
        results_folder:
            Path to the directory containing simulation folders.
            Each subfolder should contain valid simulation data files.
        processing_scenario:
            Single callable or sequence of callables that implement
            the processing logic for each simulation. Each callable should take a
            Simulation object as its only parameter.
        max_workers:
            Maximum number of worker processes to use. Passed unchanged to
            ProcessPoolExecutor; None selects the executor's default.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
            - simulations: Mapping of simulation folder name to the processed simulation
            - scalar: DataFrame containing the scalar values of all simulations

    Raises
    _______
        ValueError: If results_folder doesn't exist or is not a directory
        Exception: Individual simulation failures are logged but not re-raised

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set_parallel(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    log.initialize_logs(results_folder)
    main_logger = log.get_main_logger(results_folder)
    main_logger.info(
        "Starting batch processing of simulations in %s with parallel execution",
        results_folder,
    )

    # Only directories are simulations; loose files in the folder are skipped.
    sim_folders = [
        entry for entry in results_folder.iterdir() if entry.is_dir()
    ]
    simulations_data = _process_batch(
        sim_folders,
        processing_scenario,
        results_folder,
        parallel=True,
        max_workers=max_workers,
    )

    pickle_path = (
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
    )
    util.save_to_pickle(simulations_data, pickle_path)

    return simulations_data
2✔
374

375

376
def do_comparison(
    comparison_scenario: Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
    simulations_data: Optional[ds.SimulationsData] = None,
    results_folder: Optional[_pl.Path] = None,
) -> None:
    """Execute comparison scenarios on processed simulation results.

    When ``simulations_data`` is not given, it is loaded from the pickle file
    in ``results_folder`` if that file exists; otherwise the whole result set
    is processed in parallel (without processing scenarios) to create it.

    Parameters
    __________
        comparison_scenario:
            Single callable or sequence of callables that implement
            the comparison logic. Each callable should take a SimulationsData
            object as its only parameter.
        simulations_data:
            Optional SimulationsData object containing the processed
            simulation data to be compared.
        results_folder:
            Optional Path to the directory containing simulation results.
            Used if simulations_data is not provided.

    Raises
    ______
        ValueError: If neither simulations_data nor results_folder is provided

    Example
    __________
        >>> from pytrnsys_process import api
        ...
        >>> def comparison_step(simulations_data: api.SimulationsData):
        ...     # Compare simulation results
        ...     pass
        ...
        >>> api.do_comparison(comparison_step, simulations_data=processed_results)
    """
    if simulations_data is None:
        if results_folder is None:
            raise ValueError(
                "Either simulations_data or results_folder must be provided to perform comparison"
            )
        # Use the same file name constant the batch functions write to, so
        # reader and writer cannot drift apart.
        path_to_simulations_data = (
            results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
        )
        if path_to_simulations_data.exists():
            simulations_data = util.load_simulations_data_from_pickle(
                path_to_simulations_data
            )
        else:
            simulations_data = process_whole_result_set_parallel(
                results_folder, []
            )
    main_logger = log.get_main_logger(
        _pl.Path(simulations_data.path_to_simulations)
    )
    _process_comparisons(simulations_data, comparison_scenario, main_logger)
    # Close every open matplotlib figure once all comparison steps have run.
    _plt.close("all")
2✔
428

429

430
def _process_comparisons(
    simulations_data: ds.SimulationsData,
    comparison_scenario: Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
    main_logger: _logging.Logger,
):
    """Run every comparison step; a failing step is logged, not raised.

    Parameters
    __________
        simulations_data: SimulationsData handed to each comparison step
        comparison_scenario: Single callable or sequence of callables to run
        main_logger: Logger that receives the error of a failing step
    """
    # Normalise a single callable to a one-element list.
    if callable(comparison_scenario):
        steps = [comparison_scenario]
    else:
        steps = comparison_scenario

    for comparison_step in steps:
        try:
            comparison_step(simulations_data)
        except Exception as error:  # pylint: disable=broad-except
            step_name = getattr(
                comparison_step, "__name__", str(comparison_step)
            )
            main_logger.error(
                "Scenario %s failed for comparison: %s ",
                step_name,
                str(error),
                exc_info=True,
            )
454

455

456
def _concat_scalar(simulation_data: ds.SimulationsData) -> ds.SimulationsData:
    """Combine the non-empty scalar frames of all simulations.

    The frames are concatenated with the simulation names as keys and the
    second index level is dropped. ``simulation_data.scalar`` is left
    untouched when no simulation has scalar values.

    Returns
    _______
        The same ``simulation_data`` object that was passed in
    """
    non_empty_scalars = {}
    for name, simulation in simulation_data.simulations.items():
        if not simulation.scalar.empty:
            non_empty_scalars[name] = simulation.scalar

    if not non_empty_scalars:
        return simulation_data

    combined = _pd.concat(
        non_empty_scalars.values(),
        keys=non_empty_scalars.keys(),
    )
    simulation_data.scalar = combined.droplevel(1)
    return simulation_data
2✔
468

469

470
def _validate_folder(folder: _pl.Path) -> None:
    """Ensure that ``folder`` is an existing directory.

    Raises
    ______
        ValueError: If the path does not exist or is not a directory
    """
    problem = None
    if not folder.exists():
        problem = f"Folder does not exist: {folder}"
    elif not folder.is_dir():
        problem = f"Path is not a directory: {folder}"

    if problem is not None:
        raise ValueError(problem)
×
475

476

477
def _process_simulation(
    sim_folder: _pl.Path,
    processing_scenarios: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> tuple[ds.Simulation, List[str]]:
    """Load one simulation and run the processing scenario(s) on it.

    The simulation is loaded from its pickle file when that file exists and
    ``conf.global_settings.reader.force_reread_prt`` is not set; otherwise it
    is built from the files in ``sim_folder`` and then saved to the pickle
    file. A failing scenario is logged and recorded, and the remaining
    scenarios still run.

    Parameters
    __________
        sim_folder:
            Path to the simulation folder to process

        processing_scenarios:
            Single callable or sequence of callables, each called with the
            Simulation object

    Returns
    _______
        Tuple of (simulation, names of the scenarios that raised)
    """
    sim_logger = log.get_simulation_logger(sim_folder)
    sim_logger.info("Starting simulation processing")
    sim_pickle_file = sim_folder / conf.FileNames.SIMULATION_PICKLE_FILE.value
    simulation: ds.Simulation
    if (
        sim_pickle_file.exists()
        and not conf.global_settings.reader.force_reread_prt
    ):
        sim_logger.info("Loading simulation from pickle file")
        simulation = util.load_simulation_from_pickle(
            sim_pickle_file, sim_logger
        )
    else:
        sim_logger.info("Processing simulation from raw files")
        sim_files = util.get_files([sim_folder])
        simulation = ps.process_sim(sim_files, sim_folder)
        util.save_to_pickle(simulation, sim_pickle_file, sim_logger)

    failed_scenarios: List[str] = []

    # Convert single scenario to list for uniform handling
    scenarios = (
        [processing_scenarios]
        if callable(processing_scenarios)
        else processing_scenarios
    )

    for scenario in scenarios:
        # Resolve the name before entering the try block so the except
        # handler always refers to the scenario that actually failed, never
        # to an unbound name or the name of a previous scenario.
        scenario_name = getattr(scenario, "__name__", str(scenario))
        try:
            sim_logger.info("Running scenario: %s", scenario_name)
            scenario(simulation)
            sim_logger.info(
                "Successfully completed scenario: %s", scenario_name
            )
        except Exception as e:  # pylint: disable=broad-except
            failed_scenarios.append(scenario_name)
            sim_logger.error(
                "Scenario %s failed: %s",
                scenario_name,
                str(e),
                exc_info=True,
            )

    if failed_scenarios:
        sim_logger.warning(
            "Simulation completed with %d failed scenarios",
            len(failed_scenarios),
        )
    else:
        sim_logger.info("Simulation completed successfully")

    # Close every open matplotlib figure once all scenarios have run.
    _plt.close("all")
    return simulation, failed_scenarios
2✔
538

539

540
def _log_processing_results(
    results: ds.ProcessingResults, main_logger: _logging.Logger
) -> None:
    """Write the batch processing summary to the main logger.

    The summary lists the processed and failed counts, then the failed
    simulations (if any) and the failed scenarios grouped by simulation.

    Parameters
    __________
        results: ProcessingResults holding the counters and failure lists
        main_logger: Logger the summary is written to
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    main_logger.info(heavy_rule)
    main_logger.info("BATCH PROCESSING SUMMARY")
    main_logger.info(light_rule)
    main_logger.info(
        "Total simulations processed: %d | Failed: %d",
        results.processed_count,
        results.error_count,
    )

    if results.error_count > 0:
        main_logger.warning(
            "Some simulations failed to process. Check the log for details."
        )
        main_logger.warning("Failed simulations:")
        for failed_sim in results.failed_simulations:
            main_logger.warning("  • %s", failed_sim)

    if results.failed_scenarios:
        main_logger.warning("Failed scenarios by simulation:")
        for sim_name, scenario_names in results.failed_scenarios.items():
            if not scenario_names:
                continue
            main_logger.warning("  • %s:", sim_name)
            for scenario_name in scenario_names:
                main_logger.warning("    - %s", scenario_name)

    main_logger.info(heavy_rule)
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc