SPF-OST / pytrnsys, Coveralls build 15706698083 (push via GitHub, zuckerruebe: "Remove unused argument.")
17 Jun 2025 12:04PM UTC. Repository coverage: 28.985% (+0.02% from 28.963%); 4051 of 13976 relevant lines covered (0.29 hits per line).
Source file: /pytrnsys/rsim/runParallel.py (80.54% of lines covered)
# pylint: disable=unspecified-encoding

import collections.abc as _cabc
import dataclasses as _dc
import datetime
import json
import logging
import os
import shutil
import subprocess as _sp
import sys
import time
import typing as _tp

import pandas as _pd

import pytrnsys.rsim.command as _cmd
import pytrnsys.trnsys_util.LogTrnsys as _logt

logger = logging.getLogger("root")


def getNumberOfCPU():
    """Returns the number of CPUs in the system"""
    num = 1
    if sys.platform == "win32":
        try:
            num = int(os.environ["NUMBER_OF_PROCESSORS"])
        except (ValueError, KeyError):
            pass
    elif sys.platform == "darwin":
        try:
            num = int(os.popen("sysctl -n hw.ncpu").read())
        except ValueError:
            pass
    else:
        try:
            num = os.sysconf("SC_NPROCESSORS_ONLN")
        except (ValueError, OSError, AttributeError):
            pass

    return num
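
For comparison, the same count is available from a single standard-library call. The sketch below is illustrative only; the helper name _getNumberOfCpuViaStdlib is hypothetical and not part of pytrnsys, and it reuses the os module imported above.

def _getNumberOfCpuViaStdlib() -> int:
    """Illustrative sketch: cross-platform CPU count via the standard library."""
    # os.cpu_count() returns None when the count cannot be determined,
    # so fall back to 1, mirroring getNumberOfCPU() above.
    return os.cpu_count() or 1
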
@_dc.dataclass
class _SimulationCase:
    caseNumber: int
    command: _cmd.Command
    process: _sp.Popen | None = None


@_dc.dataclass
class _Cpu:
    id: int
    assignedCase: _SimulationCase | None = None


def runParallel(  # pylint: disable=too-many-locals,too-many-branches,too-many-statements
    commands: _cabc.Sequence[_cmd.Command],
    reduceCpu=0,
    outputFile=False,
    estimatedCPUTime=0.33,
    trackingFile=None,
    masterFile=None,
) -> None:
    """Execute the commands in parallel in multiple processes (at most one per available CPU).

    The delay time is used to prevent multiple instances of TRNSYS from trying to access the same
    files at the same time. This is especially problematic with type 56, which not only reads
    several files but also creates multiple files at the start of the simulation.
    """
    logDict = dict[str, list[_tp.Any]]()
    if trackingFile:
        with open(trackingFile, "w") as file:
            json.dump(logDict, file, indent=2, separators=(",", ": "), sort_keys=True)

    maxNumberOfCPU = max(min(getNumberOfCPU() - reduceCpu, len(commands)), 1)

    cpus = [_Cpu(i + 1) for i in range(maxNumberOfCPU)]

    if outputFile:
        lines = ""
        line = "============PARALLEL PROCESSING STARTED==============\n"
        lines = lines + line
        line = f"Number of simulated cases ={len(commands):d}\n"
        lines = lines + line
        line = f"Number of CPU used ={maxNumberOfCPU:d}\n"
        lines = lines + line
        line = (
            f"Estimated time = {len(commands) * estimatedCPUTime / (maxNumberOfCPU * 1.0):f} hours, "
            f"assuming :{estimatedCPUTime:f} hour per simulation\n"
        )
        lines = lines + line
        line = "============CASES TO BE SIMULATED====================\n"
        lines = lines + line

        i = 1
        for cmd in commands:
            case = cmd.deckFilePath.name
            line = f"Case {i:d} to be simulated {case}\n"
            lines = lines + line
            i = i + 1
        line = "============ALREADY SIMULATED CASES====================\n"
        lines = lines + line

        with open(outputFile, "w") as outfileRun:
            outfileRun.writelines(lines)

    if not commands:
        return

    # Assign the first batch of cases, one per CPU slot.
    caseNumber = 1
    for cpu, command in zip(cpus, commands, strict=False):
        simulationCase = _SimulationCase(caseNumber, command)
        cpu.assignedCase = simulationCase
        caseNumber += 1

    assignedCommands = [c.assignedCase.command for c in cpus if c.assignedCase]
    commandsStillToBeRun = [c for c in commands if c not in assignedCommands]

    completedCommands = []

    startTime = time.time()

    # Poll once per second: launch queued cases on free CPUs and harvest finished ones.
    while True:  # pylint: disable=too-many-nested-blocks
        for cpu in cpus:
            assignedCase = cpu.assignedCase

            if not assignedCase:
                continue

            command = assignedCase.command
            process = assignedCase.process

            dckName = command.deckFilePath.name

            if not process:
                # Case assigned but not started yet: record the start time and launch it.
                if trackingFile:
                    with open(trackingFile, "r") as file:
                        logDict = json.load(file)
                    logDict[dckName] = [time.strftime("%Y-%m-%d_%H:%M:%S")]
                    with open(trackingFile, "w") as file:
                        json.dump(logDict, file, indent=2, separators=(",", ": "), sort_keys=True)

                logger.info("Command: %s", command)
                assignedCase.process = _sp.Popen(  # pylint: disable=consider-using-with
                    command.args, stdout=_sp.PIPE, stderr=_sp.PIPE, cwd=command.cwd
                )

            elif _isDone(process):
                # Case finished: log the outcome, free the CPU and, if work remains, refill the slot.
                if not _hasSuccessfullyCompleted(assignedCase):
                    logger.warning("PARALLEL RUN HAS FAILED")
                    sys.exit(1)

                if outputFile:
                    lines = (
                        f"Finished simulated case {assignedCase.caseNumber:d} "
                        f"at {time.strftime('%H:%M:%S of day %d-%m-%y')}\n"
                    )
                    with open(outputFile, "a") as outfileRun:
                        outfileRun.writelines(lines)

                if trackingFile:
                    with open(trackingFile, "r") as file:
                        logDict = json.load(file)
                    logDict[dckName].append(time.strftime("%Y-%m-%d_%H:%M:%S"))

                    logTrnsys, _ = _getLogTrnsysAndDeckFileName(command)

                    if logTrnsys.logFatalErrors():
                        logDict[dckName].append("fatal error")
                    else:
                        logDict[dckName].append("success")

                    simulationHours = logTrnsys.checkSimulatedHours()
                    if len(simulationHours) == 2:
                        logDict[dckName].append(simulationHours[0])
                        logDict[dckName].append(simulationHours[1])
                    elif len(simulationHours) == 1:
                        logDict[dckName].append(simulationHours[0])
                        logDict[dckName].append(None)

                    with open(trackingFile, "w") as file:
                        json.dump(logDict, file, indent=2, separators=(",", ": "), sort_keys=True)

                cpu.assignedCase = None
                completedCommands.append(command)

                logger.info("Runs completed: %s/%s", len(completedCommands), len(commands))

                if len(completedCommands) % len(cpus) == 0 and len(completedCommands) != len(commands):
                    currentTime = time.time()
                    timeSoFarSec = currentTime - startTime
                    totalTimePredictionSec = timeSoFarSec * len(commands) / len(completedCommands)
                    endTimePrediction = datetime.datetime.fromtimestamp(startTime + totalTimePredictionSec).strftime(
                        "%H:%M on %d.%m.%Y"
                    )
                    logger.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
                    logger.info("Predicted time of completion of all %s runs: %s", len(commands), endTimePrediction)
                    logger.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")

                if masterFile and (len(completedCommands) == len(commands)):
                    newDf = _pd.DataFrame.from_dict(
                        logDict,
                        orient="index",
                        columns=["started", "finished", "outcome", "hour start", "hour end"],
                    )

                    if os.path.isfile(masterFile):
                        masterPath, masterOrig = os.path.split(masterFile)
                        masterBackup = masterOrig.split(".")[0] + "_BACKUP.csv"
                        try:
                            shutil.copyfile(masterFile, os.path.join(masterPath, masterBackup))
                            logger.info("Updated %s", masterBackup)
                        except OSError:
                            logger.error("Unable to generate BACKUP of %s", masterFile)
                        origDf = _pd.read_csv(masterFile, sep=";", index_col=0)

                        masterDf = _pd.concat([origDf, newDf])
                        masterDf = masterDf[~masterDf.index.duplicated(keep="last")]
                    else:
                        masterDf = newDf

                    try:
                        masterDf.to_csv(masterFile, sep=";")
                        logger.info("Updated %s", masterFile)
                    except OSError:
                        logger.error("Unable to write to %s", masterFile)

                if commandsStillToBeRun:
                    nextCommand = commandsStillToBeRun.pop(0)
                    simulationCase = _SimulationCase(caseNumber, nextCommand)
                    cpu.assignedCase = simulationCase
                    caseNumber += 1

        runningCases = [c.assignedCase for c in cpus if c.assignedCase]
        if not runningCases and not commandsStillToBeRun:
            break

        time.sleep(1)
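
The loop above is a slot-based scheduler: each _Cpu holds at most one running _SimulationCase, finished processes are detected by polling, and a freed slot immediately receives the next queued command. As a reading aid, here is a minimal, self-contained sketch of that pattern using plain argument lists instead of pytrnsys Command objects; the names runSketch and pendingArgs are hypothetical and not part of pytrnsys.

import subprocess
import time


def runSketch(pendingArgs: list[list[str]], maxSlots: int) -> None:
    """Illustrative sketch of the slot-based polling scheduler used by runParallel."""
    slots: list[subprocess.Popen | None] = [None] * maxSlots
    pending = list(pendingArgs)

    while True:
        for i, process in enumerate(slots):
            if process is None and pending:
                # Free slot and work left: launch the next command.
                slots[i] = subprocess.Popen(pending.pop(0))
            elif process is not None and process.poll() is not None:
                # Finished: runParallel logs the outcome here, then refills the slot.
                slots[i] = subprocess.Popen(pending.pop(0)) if pending else None

        if not pending and all(p is None for p in slots):
            break
        time.sleep(1)

In the real function, each finished deck additionally gets a tracking-file entry of the form [started, finished, outcome, hour start, hour end], which matches the columns of the masterFile DataFrame built at the end of the run.
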
def _hasSuccessfullyCompleted(simulationCase: _SimulationCase) -> bool:
    command = simulationCase.command
    logTrnsys, dckFileName = _getLogTrnsysAndDeckFileName(command)

    # The TRNSYS log is inspected only for reporting; the verdict is the process return code.
    if logTrnsys.logFatalErrors():
        logger.error("======================================")
        logger.error(dckFileName)
        logger.error("======================================")
        errorList = logTrnsys.logFatalErrors()
        for line in errorList:
            logger.error(line.replace("\n", ""))
    else:
        logger.info("Success: No fatal errors during execution of %s", dckFileName)
        logger.warning("Number of warnings during simulation: %s", logTrnsys.checkWarnings())

    process = simulationCase.process
    assert process

    return process.returncode == 0


def _getLogTrnsysAndDeckFileName(command: _cmd.Command) -> _tp.Tuple[_logt.LogTrnsys, str]:  # type: ignore[name-defined]
    fullDckFilePath = command.truncatedDeckFilePath
    (logFilePath, dckFileName) = os.path.split(fullDckFilePath)
    logFileName = os.path.splitext(dckFileName)[0]
    logInstance = _logt.LogTrnsys(logFilePath, logFileName)  # type: ignore[attr-defined]
    return logInstance, dckFileName


def _isDone(process: _sp.Popen) -> bool:
    return process.poll() is not None
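
The master-file update in runParallel follows a standard pandas pattern: concatenate the existing table with the new rows and keep only the last occurrence of each index, so decks that were re-run overwrite their earlier entries. Below is a minimal sketch of just that merge step; the helper name mergeMasterSketch is hypothetical and not part of pytrnsys.

import pandas as pd


def mergeMasterSketch(masterFile: str, newDf: pd.DataFrame) -> pd.DataFrame:
    """Illustrative sketch of the 'keep the latest row per deck' merge used in runParallel."""
    origDf = pd.read_csv(masterFile, sep=";", index_col=0)
    mergedDf = pd.concat([origDf, newDf])
    # For duplicated deck names, keep only the most recent row.
    return mergedDf[~mergedDf.index.duplicated(keep="last")]

Writing the returned frame back with to_csv(masterFile, sep=";") reproduces the update runParallel performs once all commands have completed.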