• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

aas-core-works / aas-core-codegen / 25098769931

29 Apr 2026 08:30AM UTC coverage: 83.966% (-0.02%) from 83.987%
25098769931

Pull #615

github

web-flow
Merge 1321aff3e into c752b149a
Pull Request #615: Discontinue RDF+SHACL and JSON-LD generators

30593 of 36435 relevant lines covered (83.97%)

3.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.52
/aas_core_codegen/run.py
1
"""Encapsulate the entry point to different generators."""
2
import hashlib
4✔
3
import io
4✔
4
import pathlib
4✔
5
import pickle
4✔
6
import tempfile
4✔
7
import textwrap
4✔
8
import uuid
4✔
9
from typing import Sequence, TextIO, Tuple, Optional, Final
4✔
10

11
import asttokens
4✔
12
from icontract import require, ensure
4✔
13

14
import aas_core_codegen
4✔
15
from aas_core_codegen import specific_implementations, intermediate, parse
4✔
16
from aas_core_codegen.common import LinenoColumner
4✔
17

18

19
class Context:
4✔
20
    """Represent the context of a code generation."""
21

22
    @require(lambda model_path: model_path.exists() and model_path.is_file())
4✔
23
    @require(lambda output_dir: output_dir.exists() and output_dir.is_dir())
4✔
24
    def __init__(
4✔
25
        self,
26
        model_path: pathlib.Path,
27
        symbol_table: intermediate.SymbolTable,
28
        spec_impls: specific_implementations.SpecificImplementations,
29
        lineno_columner: LinenoColumner,
30
        output_dir: pathlib.Path,
31
    ) -> None:
32
        """Initialize with the given values."""
33
        self.model_path = model_path
4✔
34
        self.symbol_table = symbol_table
4✔
35
        self.spec_impls = spec_impls
4✔
36
        self.lineno_columner = lineno_columner
4✔
37
        self.output_dir = output_dir
4✔
38

39

40
@require(
4✔
41
    lambda errors: all(
42
        len(error) > 0 and not error.startswith("\n")
43
        # This is necessary so that we do not have double bullet point.
44
        and not error.startswith("*") and not error.endswith("\n")
45
        for error in errors
46
    )
47
)
48
@require(lambda message: not message.endswith(":"))
4✔
49
@require(lambda message: not message.endswith("\n"))
4✔
50
@require(lambda message: not message.startswith("\n") and not message.startswith("*"))
4✔
51
# fmt: on
52
def write_error_report(message: str, errors: Sequence[str], stderr: TextIO) -> None:
4✔
53
    """
54
    Write the report (main ``message`` and details as ``errors``) to ``stderr``.
55

56
    This method helps us to have a unified way of showing errors.
57
    """
58
    stderr.write(f"{message}:\n")
×
59
    for error in errors:
×
60
        indented = textwrap.indent(error, "  ")
×
61
        indented = "* " + indented[2:]
×
62
        stderr.write(f"{indented}\n")
×
63

64

65
class _Cached:
4✔
66
    symbol_table: Final[intermediate.SymbolTable]
4✔
67
    atok: Final[asttokens.ASTTokens]
4✔
68

69
    def __init__(
4✔
70
        self, symbol_table: intermediate.SymbolTable, atok: asttokens.ASTTokens
71
    ) -> None:
72
        self.symbol_table = symbol_table
4✔
73
        self.atok = atok
4✔
74

75

76
@require(lambda model_path: model_path.exists() and model_path.is_file())
4✔
77
@ensure(lambda result: (result[0] is not None) ^ (result[1] is not None))
4✔
78
def load_model(
4✔
79
    model_path: pathlib.Path, cache_model: bool = False
80
) -> Tuple[
81
    Optional[Tuple[intermediate.SymbolTable, asttokens.ASTTokens]], Optional[str]
82
]:
83
    """
84
    Load the given meta-model from the file system and understand it.
85

86
    If the ``cache_model`` is set, the symbol table will be stored in the temporary
87
    directory of your OS keyed on the hash of the model source code. On subsequent
88
    calls to this function, the model will be loaded from the cache instead of reparsed
89
    as long as the content of the source code did not change.
90
    """
91
    text = model_path.read_text(encoding="utf-8")
4✔
92

93
    text_hash = hashlib.sha256(text.encode()).hexdigest()
4✔
94
    cache_path = (
4✔
95
        pathlib.Path(tempfile.gettempdir())
96
        / f"aas-core-codegen-{aas_core_codegen.__version__}"
97
        / f"model-{text_hash}.pickle"
98
    )
99
    if cache_model:
4✔
100
        if cache_path.exists():
4✔
101
            with cache_path.open("rb") as fid:
4✔
102
                cached = pickle.load(fid)
4✔
103
                assert isinstance(cached, _Cached)
4✔
104

105
                return (cached.symbol_table, cached.atok), None
4✔
106

107
    atok, parse_exception = parse.source_to_atok(source=text)
4✔
108
    if parse_exception:
4✔
109
        if isinstance(parse_exception, SyntaxError):
×
110
            return None, (
×
111
                f"Failed to parse the meta-model: "
112
                f"invalid syntax at line {parse_exception.lineno}"
113
            )
114
        else:
115
            return None, f"Failed to parse the meta-model: {parse_exception}"
×
116

117
    assert atok is not None
4✔
118

119
    import_errors = parse.check_expected_imports(atok=atok)
4✔
120
    if import_errors:
4✔
121
        writer = io.StringIO()
×
122
        write_error_report(
×
123
            message="One or more unexpected imports in the meta-model",
124
            errors=import_errors,
125
            stderr=writer,
126
        )
127
        return None, writer.getvalue()
×
128

129
    lineno_columner = LinenoColumner(atok=atok)
4✔
130

131
    parsed_symbol_table, error = parse.atok_to_symbol_table(atok=atok)
4✔
132
    if error is not None:
4✔
133
        writer = io.StringIO()
×
134
        write_error_report(
×
135
            message="Failed to construct the symbol table",
136
            errors=[lineno_columner.error_message(error)],
137
            stderr=writer,
138
        )
139
        return None, writer.getvalue()
×
140

141
    assert parsed_symbol_table is not None
4✔
142

143
    ir_symbol_table, error = intermediate.translate(
4✔
144
        parsed_symbol_table=parsed_symbol_table,
145
        atok=atok,
146
    )
147
    if error is not None:
4✔
148
        writer = io.StringIO()
×
149
        write_error_report(
×
150
            message=(
151
                "Failed to translate the parsed symbol table "
152
                "to intermediate symbol table"
153
            ),
154
            errors=[lineno_columner.error_message(error)],
155
            stderr=writer,
156
        )
157
        return None, writer.getvalue()
×
158

159
    assert ir_symbol_table is not None
4✔
160

161
    if cache_model:
4✔
162
        cache_path.parent.mkdir(parents=True, exist_ok=True)
4✔
163

164
        tmp_path = cache_path.with_suffix(f".{uuid.uuid4()}.tmp")
4✔
165
        try:
4✔
166
            with tmp_path.open("wb") as fid:
4✔
167
                pickle.dump(_Cached(symbol_table=ir_symbol_table, atok=atok), fid)
4✔
168

169
            tmp_path.rename(cache_path)
4✔
170
        finally:
171
            tmp_path.unlink(missing_ok=True)
4✔
172

173
    return (ir_symbol_table, atok), None
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc