• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys_process / 13785751940

11 Mar 2025 10:25AM UTC coverage: 94.517% (-0.9%) from 95.383%
13785751940

push

github

sebastian-swob
fixed bug where force reread would not work when running with multiprocessing

2 of 2 new or added lines in 1 file covered. (100.0%)

3 existing lines in 1 file now uncovered.

1086 of 1149 relevant lines covered (94.52%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.39
/pytrnsys_process/process/process_batch.py
1
import logging as _logging
1✔
2
import pathlib as _pl
1✔
3
import time as _time
1✔
4
from collections import abc as _abc
1✔
5
from concurrent.futures import ProcessPoolExecutor, as_completed
1✔
6
from typing import List, Optional, Sequence, Union
1✔
7

8
import matplotlib.pyplot as _plt
1✔
9
import pandas as _pd
1✔
10

11
from pytrnsys_process import config as conf
1✔
12
from pytrnsys_process import log, util
1✔
13
from pytrnsys_process.process import data_structures as ds
1✔
14
from pytrnsys_process.process import process_sim as ps
1✔
15

16

17
class UnableToProcessSimulationError(Exception):
    """Signal that processing a simulation produced no usable result."""
19

20

21
# pylint: disable=too-many-locals
22
def _process_batch(
    sim_folders: list[_pl.Path],
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    results_folder: _pl.Path,
    parallel: bool = False,
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Common processing logic for both sequential and parallel batch processing.

    This internal function implements the core processing logic used by both sequential
    and parallel processing modes. It handles the setup of processing infrastructure,
    execution of processing tasks, and collection of results.

    Parameters
    __________
        sim_folders:
            List of simulation folders to process

        processing_scenario:
            Processing scenario(s) to apply to each simulation

        results_folder:
            Folder used to obtain the main logger; its POSIX path is stored
            as ``path_to_simulations`` on the returned object

        parallel:
            Whether to process simulations in parallel

        max_workers:
            Maximum number of worker processes for parallel execution
            (forwarded to ProcessPoolExecutor)


    Returns
    _______
        SimulationsData containing the processed simulation results and metadata

    Note:
    _____
        This is an internal function that should not be called directly.
        Use process_single_simulation, process_whole_result_set, or
        process_whole_result_set_parallel instead.

        Exceptions raised while processing an individual simulation are
        logged and counted, never re-raised.
    """
    start_time = _time.time()
    results = ds.ProcessingResults()
    simulations_data = ds.SimulationsData(
        path_to_simulations=results_folder.as_posix()
    )

    main_logger = log.get_main_logger(results_folder)

    # Read the setting once, at call time, and hand it to every
    # _process_simulation call so that the sequential and the parallel path
    # honour the same, current value.
    force_reread_prt = conf.global_settings.reader.force_reread_prt

    if parallel:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Maps each submitted future back to its simulation folder so a
            # failure can be attributed to the right simulation.
            tasks = {}
            for sim_folder in sim_folders:
                main_logger.info(
                    "Submitting simulation folder for processing: %s",
                    sim_folder.name,
                )
                tasks[
                    executor.submit(
                        _process_simulation,
                        sim_folder,
                        processing_scenario,
                        force_reread_prt,
                    )
                ] = sim_folder

            for future in as_completed(tasks):
                try:
                    _handle_simulation_result(
                        future.result(), results, simulations_data
                    )
                except Exception as e:  # pylint: disable=broad-except
                    _handle_simulation_error(
                        e, tasks[future], results, main_logger
                    )
    else:
        for sim_folder in sim_folders:
            try:
                main_logger.info("Processing simulation: %s", sim_folder.name)
                result = _process_simulation(
                    sim_folder, processing_scenario, force_reread_prt
                )
                _handle_simulation_result(result, results, simulations_data)
            except Exception as e:  # pylint: disable=broad-except
                _handle_simulation_error(e, sim_folder, results, main_logger)

    simulations_data = _concat_scalar(simulations_data)
    _log_processing_results(results, main_logger)

    end_time = _time.time()
    execution_time = end_time - start_time
    main_logger.info(
        "%s execution time: %.2f seconds",
        "Parallel" if parallel else "Total",
        execution_time,
    )

    return simulations_data
1✔
123

124

125
def _handle_simulation_result(
    result: tuple[ds.Simulation, List[str]],
    results: ds.ProcessingResults,
    simulations_data: ds.SimulationsData,
) -> None:
    """Record one successfully processed simulation.

    The simulation is stored under the last component of its ``path`` and
    the processed counter is incremented. Scenario failures, if any, are
    registered under the same key.

    Parameters
    __________
        result: Tuple of (simulation, failed_scenarios)
        results: ProcessingResults to update
        simulations_data: SimulationsData to update
    """
    processed_simulation, scenario_failures = result
    results.processed_count += 1

    sim_name = _pl.Path(processed_simulation.path).name
    simulations_data.simulations[sim_name] = processed_simulation

    if not scenario_failures:
        return
    results.failed_scenarios[_pl.Path(processed_simulation.path).name] = (
        scenario_failures
    )
145
        )
146

147

148
def _handle_simulation_error(
    error: Exception,
    sim_folder: _pl.Path,
    results: ds.ProcessingResults,
    main_logger: _logging.Logger,
) -> None:
    """Record a simulation whose processing raised an exception.

    Increments the error counter, remembers the folder name as failed and
    logs the error (with traceback information) on the main logger.

    Parameters
    __________
        error: The exception that occurred
        sim_folder: Path to the simulation folder
        results: ProcessingResults to update
        main_logger: Logger that receives the error message
    """
    results.error_count += 1

    failed_name = sim_folder.name
    results.failed_simulations.append(failed_name)

    error_text = str(error)
    main_logger.error(
        "Failed to process simulation in %s: %s",
        sim_folder,
        error_text,
        exc_info=True,
    )
170

171

172
def process_single_simulation(
    sim_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.Simulation:
    """Run the given processing step(s) on exactly one simulation folder.

    Parameters
    __________
        sim_folder: pathlib.Path
            Path to the simulation folder to process

        processing_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
            They should contain the processing logic for a simulation.
            Each callable should take a Simulation object as its only parameter and modify it in place.

    Returns
    _______
        Simulation: :class:`pytrnsys_process.api.Simulation`

    Raises
    ______
        UnableToProcessSimulationError: If the batch run produced no entry
            for this simulation folder (e.g. because processing failed).

    Example
    _______
            >>> import pathlib as _pl
            >>> from pytrnsys_process import api
            ...
            >>> def processing_step_1(sim: api.Simulation):
            ...     # Process simulation data
            ...     pass
            >>> results = api.process_single_simulation(
            ...     _pl.Path("path/to/simulation"),
            ...     processing_step_1
            ... )
    """
    main_logger = log.get_main_logger(sim_folder)
    log.initialize_logs(sim_folder)
    main_logger.info("Starting processing of simulation %s", sim_folder)

    # The batch machinery works on a list of folders; the parent directory
    # plays the role of the results folder.
    batch_result = _process_batch(
        [sim_folder], processing_scenario, sim_folder.parent
    )

    try:
        simulation = batch_result.simulations[sim_folder.name]
    except KeyError as exc:
        raise UnableToProcessSimulationError(
            f"Failed to process simulation in {sim_folder}"
        ) from exc
    return simulation
220

221

222
def process_whole_result_set(
    results_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.SimulationsData:
    """Process every simulation folder of a results directory, one after another.

    Each direct sub-directory of ``results_folder`` is treated as a simulation
    and the provided processing step/scenario is applied to it. The collected
    data is also written to a pickle file inside ``results_folder``.

    Using the default settings your structure should look like this:

    | results_folder
    |     ├─ sim-1
    |     ├─ sim-2
    |     ├─ sim-3
    |         ├─ temp
    |             ├─ your-printer-files.prt

    Parameters
    __________
        results_folder pathlib.Path:
            Path to the directory containing simulation folders.
            Each subfolder should contain a temp folder containing valid simulation data files.

        processing_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
            They should contain the processing logic for a simulation.
            Each callable should take a Simulation object as its only parameter and modify it in place.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
            - simulations: mapping of simulation folder name to its Simulation
            - scalar: DataFrame concatenated from the non-empty scalar data of all simulations

    Raises
    ______
        ValueError: If results_folder doesn't exist or is not a directory
        Exception: Individual simulation failures are logged but not re-raised

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    main_logger = log.get_main_logger(results_folder)
    log.initialize_logs(results_folder)
    main_logger.info(
        "Starting batch processing of simulations in %s", results_folder
    )

    # Only directories count as simulations; plain files are skipped.
    sim_folders = []
    for entry in results_folder.iterdir():
        if entry.is_dir():
            sim_folders.append(entry)

    simulations_data = _process_batch(
        sim_folders, processing_scenario, results_folder
    )

    pickle_path = (
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
    )
    util.save_to_pickle(simulations_data, pickle_path)

    return simulations_data
1✔
302

303

304
def process_whole_result_set_parallel(
    results_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Process every simulation folder of a results directory concurrently.

    Each direct sub-directory of ``results_folder`` is treated as a simulation
    and handed to a ProcessPoolExecutor, which applies the provided processing
    step/scenario. The collected data is also written to a pickle file inside
    ``results_folder``.

    Using the default settings your structure should look like this:

    | results_folder
    |     ├─ sim-1
    |     ├─ sim-2
    |     ├─ sim-3
    |         ├─ temp
    |             ├─ your-printer-files.prt

    Parameters
    __________
        results_folder pathlib.Path:
            Path to the directory containing simulation folders.
            Each subfolder should contain a temp folder containing valid simulation data files.

        processing_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
            They should contain the processing logic for a simulation.
            Each callable should take a Simulation object as its only parameter and modify it in place.

        max_workers int, default None:
            Maximum number of worker processes to use.
            Forwarded to ProcessPoolExecutor; None lets the executor pick its default.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
            - simulations: mapping of simulation folder name to its Simulation
            - scalar: DataFrame concatenated from the non-empty scalar data of all simulations

    Raises
    _______
        ValueError: If results_folder doesn't exist or is not a directory
        Exception: Individual simulation failures are logged but not re-raised

    Example
    _______
        >>> import pathlib as _pl
        >>> from pytrnsys_process import api
        ...
        >>> def processing_step_1(sim):
        ...     # Process simulation data
        ...     pass
        >>> def processing_step_2(sim):
        ...     # Process simulation data
        ...     pass
        >>> results = api.process_whole_result_set_parallel(
        ...     _pl.Path("path/to/results"),
        ...     [processing_step_1, processing_step_2]
        ... )
    """
    _validate_folder(results_folder)
    log.initialize_logs(results_folder)
    main_logger = log.get_main_logger(results_folder)
    main_logger.info(
        "Starting batch processing of simulations in %s with parallel execution",
        results_folder,
    )

    # Only directories count as simulations; plain files are skipped.
    sim_folders = []
    for entry in results_folder.iterdir():
        if entry.is_dir():
            sim_folders.append(entry)

    simulations_data = _process_batch(
        sim_folders,
        processing_scenario,
        results_folder,
        parallel=True,
        max_workers=max_workers,
    )

    pickle_path = (
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
    )
    util.save_to_pickle(simulations_data, pickle_path)

    return simulations_data
1✔
394

395

396
def do_comparison(
    comparison_scenario: Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
    simulations_data: Optional[ds.SimulationsData] = None,
    results_folder: Optional[_pl.Path] = None,
) -> ds.SimulationsData:
    """Execute comparison scenarios on processed simulation results.

    If ``simulations_data`` is not given, it is obtained from ``results_folder``:
    the pickled simulations data found there is loaded, unless it is missing
    or the ``force_reread_prt`` reader setting is enabled, in which case the
    whole result set is (re)processed in parallel without any processing steps.

    Parameters
    __________
        comparison_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
            They should contain the comparison logic.
            Each callable should take a SimulationsData object as its only parameter and modify it in place.

        simulations_data: SimulationsData, optional
            SimulationsData object containing the processed
            simulations data to be compared.

        results_folder: pathlib.Path, optional
            Path to the directory containing simulation results.
            Used if simulations_data is not provided.

    Returns
    _______
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`

    Raises
    ______
        ValueError: If neither simulations_data nor results_folder is provided

    Example
    __________
        >>> from pytrnsys_process import api
        ...
        >>> def comparison_step(simulations_data: api.SimulationsData):
        ...     # Compare simulation results
        ...     pass
        ...
        >>> api.do_comparison(comparison_step, simulations_data=processed_results)
    """
    # Explicit ``is None`` checks: "not provided" must not depend on the
    # truthiness of the argument objects.
    if simulations_data is None:
        if results_folder is None:
            raise ValueError(
                "Either simulations_data or results_folder must be provided to perform comparison"
            )
        path_to_simulations_data = (
            results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
        )
        if (
            path_to_simulations_data.exists()
            and not conf.global_settings.reader.force_reread_prt
        ):
            simulations_data = util.load_simulations_data_from_pickle(
                path_to_simulations_data
            )
        else:
            # No cached data (or a re-read was requested): process everything
            # with an empty scenario list just to collect the data.
            simulations_data = process_whole_result_set_parallel(
                results_folder, []
            )
    main_logger = log.get_main_logger(
        _pl.Path(simulations_data.path_to_simulations)
    )
    _process_comparisons(simulations_data, comparison_scenario, main_logger)
    # Close all matplotlib figures that the comparison steps may have left open.
    _plt.close("all")

    return simulations_data
1✔
460

461

462
def _process_comparisons(
    simulations_data: ds.SimulationsData,
    comparison_scenario: Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
    main_logger: _logging.Logger,
):
    """Run each comparison step on ``simulations_data``.

    A single callable is treated as a one-element scenario. A step that
    raises is logged on ``main_logger`` and the remaining steps still run.
    """
    if callable(comparison_scenario):
        steps = [comparison_scenario]
    else:
        steps = comparison_scenario

    for step in steps:
        try:
            step(simulations_data)
        except Exception as exc:  # pylint: disable=broad-except
            # Fall back to the string form for callables without __name__.
            step_name = getattr(step, "__name__", str(step))
            main_logger.error(
                "Scenario %s failed for comparison: %s ",
                step_name,
                str(exc),
                exc_info=True,
            )
486

487

488
def _concat_scalar(simulation_data: ds.SimulationsData) -> ds.SimulationsData:
    """Combine the per-simulation scalar frames into ``simulation_data.scalar``.

    Simulations whose scalar frame is empty are skipped. The simulation name
    is used as the concat key and the second index level of the concatenated
    result is dropped. If no simulation has scalar data, ``scalar`` is left
    untouched. The same object is returned.
    """
    sim_names = []
    scalar_frames = []
    for sim_name, sim in simulation_data.simulations.items():
        if sim.scalar.empty:
            continue
        sim_names.append(sim_name)
        scalar_frames.append(sim.scalar)

    if scalar_frames:
        combined = _pd.concat(scalar_frames, keys=sim_names)
        simulation_data.scalar = combined.droplevel(1)
    return simulation_data
1✔
500

501

502
def _validate_folder(folder: _pl.Path) -> None:
    """Raise ValueError unless ``folder`` exists and is a directory."""
    problem = None
    if not folder.exists():
        problem = f"Folder does not exist: {folder}"
    elif not folder.is_dir():
        problem = f"Path is not a directory: {folder}"

    if problem is not None:
        raise ValueError(problem)
×
507

508

509
def _process_simulation(
    sim_folder: _pl.Path,
    processing_scenarios: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    force_reread_prt: Optional[bool] = None,
) -> tuple[ds.Simulation, List[str]]:
    """Load or build one simulation and run the processing scenarios on it.

    The simulation is loaded from its pickle file when that file exists and
    no re-read is requested; otherwise it is built from the files found in
    ``sim_folder`` and then saved to the pickle file. Every scenario is run
    in turn; a failing scenario is logged and recorded, and the remaining
    scenarios still run.

    Parameters
    __________
        sim_folder:
            Path to the simulation folder

        processing_scenarios:
            A single callable or a sequence of callables, each taking the
            Simulation as its only argument

        force_reread_prt:
            If True, ignore an existing pickle file and rebuild the simulation.
            If None (default), the current value of
            ``conf.global_settings.reader.force_reread_prt`` is used; it is
            looked up when the function is called, not when it is defined.

    Returns
    _______
        Tuple of the simulation and the list of names of the scenarios that failed
    """
    # Resolve the setting at call time: a default argument would freeze the
    # value that was configured when this module was imported.
    if force_reread_prt is None:
        force_reread_prt = conf.global_settings.reader.force_reread_prt

    sim_logger = log.get_simulation_logger(sim_folder)
    sim_logger.info("Starting simulation processing")
    sim_pickle_file = sim_folder / conf.FileNames.SIMULATION_PICKLE_FILE.value
    simulation: ds.Simulation
    if sim_pickle_file.exists() and not force_reread_prt:
        sim_logger.info("Loading simulation from pickle file")
        simulation = util.load_simulation_from_pickle(
            sim_pickle_file, sim_logger
        )
    else:
        sim_logger.info("Processing simulation from raw files")
        sim_files = util.get_files([sim_folder])
        simulation = ps.process_sim(sim_files, sim_folder)
        util.save_to_pickle(simulation, sim_pickle_file, sim_logger)

    failed_scenarios: List[str] = []

    # Convert single scenario to list for uniform handling
    scenarios = (
        [processing_scenarios]
        if callable(processing_scenarios)
        else processing_scenarios
    )

    for scenario in scenarios:
        # Determined before the try block so the except handler always
        # reports the name of the scenario that actually failed.
        scenario_name = getattr(scenario, "__name__", str(scenario))
        try:
            sim_logger.info("Running scenario: %s", scenario_name)
            scenario(simulation)
            sim_logger.info(
                "Successfully completed scenario: %s", scenario_name
            )
        except Exception as e:  # pylint: disable=broad-except
            failed_scenarios.append(scenario_name)
            sim_logger.error(
                "Scenario %s failed: %s",
                scenario_name,
                str(e),
                exc_info=True,
            )

    if failed_scenarios:
        sim_logger.warning(
            "Simulation completed with %d failed scenarios",
            len(failed_scenarios),
        )
    else:
        sim_logger.info("Simulation completed successfully")

    # Close all matplotlib figures that the scenarios may have left open.
    _plt.close("all")
    return simulation, failed_scenarios
1✔
568

569

570
def _log_processing_results(
    results: ds.ProcessingResults, main_logger: _logging.Logger
) -> None:
    """Write the batch summary to ``main_logger``.

    The summary holds the processed/failed counters and, where present, the
    names of the failed simulations and the failed scenarios per simulation.
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    main_logger.info(heavy_rule)
    main_logger.info("BATCH PROCESSING SUMMARY")
    main_logger.info(light_rule)
    main_logger.info(
        "Total simulations processed: %d | Failed: %d",
        results.processed_count,
        results.error_count,
    )

    if results.error_count > 0:
        main_logger.warning(
            "Some simulations failed to process. Check the log for details."
        )
        main_logger.warning("Failed simulations:")
        for failed_sim in results.failed_simulations:
            main_logger.warning("  • %s", failed_sim)

    if results.failed_scenarios:
        main_logger.warning("Failed scenarios by simulation:")
        for sim_name, scenario_names in results.failed_scenarios.items():
            # Entries without any failed scenario are not listed.
            if not scenario_names:
                continue
            main_logger.warning("  • %s:", sim_name)
            for scenario_name in scenario_names:
                main_logger.warning("    - %s", scenario_name)
    main_logger.info(heavy_rule)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc