• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SPF-OST / pytrnsys_process / 16748222285

05 Aug 2025 11:03AM UTC coverage: 49.518% (-46.5%) from 95.968%
16748222285

Pull #126

github

ahobeost
Reduce linux job to just test.
Pull Request #126: 125 bug step file not read when step used with type 25

5 of 6 new or added lines in 2 files covered. (83.33%)

578 existing lines in 11 files now uncovered.

616 of 1244 relevant lines covered (49.52%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.41
/pytrnsys_process/deck/extractor.py
1
import collections.abc as _cabc
2✔
2
import logging as _logging
2✔
3
import math as _math
2✔
4

5
import lark as _lark
2✔
6

7
from pytrnsys_process import log
2✔
8
from pytrnsys_process.deck import parser
2✔
9
from pytrnsys_process.deck import visitor_helpers as vh
2✔
10

11

12
def parse_deck_for_constant_expressions(
    deck_as_string: str, logger: _logging.Logger = log.default_console_logger
) -> dict[str, float | int]:
    """Evaluate constant expressions in a TRNSYS deck file and return their values.

    This function parses a TRNSYS deck file string, identifies constant expressions,
    and evaluates them to their numerical values. It handles mathematical operations,
    functions, and variable references.

    Equations are evaluated in repeated passes: an equation that references a
    variable which has not been evaluated yet is retried in the next pass. The
    passes stop as soon as one of them does not evaluate any new variable.

    Parameters
    ----------
    deck_as_string: str
        A string containing the contents of a TRNSYS deck file.
    logger: Logger
        Logger used to report equations that cannot be evaluated. Provide your
        own logger to, for example, log per simulation.

    Returns
    -------
    variable_values: dict
        A dictionary mapping variable names to their evaluated values (float or int).
        The original case of variable names is preserved in the returned dictionary.
        Expressions that could not be evaluated are not included in the returned dictionary.
    """
    equations = _get_equation_trees(deck_as_string)
    sub_trees_to_process = _get_expression_sub_trees_by_variable_name(
        equations
    )

    # Keyed by the case-folded variable name, so that references are resolved
    # case-insensitively; the original spellings are restored on return.
    evaluated_variables: dict[str, float | int] = {}
    original_variable_names: list[str] = []

    new_constants_found = True
    while new_constants_found:
        new_constants_found = False

        # Iterate over a snapshot, so entries can be deleted during the pass.
        for var, tree in list(sub_trees_to_process.items()):
            try:
                maybe_evaluated_value = (
                    _evaluate_or_none_if_variable_could_not_be_found(
                        tree, evaluated_variables
                    )
                )
            except MathFuncNotFoundError as e:
                failed_equation = deck_as_string[
                    e.meta.start_pos : e.meta.end_pos
                ]
                # Only the text in front of the first "(" is the function
                # name; the arguments may contain further parentheses.
                func_name = failed_equation.partition("(")[0]
                logger.warning(
                    "On line %s, %s is not supported in %s=%s",
                    e.meta.line,
                    func_name,
                    var,
                    failed_equation,
                )
                del sub_trees_to_process[var]
                continue
            except _lark.exceptions.VisitError as e:
                failed_equation = deck_as_string[
                    e.obj.meta.start_pos : e.obj.meta.end_pos  # type: ignore
                ]
                logger.error(
                    "On line %s, unable to compute equation %s=%s because: %s",
                    e.obj.meta.line,  # type: ignore
                    var,
                    failed_equation,
                    str(e),
                    exc_info=True,
                )
                # Already evaluated values never change, so retrying would fail
                # the same way and only log the same error once more per pass.
                del sub_trees_to_process[var]
                continue

            if maybe_evaluated_value is None:
                # A referenced variable is not known yet: retry in the next pass.
                continue

            original_variable_names.append(var)
            evaluated_variables[var.casefold()] = maybe_evaluated_value
            del sub_trees_to_process[var]
            new_constants_found = True

    return _rename_dict_keys_to_original_format(
        evaluated_variables, original_variable_names
    )
100

101

102
class EquationsCollectorVisitor(_lark.Visitor):
    """Collect the equations of a deck that are not outputs of a TRNSYS type.

    This visitor is given the whole deck as a tree. For each equation the
    equation() method is called; equations for which no output node is found
    are appended to ``equations_to_transform``.
    """

    def __init__(self) -> None:
        super().__init__()
        # Equation trees collected so far, in the order they were visited.
        self.equations_to_transform: list[_lark.Tree] = []

    def equation(self, tree: _lark.Tree) -> None:
        """Append ``tree`` to the collected equations unless it contains an output."""
        output_detector = self.OutputOfTrnsysTypeDetector()
        output_detector.visit(tree)

        if not output_detector.is_output:
            self.equations_to_transform.append(tree)

    class OutputOfTrnsysTypeDetector(_lark.visitors.Visitor_Recursive):
        """Detects if equation is an output: equation_name = [15,1]"""

        def __init__(self) -> None:
            super().__init__()
            # Set to True as soon as an ``output`` node is visited.
            self.is_output = False

        def output(self, _) -> None:
            """Record that the visited tree contains an output node."""
            self.is_output = True
125

126

127
# Supported mathematical functions, keyed by their case-folded name. Each entry
# receives the list of already evaluated arguments of the function call.
# NOTE(review): the trigonometric entries use Python's ``math`` functions, which
# work in radians - confirm that this matches the TRNSYS convention.
_MATH_FUNCTIONS: dict[
    str, _cabc.Callable[[_cabc.Sequence[float | int]], float | int]
] = {
    "int": lambda args: int(args[0]),
    "ae": lambda args: 1 if abs(args[0] - args[1]) < args[2] else 0,
    "abs": lambda args: abs(args[0]),
    "acos": lambda args: _math.acos(args[0]),
    # Logical functions yield 1 or 0, like "not", "eql", "gt", ...
    "and": lambda args: 1 if (args[0] and args[1]) else 0,
    "or": lambda args: 1 if (args[0] or args[1]) else 0,
    "not": lambda args: int(not args[0]),
    "asin": lambda args: _math.asin(args[0]),
    "atan": lambda args: _math.atan(args[0]),
    "cos": lambda args: _math.cos(args[0]),
    "eql": lambda args: 1 if args[0] == args[1] else 0,
    "exp": lambda args: _math.exp(args[0]),
    "ge": lambda args: 1 if args[0] >= args[1] else 0,
    "gt": lambda args: 1 if args[0] > args[1] else 0,
    "le": lambda args: 1 if args[0] <= args[1] else 0,
    "lt": lambda args: 1 if args[0] < args[1] else 0,
    "ln": lambda args: _math.log(args[0]),
    "log": lambda args: _math.log10(args[0]),
    "max": lambda args: max(args[0], args[1]),
    "min": lambda args: min(args[0], args[1]),
    "mod": lambda args: _math.fmod(args[0], args[1]),
    "sin": lambda args: _math.sin(args[0]),
    "tan": lambda args: _math.tan(args[0]),
}


class EquationsTransformer(_lark.Transformer):
    """Compute the numerical value of the right hand side of an equation.

    Number literals, arithmetic operators, the supported mathematical
    functions and references to already evaluated variables are reduced to a
    single float or int.
    """

    def __init__(self, evaluated_variables: _cabc.Mapping[str, float | int]):
        super().__init__()
        # Values of the variables evaluated so far, looked up by the
        # case-folded variable name.
        self.evaluated_variables = evaluated_variables

    def number(self, items) -> float | int:
        """Return the literal as int if it has no fractional part, else as float."""
        as_float = float(items[0].value)
        return int(as_float) if as_float.is_integer() else as_float

    def negate(self, items):
        """Return the negated operand."""
        return -items[0]

    def plus(self, items):
        """Return the sum of both operands."""
        return items[0] + items[1]

    def minus(self, items):
        """Return the difference of both operands."""
        return items[0] - items[1]

    def divided_by(self, items):
        """Return the quotient of both operands (true division)."""
        return items[0] / items[1]

    def times(self, items):
        """Return the product of both operands."""
        return items[0] * items[1]

    def to_power_of(self, items):
        """Return the first operand raised to the power of the second."""
        return items[0] ** items[1]

    def default_visibility_var(self, items) -> float | int:
        """Return the value of the referenced, already evaluated variable.

        Raises
        ------
        ReferencedVariableNotEvaluatedError
            If the variable has not been evaluated (yet).
        """
        variable_name = items[0].value.casefold()
        try:
            return self.evaluated_variables[variable_name]
        except KeyError as exc:
            raise ReferencedVariableNotEvaluatedError() from exc

    @_lark.v_args(meta=True)
    def func_call(self, meta, items):
        """Mathematical function behaviour is described in pages 20 and 21 of trnsys doc 6 TRNedit

        Raises
        ------
        MathFuncNotFoundError
            If the called function is not one of the supported functions.
        """
        math_func = vh.get_child_token_value("NAME", items[0], str).casefold()
        args = items[1].children

        compute = _MATH_FUNCTIONS.get(math_func)
        if compute is None:
            raise MathFuncNotFoundError(
                f"Function {math_func} can not be computed", meta
            )
        return compute(args)

    def explicit_var(self, items):
        """Return the first child unchanged."""
        return items[0]
224

225

226
class MathFuncNotFoundError(Exception):
    """This error is raised if the parsed 'func_call' is not supported."""

    def __init__(self, message, meta):
        super().__init__(message)

        # Position metadata of the unsupported function call; handlers read
        # ``start_pos``, ``end_pos`` and ``line`` from it to report the error.
        self.meta = meta

    def __reduce__(self):
        # ``args`` only holds the message, but ``__init__`` also requires
        # ``meta``: pass both, so copying and pickling can rebuild the error.
        return (type(self), (*self.args, self.meta))
233

234

235
class ReferencedVariableNotEvaluatedError(Exception):
    """Raised if a referenced variable is not in the mapping of evaluated variables."""
237

238

239
def _get_equation_trees(deck_as_string: str) -> list[_lark.Tree]:
    """Parse the deck and return the equation trees collected from it.

    The equations are gathered by an ``EquationsCollectorVisitor`` that visits
    the tree returned by ``parser.parse_dck``.
    """
    whole_tree = parser.parse_dck(deck_as_string)

    equations_collector_visitor = EquationsCollectorVisitor()
    equations_collector_visitor.visit(whole_tree)

    return equations_collector_visitor.equations_to_transform
245

246

247
def _rename_dict_keys_to_original_format(
2✔
248
    evaluated_variables, original_variable_names
249
) -> dict[str, float | int]:
250
    for original_name in original_variable_names:
2✔
251
        evaluated_variables[original_name] = evaluated_variables.pop(
2✔
252
            original_name.casefold()
253
        )
254
    return evaluated_variables
2✔
255

256

257
def _get_expression_sub_trees_by_variable_name(
    list_of_equation_trees: list[_lark.Tree],
) -> dict[str, _lark.Tree]:
    """Map the variable name of each equation to its right hand side tree.

    If several equations define the same name, the last one wins.
    """
    equations_dict = {}
    for equation_tree in list_of_equation_trees:
        variable_name = vh.get_child_token_value(
            "NAME", equation_tree.children[0].children[0], str
        )
        # right hand side of the equation as a tree
        equations_dict[variable_name] = equation_tree.children[1]

    return equations_dict
271

272

273
def _evaluate_or_none_if_variable_could_not_be_found(
    tree: _lark.Tree, evaluated_variables: _cabc.Mapping[str, float | int]
) -> float | int | None:
    """Evaluate ``tree`` using the values of the already evaluated variables.

    Returns
    -------
    The computed value, or None if the expression references a variable that
    is not contained in ``evaluated_variables``.

    Raises
    ------
    MathFuncNotFoundError
        If the expression calls a function that is not supported.
    lark.exceptions.VisitError
        If the evaluation failed for any other reason.
    """
    # Exceptions raised in the transformer callbacks arrive wrapped in a
    # VisitError, so they need to be unwrapped here.
    try:
        return EquationsTransformer(evaluated_variables).transform(tree)
    except _lark.exceptions.VisitError as e:
        if isinstance(e.orig_exc, ReferencedVariableNotEvaluatedError):
            return None
        if isinstance(e.orig_exc, MathFuncNotFoundError):
            raise e.orig_exc
        raise
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc