• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys / 15589866616

11 Jun 2025 04:03PM UTC coverage: 27.511% (+0.1%) from 27.368%
15589866616

Pull #244

github

web-flow
Merge 61da34b2d into 70270764d
Pull Request #244: Run trnsys from dck dir

41 of 75 new or added lines in 4 files covered. (54.67%)

9 existing lines in 2 files now uncovered.

3843 of 13969 relevant lines covered (27.51%)

0.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.38
/pytrnsys/rsim/runParallel.py
1
# pylint: skip-file
2
# type: ignore
3

4
import datetime
1✔
5
import json
1✔
6
import logging
1✔
7
import os
1✔
8
import shutil
1✔
9
import subprocess as _sp
1✔
10
import sys
1✔
11
import time
1✔
12
import collections.abc as _cabc
1✔
13

14
import pandas as _pd
1✔
15

16
logger = logging.getLogger("root")
1✔
17
import pytrnsys.trnsys_util.LogTrnsys as _logt
1✔
18
import pytrnsys.rsim.command as _cmd
1✔
19

20

21
def getNumberOfCPU():
    """Return the number of logical CPUs in the system.

    Replaces the previous hand-rolled, platform-branched probing
    (``NUMBER_OF_PROCESSORS`` env var on Windows, ``sysctl`` on macOS,
    ``os.sysconf`` elsewhere) with the portable stdlib call
    :func:`os.cpu_count`, which covers all three platforms uniformly.

    Returns
    -------
    int
        The logical CPU count, or 1 when it cannot be determined
        (``os.cpu_count()`` returns ``None`` in that case) — matching
        the old implementation's fallback of ``num = 1``.
    """
    return os.cpu_count() or 1
41

42

43
def runParallel(
    commands: _cabc.Sequence[_cmd.Command],
    reduceCpu=0,
    outputFile=False,
    estimedCPUTime=0.33,
    delayTime=10,
    trackingFile=None,
    masterFile=None,
) -> None:
    """Exec commands in parallel in multiple process
    (as much as we have CPU)

    The delay time is used to prevent multiple instances of trnsys trying to access the files at the same time.
    This is especially problematic with type 56, which not only reads several files.
    It also creates multiple files at the start of the simulation.

    Parameters
    ----------
    commands :
        Sequence of TRNSYS commands to run; each is expected to expose
        ``args`` (a shell command string), ``cwd`` and ``deckFilePath``
        (with a ``.name`` attribute) — inferred from usage below.
    reduceCpu :
        Number of CPUs to hold back from the pool (e.g. to keep the
        machine responsive).
    outputFile :
        ``False`` to disable, otherwise a path to a plain-text progress
        log that is written at start and appended to per finished case.
    estimedCPUTime :
        Estimated hours per simulation, used only for the up-front
        total-time estimate written to ``outputFile``.
    delayTime :
        Seconds to sleep after launching each process, so concurrent
        TRNSYS instances do not read/create the same files at once.
    trackingFile :
        ``None`` to disable, otherwise a JSON file mapping deck name to
        [start time, end time, outcome, hour start, hour end].
    masterFile :
        ``None`` to disable, otherwise a semicolon-separated CSV that
        accumulates tracking results across runs (backed up, then merged
        with the current ``logDict`` when all runs finish).
    """
    # Per-run tracking dict; (re-)initialised empty on disk if tracking is on.
    logDict = {}
    if trackingFile != None:  # NOTE(review): "is not None" would be idiomatic
        with open(trackingFile, "w") as file:
            json.dump(logDict, file, indent=2, separators=(",", ": "), sort_keys=True)

    # Never use more workers than there are commands, and at least 1.
    maxNumberOfCPU = max(min(getNumberOfCPU() - reduceCpu, len(commands)), 1)

    # Worker table: one slot per CPU.  "process" is [] when idle, a Popen
    # when busy; "cmd"/"case" are set while a command is assigned.
    cP = {}

    for i in range(maxNumberOfCPU):
        cP["cpu" + str(i + 1)] = {"cpu": i + 1, "process": []}

    # Write the run header + planned-case list to the progress log.
    if outputFile != False:
        lines = ""
        line = "============PARALLEL PROCESSING STARTED==============\n"
        lines = lines + line
        line = "Number of simulated cases =%d\n" % len(commands)
        lines = lines + line
        line = "Number of CPU used =%d\n" % maxNumberOfCPU
        lines = lines + line
        line = "Estimated time =%f hours, assuming :%f hour per simulation\n" % (
            len(commands) * estimedCPUTime / (maxNumberOfCPU * 1.0),
            estimedCPUTime,
        )
        lines = lines + line
        line = "============CASES TO BE SIMULATED====================\n"
        lines = lines + line

        i = 1
        for cmd in commands:
            case = cmd.deckFilePath.name
            line = "Case %d to be simulated %s\n" % (i, case)
            lines = lines + line
            i = i + 1
        line = "============ALREADY SIMULATED CASES====================\n"
        lines = lines + line

        outfileRun = open(outputFile, "w")
        outfileRun.writelines(lines)
        outfileRun.close()

    # Nothing to do — but note the header above was still written.
    if not commands:
        return

    # Queue of commands not yet assigned to a worker.
    openCmds = list(commands)

    finishedCmds = []

    caseNr = 1

    # Busy flags indexed by cpu number - 1 (1 = busy, 0 = idle).
    activeP = [0] * maxNumberOfCPU

    # Seed every worker with its first command.  Safe because
    # maxNumberOfCPU <= len(commands) by construction above.
    for core in cP.keys():
        cP[core]["cmd"] = openCmds.pop(0)
        cP[core]["case"] = caseNr
        caseNr += 1

    def done(p):
        # A non-None poll() result means the subprocess has exited.
        return p.poll() is not None

    def success(p):
        # Recover the deck file path from the shell command string:
        # assumes the deck path is the second-to-last space-separated
        # token of p.args — TODO confirm against the Command builder.
        fullDckFilePath = p.args.split(" ")[-2]
        (logFilePath, dckFileName) = os.path.split(fullDckFilePath)
        logFileName = os.path.splitext(dckFileName)[0]
        logInstance = _logt.LogTrnsys(logFilePath, logFileName)

        # NOTE(review): logFatalErrors() is called twice (check + list).
        if logInstance.logFatalErrors():
            logger.error("======================================")
            logger.error(dckFileName)
            logger.error("======================================")
            errorList = logInstance.logFatalErrors()
            for line in errorList:
                logger.error(line.replace("\n", ""))
        else:
            logger.info("Success: No fatal errors during execution of " + dckFileName)
            logger.warning("Number of warnings during simulation: %s" % logInstance.checkWarnings())

        # "Success" is the process exit code, regardless of log contents.
        return p.returncode == 0

    def fail():
        # Any failed run aborts the whole parallel batch.
        logger.warning("PARALLEL RUN HAS FAILED")
        sys.exit(1)

    # NOTE(review): 'processes' is never populated, so the warning below
    # can never fire — dead code kept as-is (message also has typos).
    processes = []

    if len(processes) > maxNumberOfCPU:
        logger.warning("You are triying tu run %d processes and only have %d CPU\n" % (len(processes), maxNumberOfCPU))

    #    while True:

    #        cpu = 1

    ###############
    # alternative code:
    #        while openCmds:

    # NOTE(review): 'running' is never set to False; the loop exits only
    # via the break at the bottom.
    running = True
    startTime = time.time()

    # Main scheduler loop: poll each worker, launch pending commands,
    # harvest finished ones, and refill from openCmds until all done.
    while running:
        for core in cP.keys():
            p = cP[core]["process"]
            # start processes:
            cmd: _cmd.Command = cP[core].get("cmd")
            if not p and cmd:
                dckName = cmd.deckFilePath.name
                # Record the start timestamp in the tracking JSON
                # (read-modify-write of the whole file each time).
                if trackingFile != None:
                    with open(trackingFile, "r") as file:
                        logDict = json.load(file)
                    logDict[dckName] = [time.strftime("%Y-%m-%d_%H:%M:%S")]
                    with open(trackingFile, "w") as file:
                        json.dump(logDict, file, indent=2, separators=(",", ": "), sort_keys=True)

                logger.info("Command: %s", cmd)
                cP[core]["process"] = _sp.Popen(cmd.args, stdout=_sp.PIPE, stderr=_sp.PIPE, shell=True, cwd=cmd.cwd)

                activeP[cP[core]["cpu"] - 1] = 1

                # NOTE(review): comment says 5 seconds but the default
                # delayTime is 10 — the parameter value is what counts.
                time.sleep(
                    delayTime
                )  # we delay 5 seconds for each new running to avoid that they read the same source file.

            # if process is finished, assign new command:

            if p:
                if done(p):
                    if success(p):
                        # Append a "finished" line to the progress log.
                        if outputFile != False:
                            #                        lines = "Finished simulated case %d\n"%(k,p.stdout.read(),p.stderr.read())

                            lines = "Finished simulated case %d at %s\n" % (
                                cP[core]["case"],
                                time.strftime("%H:%M:%S of day %d-%m-%y"),
                            )
                            outfileRun = open(outputFile, "a")
                            outfileRun.writelines(lines)
                            outfileRun.close()

                        # Update the tracking JSON with end time, outcome
                        # and simulated-hours range parsed from the log.
                        if trackingFile != None:
                            # NOTE(review): assumes Windows-style "\\"
                            # path separators in p.args — TODO confirm.
                            dckName = p.args.split("\\")[-1].split(" ")[0]
                            with open(trackingFile, "r") as file:
                                logDict = json.load(file)
                            logDict[dckName].append(time.strftime("%Y-%m-%d_%H:%M:%S"))

                            fullDckFilePath = p.args.split(" ")[-2]
                            (logFilePath, dckFileName) = os.path.split(fullDckFilePath)
                            logFileName = os.path.splitext(dckFileName)[0]
                            logInstance = _logt.LogTrnsys(logFilePath, logFileName)

                            if logInstance.logFatalErrors():
                                logDict[dckName].append("fatal error")
                            else:
                                logDict[dckName].append("success")

                            simulationHours = logInstance.checkSimulatedHours()
                            if len(simulationHours) == 2:
                                logDict[dckName].append(simulationHours[0])
                                logDict[dckName].append(simulationHours[1])
                            elif len(simulationHours) == 1:
                                logDict[dckName].append(simulationHours[0])
                                logDict[dckName].append(None)

                            with open(trackingFile, "w") as file:
                                json.dump(logDict, file, indent=2, separators=(",", ": "), sort_keys=True)

                        # empty process:
                        cP[core]["process"] = []
                        finishedCmds.append(cP[core]["cmd"])
                        del cP[core]["cmd"]
                        del cP[core]["case"]

                        activeP[cP[core]["cpu"] - 1] = 0

                        logger.info("Runs completed: %s/%s" % (len(finishedCmds), len(commands)))

                        # Every full "wave" of workers, log a completion-
                        # time estimate extrapolated from elapsed time.
                        if len(finishedCmds) % len(cP) == 0 and len(finishedCmds) != len(commands):
                            currentTime = time.time()
                            timeSoFarSec = currentTime - startTime
                            totalTimePredictionSec = timeSoFarSec * len(commands) / len(finishedCmds)
                            endTimePrediction = datetime.datetime.fromtimestamp(
                                startTime + totalTimePredictionSec
                            ).strftime("%H:%M on %d.%m.%Y")
                            logger.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
                            logger.info(
                                "Predicted time of completion of all %s runs: %s" % (len(commands), endTimePrediction)
                            )
                            logger.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")

                        # On full completion, merge logDict into the
                        # master CSV (with a best-effort backup first).
                        # NOTE(review): logDict only holds results when
                        # trackingFile is set — presumably masterFile is
                        # meant to be used together with it; verify.
                        if masterFile != None and (len(finishedCmds) == len(commands)):
                            newDf = _pd.DataFrame.from_dict(
                                logDict,
                                orient="index",
                                columns=["started", "finished", "outcome", "hour start", "hour end"],
                            )

                            if os.path.isfile(masterFile):
                                masterPath, masterOrig = os.path.split(masterFile)
                                masterBackup = masterOrig.split(".")[0] + "_BACKUP.csv"
                                try:
                                    shutil.copyfile(masterFile, os.path.join(masterPath, masterBackup))
                                    logger.info("Updated " + masterBackup)
                                # NOTE(review): bare except hides the
                                # real error; deliberate best-effort.
                                except:
                                    logger.error("Unable to generate BACKUP of " + masterFile)
                                origDf = _pd.read_csv(masterFile, sep=";", index_col=0)

                                # Keep the newest row per deck name.
                                masterDf = _pd.concat([origDf, newDf])
                                masterDf = masterDf[~masterDf.index.duplicated(keep="last")]
                            else:
                                masterDf = newDf

                            try:
                                masterDf.to_csv(masterFile, sep=";")
                                logger.info("Updated " + masterFile)
                            except:
                                logger.error("Unable to write to " + masterFile)

                        # assign new command if there are open commands:

                        if openCmds:
                            cP[core]["cmd"] = openCmds.pop(0)
                            cP[core]["case"] = caseNr
                            caseNr += 1
                            activeP[cP[core]["cpu"] - 1] = 1

                    else:
                        fail()

        # Exit when every worker is idle and nothing is left to assign;
        # otherwise poll again after a short sleep.
        if all(process == 0 for process in activeP) and not openCmds:
            #        if not processes and not newCmds:
            break
        else:
            time.sleep(0.05)
291

292

293
def sortCommandList(cmds, keyWord):
    """
    function to put all commands that contain a keyWord string on top of a command list, so they will be evaluated first.

    Note that, as in the original implementation, the matching commands end
    up in *reverse* order of appearance at the front of the list (each match
    is effectively pushed onto the head), while non-matching commands keep
    their original relative order at the back.

    Parameters
    ----------
    cmds : list of strings
        includes all commands to be evaluated

    keyWord : string
        acts as filter; commmands including this string will be evaluated first

    Returns
    -------
    cmdsNew : list of strings
        all commands to be evaluated in new order
    """

    matching = [entry for entry in cmds if keyWord in entry]
    others = [entry for entry in cmds if keyWord not in entry]

    # Reversing the matches reproduces the original insert-at-front behavior.
    return list(reversed(matching)) + others
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc