• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys_process / 12275439241

11 Dec 2024 11:28AM UTC coverage: 95.252% (+0.9%) from 94.322%
12275439241

push

github

sebastian-swob
fixed converter to behave as expected

3 of 3 new or added lines in 1 file covered. (100.0%)

14 existing lines in 6 files now uncovered.

642 of 674 relevant lines covered (95.25%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.81
/pytrnsys_process/process_batch.py
1
import pathlib as _pl
1✔
2
from collections.abc import Callable
1✔
3
from concurrent.futures import ProcessPoolExecutor, as_completed
1✔
4
from dataclasses import dataclass, field
1✔
5
from typing import List, Sequence, Union
1✔
6

7
import matplotlib.pyplot as _plt
1✔
8

9
from pytrnsys_process import utils
1✔
10
from pytrnsys_process.logger import logger
1✔
11
from pytrnsys_process.process_sim import process_sim as ps
1✔
12

13

14
@dataclass
1✔
15
class ProcessingResults:
1✔
16
    """Results from processing one or more simulations.
17

18
    Attributes:
19
        processed_count: Number of successfully processed simulations
20
        error_count: Number of simulations that failed to process
21
        failed_simulations: List of simulation names that failed to process
22
        failed_scenarios: Dictionary mapping simulation names to lists of failed scenario names
23
        simulations: Dictionary mapping simulation names to processed Simulation objects
24

25
    Example:
26
        >>> results = ProcessingResults()
27
        >>> results.processed_count = 5
28
        >>> results.error_count = 1
29
        >>> results.failed_simulations = ['sim_001']
30
        >>> results.failed_scenarios = {'sim_002': ['scenario_1']}
31
    """
32

33
    processed_count: int = 0
1✔
34
    error_count: int = 0
1✔
35
    failed_simulations: List[str] = field(default_factory=list)
1✔
36
    failed_scenarios: dict[str, List[str]] = field(default_factory=dict)
1✔
37
    simulations: dict[str, ps.Simulation] = field(default_factory=dict)
1✔
38

39

40
def _validate_folder(folder: _pl.Path) -> None:
1✔
41
    if not folder.exists():
1✔
UNCOV
42
        raise ValueError(f"Folder does not exist: {folder}")
×
43
    if not folder.is_dir():
1✔
UNCOV
44
        raise ValueError(f"Path is not a directory: {folder}")
×
45

46

47
def _process_simulation(
1✔
48
        sim_folder: _pl.Path,
49
        processing_scenarios: Union[Callable, Sequence[Callable]],
50
) -> tuple[ps.Simulation, List[str]]:
51
    logger.debug("Processing simulation folder: %s", sim_folder)
1✔
52
    sim_files = utils.get_files([sim_folder])
1✔
53
    simulation = ps.process_sim(sim_files, sim_folder)
1✔
54
    failed_scenarios = []
1✔
55

56
    # Convert single scenario to list for uniform handling
57
    scenarios = (
1✔
58
        [processing_scenarios]
59
        if callable(processing_scenarios)
60
        else processing_scenarios
61
    )
62

63
    for scenario in scenarios:
1✔
64
        try:
1✔
65
            scenario(simulation)
1✔
66
        except Exception as e:  # pylint: disable=broad-except
1✔
67
            scenario_name = getattr(scenario, "__name__", str(scenario))
1✔
68
            failed_scenarios.append(scenario_name)
1✔
69
            logger.error(
1✔
70
                "Scenario %s failed for simulation %s: %s",
71
                scenario_name,
72
                sim_folder.name,
73
                str(e),
74
                exc_info=True,
75
            )
76

77
    _plt.close("all")
1✔
78
    return simulation, failed_scenarios
1✔
79

80

81
def _log_processing_results(results: ProcessingResults) -> None:
1✔
82
    logger.info(
1✔
83
        "Batch processing complete. Processed: %d, Errors: %d",
84
        results.processed_count,
85
        results.error_count,
86
    )
87
    if results.error_count > 0:
1✔
88
        logger.warning(
1✔
89
            "Some simulations failed to process. Check the log for details."
90
        )
91
    if results.failed_scenarios:
1✔
92
        logger.warning(
1✔
93
            "Some scenarios failed: %s",
94
            {
95
                sim: scenarios
96
                for sim, scenarios in results.failed_scenarios.items()
97
                if scenarios
98
            },
99
        )
100

101

102
def process_single_simulation(
1✔
103
        sim_folder: _pl.Path,
104
        processing_scenarios: Union[Callable, Sequence[Callable]],
105
) -> ProcessingResults:
106
    """Process a single simulation folder using the provided processing scenario(s).
107

108
    Args:
109
        sim_folder: Path to the simulation folder to process
110
        processing_scenarios: Single callable or sequence of callables that implement
111
            the processing logic for a simulation. Each callable should take a Simulation
112
            object as its only parameter.
113

114
    Returns:
115
        ProcessingResults containing the processed simulation and any failures
116

117
    Example:
118
        >>> import pathlib as _pl
119
        >>> from pytrnsys_process import api
120
        ...
121
        >>> def processing_step_1(sim: api.Simulation):
122
        ...     # Process simulation data
123
        ...     pass
124
        >>> results = api.process_single_simulation(
125
        ...     _pl.Path("path/to/simulation"),
126
        ...     processing_step_1
127
        ... )
128
        >>> print(f"Processed: {results.processed_count}")
129
    """
130
    results = ProcessingResults()
1✔
131
    try:
1✔
132
        simulation, failed_scenarios = _process_simulation(
1✔
133
            sim_folder, processing_scenarios
134
        )
135
        results.processed_count += 1
1✔
136
        results.simulations[simulation.path.name] = simulation
1✔
137
        if failed_scenarios:
1✔
138
            results.failed_scenarios[simulation.path.name] = failed_scenarios
1✔
139
    except Exception as e:  # pylint: disable=broad-except
1✔
140
        results.error_count += 1
1✔
141
        results.failed_simulations.append(sim_folder.name)
1✔
142
        logger.error(
1✔
143
            "Failed to process simulation in %s: %s",
144
            sim_folder,
145
            str(e),
146
            exc_info=True,
147
        )
148
    return results
1✔
149

150

151
def process_whole_result_set(
1✔
152
        results_folder: _pl.Path,
153
        processing_scenario: Union[Callable, Sequence[Callable]],
154
) -> ProcessingResults:
155
    """Process all simulation folders in a results directory sequentially.
156

157
    Args:
158
        results_folder: Path to the directory containing simulation folders
159
        processing_scenario: Single callable or sequence of callables that implement
160
            the processing logic for each simulation. Each callable should take a
161
            Simulation object as its only parameter.
162

163
    Returns:
164
        ProcessingResults containing counts of processed and failed simulations
165

166
    Raises:
167
        ValueError: If results_folder doesn't exist or is not a directory
168

169
    Example:
170
        >>> import pathlib as _pl
171
        >>> from pytrnsys_process import api
172
        ...
173
        >>> def processing_step_1(sim):
174
        ...     # Process simulation data
175
        ...     pass
176
        >>> def processing_step_2(sim):
177
        ...     # Process simulation data
178
        ...     pass
179
        >>> results = api.process_whole_result_set(
180
        ...     _pl.Path("path/to/results"),
181
        ...     [processing_step_1, processing_step_2]
182
        ... )
183
        >>> print(f"Processed: {results.processed_count}, Failed: {results.error_count}")
184
    """
185
    _validate_folder(results_folder)
1✔
186

187
    logger.info(
1✔
188
        "Starting batch processing of simulations in %s", results_folder
189
    )
190
    results = ProcessingResults()
1✔
191

192
    for sim_folder in results_folder.iterdir():
1✔
193
        if not sim_folder.is_dir():
1✔
UNCOV
194
            continue
×
195

196
        try:
1✔
197
            simulation, failed_scenarios = _process_simulation(
1✔
198
                sim_folder, processing_scenario
199
            )
200
            results.processed_count += 1
1✔
201
            results.simulations[simulation.path.name] = simulation
1✔
202
            if failed_scenarios:
1✔
203
                results.failed_scenarios[simulation.path.name] = (
1✔
204
                    failed_scenarios
205
                )
206
        except Exception as e:  # pylint: disable=broad-except
1✔
207
            results.error_count += 1
1✔
208
            results.failed_simulations.append(sim_folder.name)
1✔
209
            logger.error(
1✔
210
                "Failed to process simulation in %s: %s",
211
                sim_folder,
212
                str(e),
213
                exc_info=True,
214
            )
215

216
    _log_processing_results(results)
1✔
217
    return results
1✔
218

219

220
def process_whole_result_set_parallel(
1✔
221
    results_folder: _pl.Path,
222
        processing_scenario: Union[Callable, Sequence[Callable]],
223
    max_workers: int | None = None,
224
) -> ProcessingResults:
225
    """Process all simulation folders in a results directory in parallel.
226

227
    Uses a ProcessPoolExecutor to process multiple simulations concurrently.
228

229
    Args:
230
        results_folder: Path to the directory containing simulation folders
231
        processing_scenario: Single callable or sequence of callables that implement
232
            the processing logic for each simulation. Each callable should take a
233
            Simulation object as its only parameter.
234
        max_workers: Maximum number of worker processes to use. If None, defaults to
235
            the number of processors on the machine.
236

237
    Returns:
238
        ProcessingResults containing counts of processed and failed simulations
239

240
    Raises:
241
        ValueError: If results_folder doesn't exist or is not a directory
242

243
    Example:
244
        >>> import pathlib as _pl
245
        >>> from pytrnsys_process import api
246
        ...
247
        >>> def processing_step_1(sim):
248
        ...     # Process simulation data
249
        ...     pass
250
        >>> def processing_step_2(sim):
251
        ...     # Process simulation data
252
        ...     pass
253
        >>> results = api.process_whole_result_set_parallel(
254
        ...     _pl.Path("path/to/results"),
255
        ...     [processing_step_1, processing_step_2]
256
        ... )
257
        >>> print(f"Processed: {results.processed_count}, Failed: {results.error_count}")
258
    """
259
    _validate_folder(results_folder)
1✔
260

261
    logger.info(
1✔
262
        "Starting batch processing of simulations in %s with parallel execution",
263
        results_folder,
264
    )
265
    results = ProcessingResults()
1✔
266

267
    sim_folders = [
1✔
268
        sim_folder
269
        for sim_folder in results_folder.iterdir()
270
        if sim_folder.is_dir()
271
    ]
272

273
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
1✔
274
        tasks = {
1✔
275
            executor.submit(
276
                _process_simulation, sim_folder, processing_scenario
277
            ): sim_folder
278
            for sim_folder in sim_folders
279
        }
280

281
        for future in as_completed(tasks):
1✔
282
            try:
1✔
283
                simulation, failed_scenarios = future.result()
1✔
284
                results.processed_count += 1
1✔
285
                results.simulations[simulation.path.name] = simulation
1✔
286
                if failed_scenarios:
1✔
287
                    results.failed_scenarios[simulation.path.name] = (
1✔
288
                        failed_scenarios
289
                    )
290
            except Exception as e:  # pylint: disable=broad-except
1✔
291
                results.error_count += 1
1✔
292
                results.failed_simulations.append(tasks[future].name)
1✔
293
                logger.error(
1✔
294
                    "Failed to process simulation in %s: %s",
295
                    tasks[future].name,
296
                    str(e),
297
                    exc_info=True,
298
                )
299

300
    _log_processing_results(results)
1✔
301
    return results
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc