• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys_process / 16748222285

05 Aug 2025 11:03AM UTC coverage: 49.518% (-46.5%) from 95.968%
16748222285

Pull #126

github

ahobeost
Reduce linux job to just test.
Pull Request #126: 125 bug step file not read when step used with type 25

5 of 6 new or added lines in 2 files covered. (83.33%)

578 existing lines in 11 files now uncovered.

616 of 1244 relevant lines covered (49.52%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.53
/pytrnsys_process/process/process_batch.py
1
import logging as _logging
2✔
2
import pathlib as _pl
2✔
3
import time as _time
2✔
4
import typing as _tp
2✔
5
from collections import abc as _abc
2✔
6
from concurrent import futures as _futures
2✔
7

8
import matplotlib.pyplot as _plt
2✔
9
import pandas as _pd
2✔
10

11
from pytrnsys_process import config as conf
2✔
12
from pytrnsys_process import log, util
2✔
13
from pytrnsys_process.process import data_structures as ds
2✔
14
from pytrnsys_process.process import process_sim as ps
2✔
15

16

17
class UnableToProcessSimulationError(Exception):
2✔
18
    """Raised when a simulation cannot be processed."""
19

20

21
# pylint: disable=too-many-locals
22
def _process_batch(
2✔
23
    sim_folders: list[_pl.Path],
24
    processing_scenario: _tp.Union[
25
        _abc.Callable[[ds.Simulation], None],
26
        _tp.Sequence[_abc.Callable[[ds.Simulation], None]],
27
    ],
28
    results_folder: _pl.Path,
29
    parallel: bool = False,
30
    max_workers: int | None = None,
31
) -> ds.SimulationsData:
32
    """Common processing logic for both sequential and parallel batch processing.
33

34
    This internal function implements the core processing logic used by both sequential
35
    and parallel processing modes. It handles the setup of processing infrastructure,
36
    execution of processing tasks, and collection of results.
37

38
    Parameters
39
    __________
40
        sim_folders:
41
            List of simulation folders to process
42

43
        processing_scenario:
44
            Processing scenario(s) to apply to each simulation
45

46
        results_folder:
47
            Root folder containing all simulations
48

49
        parallel:
50
            Whether to process simulations in parallel
51

52
        max_workers:
53
            Maximum number of worker processes for parallel execution
54

55

56
    Returns
57
    _______
58
        SimulationsData containing the processed simulation results and metadata
59

60
    Note:
61
    _____
62
        This is an internal function that should not be called directly.
63
        Use process_single_simulation, process_whole_result_set, or
64
        process_whole_result_set_parallel instead.
65

66

67
    """
UNCOV
68
    start_time = _time.time()
×
UNCOV
69
    results = ds.ProcessingResults()
×
UNCOV
70
    simulations_data = ds.SimulationsData(
×
71
        path_to_simulations=results_folder.as_posix()
72
    )
73

UNCOV
74
    main_logger = log.get_main_logger(results_folder)
×
75

UNCOV
76
    if parallel:
×
UNCOV
77
        with _futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
×
UNCOV
78
            tasks = {}
×
UNCOV
79
            for sim_folder in sim_folders:
×
UNCOV
80
                main_logger.info(
×
81
                    "Submitting simulation folder for processing: %s",
82
                    sim_folder.name,
83
                )
UNCOV
84
                tasks[
×
85
                    executor.submit(
86
                        _process_simulation,
87
                        sim_folder,
88
                        processing_scenario,
89
                        conf.global_settings.reader.force_reread_prt,
90
                    )
91
                ] = sim_folder
92

UNCOV
93
            for future in _futures.as_completed(tasks):
×
UNCOV
94
                try:
×
UNCOV
95
                    _handle_simulation_result(
×
96
                        future.result(), results, simulations_data
97
                    )
98
                except Exception as e:  # pylint: disable=broad-except
×
99
                    _handle_simulation_error(
×
100
                        e, tasks[future], results, main_logger
101
                    )
102
    else:
UNCOV
103
        for sim_folder in sim_folders:
×
UNCOV
104
            try:
×
UNCOV
105
                main_logger.info("Processing simulation: %s", sim_folder.name)
×
UNCOV
106
                result = _process_simulation(sim_folder, processing_scenario)
×
UNCOV
107
                _handle_simulation_result(result, results, simulations_data)
×
108
            except Exception as e:  # pylint: disable=broad-except
×
109
                _handle_simulation_error(e, sim_folder, results, main_logger)
×
110

UNCOV
111
    simulations_data = _concat_scalar(simulations_data)
×
UNCOV
112
    _log_processing_results(results, main_logger)
×
113

UNCOV
114
    end_time = _time.time()
×
UNCOV
115
    execution_time = end_time - start_time
×
UNCOV
116
    main_logger.info(
×
117
        "%s execution time: %.2f seconds",
118
        "Parallel" if parallel else "Total",
119
        execution_time,
120
    )
121

UNCOV
122
    return simulations_data
×
123

124

125
def _handle_simulation_result(
2✔
126
    result: tuple[ds.Simulation, list[str]],
127
    results: ds.ProcessingResults,
128
    simulations_data: ds.SimulationsData,
129
) -> None:
130
    """Handle the result of a processed simulation.
131

132
    Parameters
133
    __________
134
        result: Tuple of (simulation, failed_scenarios)
135
            (the simulation is stored under the folder name taken from simulation.path)
136
        results: ProcessingResults to update
137
        simulations_data: SimulationsData to update
138
    """
UNCOV
139
    simulation, failed_scenarios = result
×
UNCOV
140
    results.processed_count += 1
×
UNCOV
141
    simulations_data.simulations[_pl.Path(simulation.path).name] = simulation
×
UNCOV
142
    if failed_scenarios:
×
UNCOV
143
        results.failed_scenarios[_pl.Path(simulation.path).name] = (
×
144
            failed_scenarios
145
        )
146

147

148
def _handle_simulation_error(
2✔
149
    error: Exception,
150
    sim_folder: _pl.Path,
151
    results: ds.ProcessingResults,
152
    main_logger: _logging.Logger,
153
) -> None:
154
    """Handle an error that occurred during simulation processing.
155

156
    Parameters
157
    __________
158
        error: The exception that occurred
159
        sim_folder: Path to the simulation folder
160
        results: ProcessingResults to update
161
    """
162
    results.error_count += 1
×
163
    results.failed_simulations.append(sim_folder.name)
×
164
    main_logger.error(
×
165
        "Failed to process simulation in %s: %s",
166
        sim_folder,
167
        str(error),
168
        exc_info=True,
169
    )
170

171

172
def process_single_simulation(
2✔
173
    sim_folder: _pl.Path,
174
    processing_scenario: _tp.Union[
175
        _abc.Callable[[ds.Simulation], None],
176
        _tp.Sequence[_abc.Callable[[ds.Simulation], None]],
177
    ],
178
) -> ds.Simulation:
179
    """Process a single simulation folder using the provided processing step/scenario.
180

181
    Parameters
182
    __________
183
        sim_folder: pathlib.Path
184
            Path to the simulation folder to process
185

186
        processing_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
187
            They should contain the processing logic for a simulation.
188
            Each callable should take a Simulation object as its only parameter and modify it in place.
189

190
    Returns
191
    _______
192
        Simulation: :class:`pytrnsys_process.api.Simulation`
193

194
    Example
195
    _______
196
            >>> import pathlib as _pl
197
            >>> from pytrnsys_process import api
198
            ...
199
            >>> def processing_step_1(sim: api.Simulation):
200
            ...     # Process simulation data
201
            ...     pass
202
            >>> results = api.process_single_simulation(
203
            ...     _pl.Path("path/to/simulation"),
204
            ...     processing_step_1
205
            ... )
206
    """
UNCOV
207
    main_logger = log.get_main_logger(sim_folder)
×
UNCOV
208
    log.initialize_logs(sim_folder)
×
UNCOV
209
    main_logger.info("Starting processing of simulation %s", sim_folder)
×
UNCOV
210
    sim_folders = [sim_folder]
×
UNCOV
211
    simulations_data = _process_batch(
×
212
        sim_folders, processing_scenario, sim_folder.parent
213
    )
UNCOV
214
    try:
×
UNCOV
215
        return simulations_data.simulations[sim_folder.name]
×
216
    except KeyError as exc:
×
217
        raise UnableToProcessSimulationError(
×
218
            f"Failed to process simulation in {sim_folder}"
219
        ) from exc
220

221

222
def process_whole_result_set(
2✔
223
    results_folder: _pl.Path,
224
    processing_scenario: _tp.Union[
225
        _abc.Callable[[ds.Simulation], None],
226
        _tp.Sequence[_abc.Callable[[ds.Simulation], None]],
227
    ],
228
) -> ds.SimulationsData:
229
    """Process all simulation folders in a results directory sequentially.
230

231
    Processes each simulation folder found in the results directory one at a time,
232
    applying the provided processing step/scenario to each simulation.
233

234
    Using the default settings your structure should look like this:
235

236
    | results_folder
237
    |     ├─ sim-1
238
    |     ├─ sim-2
239
    |     ├─ sim-3
240
    |         ├─ temp
241
    |             ├─ your-printer-files.prt
242

243
    Parameters
244
    __________
245
        results_folder: pathlib.Path
246
            Path to the directory containing simulation folders.
247
            Each subfolder should contain a temp folder containing valid simulation data files.
248

249
        processing_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
250
            They should contain the processing logic for a simulation.
251
            Each callable should take a Simulation object as its only parameter and modify it in place.
252

253
    Returns
254
    _______
255
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
256
            - monthly: Dict mapping simulation names to monthly DataFrame results
257
            - hourly: Dict mapping simulation names to hourly DataFrame results
258
            - scalar: DataFrame containing scalar/deck values from all simulations
259

260
    Raises
261
    ______
262
        ValueError: If results_folder doesn't exist or is not a directory
263
        Exception: Individual simulation failures are logged but not re-raised
264

265
    Example
266
    _______
267
        >>> import pathlib as _pl
268
        >>> from pytrnsys_process import api
269
        ...
270
        >>> def processing_step_1(sim):
271
        ...     # Process simulation data
272
        ...     pass
273
        >>> def processing_step_2(sim):
274
        ...     # Process simulation data
275
        ...     pass
276
        >>> results = api.process_whole_result_set(
277
        ...     _pl.Path("path/to/results"),
278
        ...     [processing_step_1, processing_step_2]
279
        ... )
280
    """
UNCOV
281
    _validate_folder(results_folder)
×
UNCOV
282
    main_logger = log.get_main_logger(results_folder)
×
UNCOV
283
    log.initialize_logs(results_folder)
×
UNCOV
284
    main_logger.info(
×
285
        "Starting batch processing of simulations in %s", results_folder
286
    )
287

UNCOV
288
    sim_folders = [
×
289
        sim_folder
290
        for sim_folder in results_folder.iterdir()
291
        if sim_folder.is_dir()
292
    ]
UNCOV
293
    simulations_data = _process_batch(
×
294
        sim_folders, processing_scenario, results_folder
295
    )
UNCOV
296
    util.save_to_pickle(
×
297
        simulations_data,
298
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value,
299
    )
300

UNCOV
301
    return simulations_data
×
302

303

304
def process_whole_result_set_parallel(
2✔
305
    results_folder: _pl.Path,
306
    processing_scenario: _tp.Union[
307
        _abc.Callable[[ds.Simulation], None],
308
        _tp.Sequence[_abc.Callable[[ds.Simulation], None]],
309
    ],
310
    max_workers: int | None = None,
311
) -> ds.SimulationsData:
312
    """Process all simulation folders in a results directory in parallel.
313

314
    Uses a ProcessPoolExecutor to process multiple simulations concurrently,
315
    applying the provided processing step/scenario to each simulation.
316

317
    Using the default settings your structure should look like this:
318

319
    | results_folder
320
    |     ├─ sim-1
321
    |     ├─ sim-2
322
    |     ├─ sim-3
323
    |         ├─ temp
324
    |             ├─ your-printer-files.prt
325

326
    Parameters
327
    __________
328
        results_folder: pathlib.Path
329
            Path to the directory containing simulation folders.
330
            Each subfolder should contain a temp folder containing valid simulation data files.
331

332
        processing_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
333
            They should contain the processing logic for a simulation.
334
            Each callable should take a Simulation object as its only parameter and modify it in place.
335
        max_workers: int, default None
336
            Maximum number of worker processes to use.
337
            If None, defaults to the number of processors on the machine.
338

339
    Returns
340
    _______
341
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
342
            - monthly: Dict mapping simulation names to monthly DataFrame results
343
            - hourly: Dict mapping simulation names to hourly DataFrame results
344
            - scalar: DataFrame containing scalar/deck values from all simulations
345

346
    Raises
347
    _______
348
        ValueError: If results_folder doesn't exist or is not a directory
349
        Exception: Individual simulation failures are logged but not re-raised
350

351
    Example
352
    _______
353
        >>> import pathlib as _pl
354
        >>> from pytrnsys_process import api
355
        ...
356
        >>> def processing_step_1(sim):
357
        ...     # Process simulation data
358
        ...     pass
359
        >>> def processing_step_2(sim):
360
        ...     # Process simulation data
361
        ...     pass
362
        >>> results = api.process_whole_result_set_parallel(
363
        ...     _pl.Path("path/to/results"),
364
        ...     [processing_step_1, processing_step_2]
365
        ... )
366
    """
367
    # The last :returns: ensures that the formatting works in PyCharm
UNCOV
368
    _validate_folder(results_folder)
×
UNCOV
369
    log.initialize_logs(results_folder)
×
UNCOV
370
    main_logger = log.get_main_logger(results_folder)
×
UNCOV
371
    main_logger.info(
×
372
        "Starting batch processing of simulations in %s with parallel execution",
373
        results_folder,
374
    )
375

UNCOV
376
    sim_folders = [
×
377
        sim_folder
378
        for sim_folder in results_folder.iterdir()
379
        if sim_folder.is_dir()
380
    ]
UNCOV
381
    simulations_data = _process_batch(
×
382
        sim_folders,
383
        processing_scenario,
384
        results_folder,
385
        parallel=True,
386
        max_workers=max_workers,
387
    )
UNCOV
388
    util.save_to_pickle(
×
389
        simulations_data,
390
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value,
391
    )
392

UNCOV
393
    return simulations_data
×
394

395

396
def do_comparison(
2✔
397
    comparison_scenario: _tp.Union[
398
        _abc.Callable[[ds.SimulationsData], None],
399
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
400
    ],
401
    simulations_data: _tp.Optional[ds.SimulationsData] = None,
402
    results_folder: _tp.Optional[_pl.Path] = None,
403
) -> ds.SimulationsData:
404
    """Execute comparison scenarios on processed simulation results.
405

406
    Parameters
407
    __________
408
        comparison_scenario: collections.abc.Callable or collections.abc.Sequence of collections.abc.Callable
409
            They should contain the comparison logic.
410
            Each callable should take a SimulationsData object as its only parameter and modify it in place.
411

412
        simulations_data: SimulationsData, optional
413
            SimulationsData object containing the processed
414
            simulations data to be compared.
415

416
        results_folder: pathlib.Path, optional
417
            Path to the directory containing simulation results.
418
            Used if simulations_data is not provided.
419

420
    Returns
421
    _______
422
        SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
423

424
    Example
425
    __________
426
        >>> from pytrnsys_process import api
427
        ...
428
        >>> def comparison_step(simulations_data: api.SimulationsData):
429
        ...     # Compare simulation results
430
        ...     pass
431
        ...
432
        >>> api.do_comparison(comparison_step, simulations_data=processed_results)
433
    """
UNCOV
434
    if not simulations_data:
×
UNCOV
435
        if not results_folder:
×
UNCOV
436
            raise ValueError(
×
437
                "Either simulations_data or results_folder must be provided to perform comparison"
438
            )
UNCOV
439
        path_to_simulations_data = (
×
440
            results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
441
        )
UNCOV
442
        if (
×
443
            path_to_simulations_data.exists()
444
            and not conf.global_settings.reader.force_reread_prt
445
        ):
UNCOV
446
            simulations_data = util.load_simulations_data_from_pickle(
×
447
                path_to_simulations_data
448
            )
449
            # Moving locations of files breaks the paths.
450
            # If the pickle file is found, then we know the new path is correct.
451
            # The original path is saved for later retrieval.
UNCOV
452
            simulations_data.path_to_simulations_original = (
×
453
                simulations_data.path_to_simulations
454
            )
UNCOV
455
            if not simulations_data.path_to_simulations == str(results_folder):
×
UNCOV
456
                simulations_data.path_to_simulations = str(results_folder)
×
457

458
        else:
UNCOV
459
            simulations_data = process_whole_result_set_parallel(
×
460
                results_folder, []
461
            )
UNCOV
462
    main_logger = log.get_main_logger(
×
463
        _pl.Path(simulations_data.path_to_simulations)
464
    )
UNCOV
465
    _process_comparisons(simulations_data, comparison_scenario, main_logger)
×
466

UNCOV
467
    return simulations_data
×
468

469

470
def _process_comparisons(
2✔
471
    simulations_data: ds.SimulationsData,
472
    comparison_scenario: _tp.Union[
473
        _abc.Callable[[ds.SimulationsData], None],
474
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
475
    ],
476
    main_logger: _logging.Logger,
477
):
UNCOV
478
    scenario = (
×
479
        [comparison_scenario]
480
        if callable(comparison_scenario)
481
        else comparison_scenario
482
    )
UNCOV
483
    for step in scenario:
×
UNCOV
484
        try:
×
UNCOV
485
            step(simulations_data)
×
UNCOV
486
            _plt.close("all")
×
487
        except Exception as e:  # pylint: disable=broad-except
×
488
            scenario_name = getattr(step, "__name__", str(step))
×
489
            main_logger.error(
×
490
                "Scenario %s failed for comparison: %s ",
491
                scenario_name,
492
                str(e),
493
                exc_info=True,
494
            )
495

496

497
def _concat_scalar(simulation_data: ds.SimulationsData) -> ds.SimulationsData:
2✔
UNCOV
498
    scalar_values_to_concat = {
×
499
        sim_name: sim.scalar
500
        for sim_name, sim in simulation_data.simulations.items()
501
        if not sim.scalar.empty
502
    }
UNCOV
503
    if scalar_values_to_concat:
×
UNCOV
504
        simulation_data.scalar = _pd.concat(
×
505
            scalar_values_to_concat.values(),
506
            keys=scalar_values_to_concat.keys(),
507
        ).droplevel(1)
UNCOV
508
    return simulation_data
×
509

510

511
def _validate_folder(folder: _pl.Path) -> None:
2✔
UNCOV
512
    if not folder.exists():
×
513
        raise ValueError(f"Folder does not exist: {folder}")
×
UNCOV
514
    if not folder.is_dir():
×
515
        raise ValueError(f"Path is not a directory: {folder}")
×
516

517

518
def _process_simulation(
2✔
519
    sim_folder: _pl.Path,
520
    processing_scenarios: _tp.Union[
521
        _abc.Callable[[ds.Simulation], None],
522
        _tp.Sequence[_abc.Callable[[ds.Simulation], None]],
523
    ],
524
    force_reread_prt: _tp.Optional[bool] = None,
525
) -> tuple[ds.Simulation, list[str]]:
UNCOV
526
    if not force_reread_prt:
×
UNCOV
527
        force_reread_prt = conf.global_settings.reader.force_reread_prt
×
UNCOV
528
    sim_logger = log.get_simulation_logger(sim_folder)
×
UNCOV
529
    sim_logger.info("Starting simulation processing")
×
UNCOV
530
    sim_pickle_file = sim_folder / conf.FileNames.SIMULATION_PICKLE_FILE.value
×
531
    simulation: ds.Simulation
UNCOV
532
    if sim_pickle_file.exists() and not force_reread_prt:
×
UNCOV
533
        sim_logger.info("Loading simulation from pickle file")
×
UNCOV
534
        simulation = util.load_simulation_from_pickle(
×
535
            sim_pickle_file, sim_logger
536
        )
537
    else:
UNCOV
538
        sim_logger.info("Processing simulation from raw files")
×
UNCOV
539
        sim_files = util.get_files([sim_folder])
×
UNCOV
540
        simulation = ps.process_sim(sim_files, sim_folder)
×
UNCOV
541
        if sim_files:
×
UNCOV
542
            util.save_to_pickle(simulation, sim_pickle_file, sim_logger)
×
543

UNCOV
544
    failed_scenarios = []
×
545

546
    # Convert single scenario to list for uniform handling
UNCOV
547
    scenarios = (
×
548
        [processing_scenarios]
549
        if callable(processing_scenarios)
550
        else processing_scenarios
551
    )
552

UNCOV
553
    for scenario in scenarios:
×
UNCOV
554
        try:
×
UNCOV
555
            scenario_name = getattr(scenario, "__name__", str(scenario))
×
UNCOV
556
            sim_logger.info("Running scenario: %s", scenario_name)
×
UNCOV
557
            scenario(simulation)
×
UNCOV
558
            sim_logger.info(
×
559
                "Successfully completed scenario: %s", scenario_name
560
            )
UNCOV
561
            _plt.close("all")
×
UNCOV
562
        except Exception as e:  # pylint: disable=broad-except
×
UNCOV
563
            failed_scenarios.append(scenario_name)
×
UNCOV
564
            sim_logger.error(
×
565
                "Scenario %s failed: %s",
566
                scenario_name,
567
                str(e),
568
                exc_info=True,
569
            )
570

UNCOV
571
    if failed_scenarios:
×
UNCOV
572
        sim_logger.warning(
×
573
            "Simulation completed with %d failed scenarios",
574
            len(failed_scenarios),
575
        )
576
    else:
UNCOV
577
        sim_logger.info("Simulation completed successfully")
×
578

UNCOV
579
    return simulation, failed_scenarios
×
580

581

582
def _log_processing_results(
2✔
583
    results: ds.ProcessingResults, main_logger: _logging.Logger
584
) -> None:
UNCOV
585
    main_logger.info("=" * 80)
×
UNCOV
586
    main_logger.info("BATCH PROCESSING SUMMARY")
×
UNCOV
587
    main_logger.info("-" * 80)
×
UNCOV
588
    main_logger.info(
×
589
        "Total simulations processed: %d | Failed: %d",
590
        results.processed_count,
591
        results.error_count,
592
    )
593

UNCOV
594
    if results.error_count > 0:
×
595
        main_logger.warning(
×
596
            "Some simulations failed to process. Check the log for details."
597
        )
598
        main_logger.warning("Failed simulations:")
×
599
        for sim in results.failed_simulations:
×
600
            main_logger.warning("  • %s", sim)
×
601

UNCOV
602
    if results.failed_scenarios:
×
UNCOV
603
        main_logger.warning("Failed scenarios by simulation:")
×
UNCOV
604
        for sim, scenarios in results.failed_scenarios.items():
×
UNCOV
605
            if scenarios:
×
UNCOV
606
                main_logger.warning("  • %s:", sim)
×
UNCOV
607
                for scenario in scenarios:
×
UNCOV
608
                    main_logger.warning("    - %s", scenario)
×
UNCOV
609
    main_logger.info("=" * 80)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc