• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys_process / 12050569509

27 Nov 2024 12:44PM UTC coverage: 90.52% (-3.5%) from 94.037%
12050569509

push

github

web-flow
Merge pull request #18 from SPF-OST/15-create-example-script-for-per-sim-interaction

15 create example script for per sim interaction

119 of 124 new or added lines in 5 files covered. (95.97%)

20 existing lines in 3 files now uncovered.

487 of 538 relevant lines covered (90.52%)

1.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.0
/pytrnsys_process/process_batch.py
1
import pathlib as _pl
2✔
2
from collections.abc import Callable
2✔
3
from concurrent.futures import ProcessPoolExecutor, as_completed
2✔
4
from dataclasses import dataclass, field
2✔
5
from typing import List
2✔
6

7
import matplotlib.pyplot as _plt
2✔
8

9
from pytrnsys_process.logger import logger
2✔
10
from pytrnsys_process.process_sim import process_sim as ps
2✔
11

12

13
@dataclass
class ProcessingResults:
    """Outcome summary for a processing run over one or more simulations.

    Every field has a default, so an empty result object can be created
    first and filled in while simulations are processed.
    """

    # Number of simulations that were processed successfully.
    processed_count: int = 0
    # Number of simulations whose processing raised an exception.
    error_count: int = 0
    # Folder names of the simulations that failed.
    failed_simulations: List[str] = field(default_factory=list)
    # Successfully processed simulations, keyed by simulation folder name.
    simulations: dict[str, ps.Simulation] = field(default_factory=dict)
2✔
21

22

23
def _validate_folder(folder: _pl.Path) -> None:
    """Ensure that ``folder`` points to an existing directory.

    Args:
        folder: Path to check

    Raises:
        ValueError: If the path does not exist, or exists but is not a
            directory
    """
    if folder.is_dir():
        return

    # Not a directory: tell apart "missing" from "exists, but is e.g. a file".
    if folder.exists():
        raise ValueError(f"Path is not a directory: {folder}")
    raise ValueError(f"Folder does not exist: {folder}")
×
28

29

30
def _process_simulation(
    sim_folder: _pl.Path, processing_scenario: Callable
) -> ps.Simulation:
    """Load one simulation folder and run the processing scenario on it.

    Args:
        sim_folder: Path to the simulation folder to process
        processing_scenario: Callable that is invoked with the loaded simulation

    Returns:
        The simulation object returned by ``ps.process_sim_prt``

    Raises:
        Exception: Anything raised by ``ps.process_sim_prt`` or by
            ``processing_scenario`` is propagated unchanged to the caller.
    """
    logger.debug("Processing simulation folder: %s", sim_folder)
    try:
        simulation = ps.process_sim_prt(sim_folder)
        processing_scenario(simulation)
    finally:
        # Close all figures even when loading or the scenario fails. The batch
        # functions catch the error and move on to the next simulation, so
        # figures left open here would pile up over the rest of the run.
        _plt.close("all")
    return simulation
2✔
38

39

40
def _log_processing_results(results: ProcessingResults) -> None:
    """Write the end-of-batch summary to the log.

    Logs the processed and error counts at info level and, when at least one
    simulation failed, adds a warning that points to the log for details.

    Args:
        results: Aggregated results of the batch run
    """
    processed = results.processed_count
    errors = results.error_count

    logger.info(
        "Batch processing complete. Processed: %d, Errors: %d",
        processed,
        errors,
    )

    if errors <= 0:
        return

    logger.warning(
        "Some simulations failed to process. Check the log for details."
    )
50

51

52
def process_single_simulation(
    sim_folder: _pl.Path, processing_scenario: Callable
) -> ProcessingResults:
    """Process a single simulation folder using the provided processing scenario.

    Failures are not raised to the caller: any exception during processing is
    logged (including the traceback) and recorded in the returned results.

    Args:
        sim_folder: Path to the simulation folder to process
        processing_scenario: Callable that implements the processing logic for a simulation

    Returns:
        ProcessingResults containing the processed simulation on success, or
        an error count of 1 and the folder name in ``failed_simulations`` on
        failure
    """
    results = ProcessingResults()
    try:
        simulation = _process_simulation(sim_folder, processing_scenario)
        # Store first and count last: if recording the simulation fails, it
        # must only show up as an error, not as processed *and* failed.
        results.simulations[simulation.path.name] = simulation
        results.processed_count += 1
    except Exception as e:  # pylint: disable=broad-except
        results.error_count += 1
        results.failed_simulations.append(sim_folder.name)
        logger.error(
            "Failed to process simulation in %s: %s",
            sim_folder,
            str(e),
            exc_info=True,
        )
    return results
2✔
82

83

84
def process_whole_result_set(
    results_folder: _pl.Path, processing_scenario: Callable
) -> ProcessingResults:
    """Process all simulation folders in a results directory sequentially.

    Every direct sub-directory of ``results_folder`` is treated as one
    simulation; other entries are skipped. A failing simulation is logged and
    recorded in the results, and processing continues with the next folder.

    Args:
        results_folder: Path to the directory containing simulation folders
        processing_scenario: Callable that implements the processing logic for each simulation

    Returns:
        ProcessingResults containing the processed simulations, the counts of
        processed and failed simulations, and the names of the failed folders

    Raises:
        ValueError: If results_folder doesn't exist or is not a directory
    """
    _validate_folder(results_folder)

    logger.info(
        "Starting batch processing of simulations in %s", results_folder
    )
    results = ProcessingResults()

    # iterdir() yields entries in arbitrary order; sorting makes the
    # processing order (and thus the log and result order) reproducible.
    for sim_folder in sorted(results_folder.iterdir()):
        if not sim_folder.is_dir():
            continue

        try:
            simulation = _process_simulation(sim_folder, processing_scenario)
            # Store first and count last: if recording the simulation fails,
            # it must only show up as an error, not as processed *and* failed.
            results.simulations[simulation.path.name] = simulation
            results.processed_count += 1
        except Exception as e:  # pylint: disable=broad-except
            results.error_count += 1
            results.failed_simulations.append(sim_folder.name)
            logger.error(
                "Failed to process simulation in %s: %s",
                sim_folder,
                str(e),
                exc_info=True,
            )

    _log_processing_results(results)
    return results
2✔
126

127

128
def process_whole_result_set_parallel(
    results_folder: _pl.Path,
    processing_scenario: Callable,
    max_workers: int | None = None,
) -> ProcessingResults:
    """Process all simulation folders in a results directory in parallel.

    Uses a ProcessPoolExecutor to process multiple simulations concurrently.
    Every direct sub-directory of ``results_folder`` is submitted as one task.
    A failing simulation is logged and recorded in the results; the remaining
    tasks are still collected.

    Args:
        results_folder: Path to the directory containing simulation folders
        processing_scenario: Callable that implements the processing logic for each simulation.
            It is handed to worker processes, so it has to be picklable
            (e.g. a module-level function rather than a lambda).
        max_workers: Maximum number of worker processes to use. If None, defaults to the number
            of processors on the machine.

    Returns:
        ProcessingResults containing the processed simulations, the counts of
        processed and failed simulations, and the names of the failed folders.
        Results are collected in completion order, which may differ between
        runs.

    Raises:
        ValueError: If results_folder doesn't exist or is not a directory
    """
    _validate_folder(results_folder)

    logger.info(
        "Starting batch processing of simulations in %s with parallel execution",
        results_folder,
    )
    results = ProcessingResults()

    # iterdir() yields entries in arbitrary order; sorting makes at least the
    # submission order reproducible.
    sim_folders = sorted(
        sim_folder
        for sim_folder in results_folder.iterdir()
        if sim_folder.is_dir()
    )

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its folder so failures can be attributed.
        tasks = {
            executor.submit(
                _process_simulation, sim_folder, processing_scenario
            ): sim_folder
            for sim_folder in sim_folders
        }

        for future in as_completed(tasks):
            sim_folder = tasks[future]
            try:
                simulation = future.result()
                # Store first and count last: if recording the simulation
                # fails, it must only show up as an error, not as processed
                # *and* failed.
                results.simulations[simulation.path.name] = simulation
                results.processed_count += 1
            except Exception as e:  # pylint: disable=broad-except
                results.error_count += 1
                results.failed_simulations.append(sim_folder.name)
                # Log the full path, matching the sequential variants.
                logger.error(
                    "Failed to process simulation in %s: %s",
                    sim_folder,
                    str(e),
                    exc_info=True,
                )

    _log_processing_results(results)
    return results
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc