• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

aas-core-works / aas-core-codegen / 25098769931

29 Apr 2026 08:30AM UTC coverage: 83.966% (-0.02%) from 83.987%
25098769931

Pull #615

github

web-flow
Merge 1321aff3e into c752b149a
Pull Request #615: Discontinue RDF+SHACL and JSON-LD generators

30593 of 36435 relevant lines covered (83.97%)

3.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.71
/aas_core_codegen/common.py
1
"""Provide common functions and types for the code generation."""
2
import ast
4✔
3
import inspect
4✔
4
import io
4✔
5
import itertools
4✔
6
import re
4✔
7
import textwrap
4✔
8
from typing import (
4✔
9
    Optional,
10
    Tuple,
11
    cast,
12
    List,
13
    overload,
14
    Union,
15
    Iterator,
16
    Sequence,
17
    NoReturn,
18
    Any,
19
    Iterable,
20
    TypeVar,
21
)
22

23
import asttokens
4✔
24
from icontract import require, DBC, ensure
4✔
25

26

27
class Rstripped(str):
    """
    Mark a block of text which does not end in whitespace.

    Both single-line and multi-line blocks are admissible.
    """

    @require(lambda block: not block.endswith(("\n", " ", "\t")))
    def __new__(cls, block: str) -> "Rstripped":
        # The precondition does the checking; the ``block`` itself is handed back
        # unchanged and only re-labelled for the type checker.
        return cast(Rstripped, block)
×
41

42

43
def is_stripped(text: str) -> bool:
    """
    Tell whether ``text`` neither starts nor ends with a new line, a space or a tab.

    An empty ``text`` is considered stripped.
    """
    margin_characters = ("\n", " ", "\t")

    if text.startswith(margin_characters):
        return False

    return not text.endswith(margin_characters)
52

53

54
class Stripped(Rstripped):
    """
    Mark a block of text which neither starts nor ends in whitespace.

    Both single-line and multi-line blocks are admissible.
    """

    @require(lambda block: is_stripped(text=block))
    def __new__(cls, block: str) -> "Stripped":
        # Only the precondition is enforced; the ``block`` is handed back as-is
        # and merely re-labelled for the type checker.
        return cast(Stripped, block)
4✔
64

65

66
# noinspection RegExpSimplifiable
67
# Pattern of an identifier: an ASCII letter or an underscore, followed by any number
# of ASCII letters, digits and underscores. The pattern is not anchored, so use
# ``fullmatch`` to check a whole string.
IDENTIFIER_RE = re.compile(r"[a-zA-Z_][a-zA-Z_0-9]*")
4✔
68

69

70
class Identifier(DBC, Stripped):
    """Mark a text which is an identifier, *i.e.*, fully matches ``IDENTIFIER_RE``."""

    @require(lambda value: IDENTIFIER_RE.fullmatch(value) is not None)
    def __new__(cls, value: str) -> "Identifier":
        # The ``value`` is handed back unchanged; only the static type differs.
        return cast(Identifier, value)
4✔
76

77

78
class Error:
    """
    Capture an input which we did not expect and hence can not process.

    For instance, the meta-model may well be valid Python code, yet rely on language
    constructs outside of the subset which we know how to handle.
    """

    def __init__(
        self,
        node: Optional[ast.AST],
        message: str,
        underlying: Optional[List["Error"]] = None,
    ) -> None:
        """Store the arguments as the attributes of the same name."""
        # ``node`` is the AST node related to the error, if any.
        # ``message`` is the human-readable description.
        # ``underlying`` lists the nested errors, if any; the list is not copied.
        self.node, self.message, self.underlying = node, message, underlying

    def __repr__(self) -> str:
        """Represent the error by listing all its attributes."""
        fields = ", ".join(
            f"{name}={getattr(self, name)!r}"
            for name in ("node", "message", "underlying")
        )
        return f"Error({fields})"
103

104

105
class LinenoColumner:
    """
    Map the source code to line number and column for precise error messages.

    Both the line numbers and the columns are 1-based.
    """

    def __init__(self, atok: asttokens.ASTTokens) -> None:
        """
        Pre-compute the position of every character in the text of ``atok.tree``.

        :param atok: tokenized source code whose nodes are reported in the errors
        """
        # ``positions[i]`` is the (line number, column) of the i-th character.
        #
        # NOTE(review): ``error_message`` indexes this list with the start offset
        # given by ``atok.get_text_range``. This assumes that the offsets refer to
        # the same text as ``atok.get_text(atok.tree)`` -- please confirm.
        positions = []  # type: List[Tuple[int, int]]
        lineno = 1
        column = 0
        for character in atok.get_text(atok.tree):
            column += 1
            positions.append((lineno, column))

            if character == "\n":
                # The new-line character itself still belongs to the line which
                # it terminates. Only the characters after it are on the next
                # line, starting again at column 1. (Previously, the column was
                # reset to 1 *before* recording, so that the first character of
                # every line but the first one was reported at column 2.)
                lineno += 1
                column = 0

        self.atok = atok
        self.positions = positions

    def error_message(self, error: Error) -> str:
        """
        Generate the error message based on the unexpected observation.

        If the ``error`` refers to a node, the message is prefixed with the line
        number and the column of the start of the node. The messages of
        the underlying errors, if any, follow on separate lines, indented by two
        spaces per nesting level.

        :param error: to be rendered
        :return: human-readable message
        """
        prefix = ""
        if error.node is not None:
            start, _ = self.atok.get_text_range(node=error.node)
            lineno, column = self.positions[start]

            prefix = f"At line {lineno} and column {column}: "

        blocks = [f"{prefix}{error.message}"]

        # Covers both ``None`` and an empty list of underlying errors
        if error.underlying:
            blocks.extend(
                textwrap.indent(self.error_message(underlying_error), "  ")
                for underlying_error in error.underlying
            )

        return "\n".join(blocks)
×
145

146

147
class Lines(DBC):
    """
    Mark a sequence of strings as text lines, none containing a line break.

    The methods below only declare the interface for the type checker; the value
    returned by the constructor is the original sequence itself.
    """

    @require(
        lambda lines: not any("\n" in line or "\r" in line for line in lines)
    )
    def __new__(cls, lines: Sequence[str]) -> "Lines":
        r"""
        Check the ``lines`` and hand them back re-labelled as :py:class:`Lines`.

        The ``lines`` are neither copied nor frozen. Hence give up
        the "ownership" of the original list as soon as you wrap it, and do not
        change it afterwards:

        .. code-block:: python

            ##
            # OK
            ##

            lines = Lines(some_text.splitlines())

            ##
            # Not OK
            ##

            some_lines = some_text.splitlines()
            lines = Lines(some_lines)
            # ... do something assuming ``lines`` is immutable ...

            some_lines[0] = "This will break \n your logic"
            # ERROR! lines[0] now contains a new-line which is unexpected!

        """
        return cast(Lines, lines)

    # pylint: disable=function-redefined

    @overload
    def __getitem__(self, index: int) -> str:
        """Retrieve the line at the integer ``index``."""
        raise NotImplementedError("Only for type annotations")

    @overload
    def __getitem__(self, index: slice) -> "Lines":
        """Retrieve the lines selected by the slice ``index``."""
        raise NotImplementedError("Only for type annotations")

    def __getitem__(self, index: Union[int, slice]) -> Union[str, "Lines"]:
        """Retrieve a single line or a slice of lines, depending on ``index``."""
        raise NotImplementedError("Only for type annotations")

    def __iter__(self) -> Iterator[str]:
        """Go over the lines one by one."""
        raise NotImplementedError("Only for type annotations")

    def __len__(self) -> int:
        """Count the lines."""
        raise NotImplementedError("Only for type annotations")

    def __add__(self, other: "Lines") -> "Lines":
        """Concatenate these lines with the ``other`` lines."""
        raise NotImplementedError("Only for type annotations")
×
212

213

214
def assert_never(value: NoReturn) -> NoReturn:
    """
    Signal to mypy to perform an exhaustive matching.

    Please see the following page for more details:
    https://hakibenita.com/python-mypy-exhaustive-checking

    :param value: which should have been handled by one of the previous branches
    :raise: :py:class:`AssertionError` always, as reaching this call means that
        a case was left unhandled at runtime
    """
    # We raise explicitly instead of writing ``assert False``. The assert statements
    # are removed when Python runs with ``-O``, in which case this function would
    # silently return and thus violate its ``NoReturn`` contract.
    raise AssertionError(f"Unhandled value: {value} ({type(value).__name__})")
×
222

223

224
def indent_but_first_line(text: str, indention: str) -> str:
    """
    Prefix every line of ``text`` except the first one with ``indention``.

    Empty lines are left empty. The lines are determined by
    :py:meth:`str.splitlines` and joined back with ``"\\n"``.

    This comes in handy when you insert a multi-line block at an already indented
    position in a formatted string literal.
    """
    return "\n".join(
        line if index == 0 or not line else f"{indention}{line}"
        for index, line in enumerate(text.splitlines())
    )
4✔
241

242

243
def assert_union_of_descendants_exhaustive(union: Any, base_class: Any) -> None:
    """
    Check that the ``union`` covers all the concrete subclasses of ``base_class``.

    The subclasses are collected transitively; the abstract ones (according to
    :py:func:`inspect.isabstract`) are ignored. The ``base_class`` itself is not
    expected in the ``union``.

    Make sure you put the assertion at the end of the module where no new classes are
    defined.

    See also for more details: https://hakibenita.com/python-mypy-exhaustive-checking

    :param union: either a single class or an object listing the classes in ``__args__``
    :param base_class: root of the class hierarchy
    :raise: :py:class:`NotImplementedError` if we can not infer the classes from
        the ``union``
    :raise: :py:class:`AssertionError` if the classes of the ``union`` and
        the concrete subclasses differ
    """
    # The classes are tracked by their ``id`` so that we depend neither on
    # the equality nor on the hash of the class objects.
    if inspect.isclass(union):
        union_map = {id(union): union}
    elif hasattr(union, "__args__"):
        union_map = {id(cls): cls for cls in union.__args__}
    else:
        raise NotImplementedError(f"We do not know how to handle the union: {union}")

    # We have to recursively figure out the subclasses.
    subclass_map = dict()

    visited = set()
    stack = list(base_class.__subclasses__())  # type: List[Any]

    while stack:
        sub_cls = stack.pop()

        # A class can be reached over more than one base class in a diamond-shaped
        # hierarchy; there is no need to descend into it again.
        if id(sub_cls) in visited:
            continue
        visited.add(id(sub_cls))

        if not inspect.isabstract(sub_cls):
            subclass_map[id(sub_cls)] = sub_cls

        stack.extend(sub_cls.__subclasses__())

    # The names are sorted so that the error messages do not depend on the memory
    # addresses of the classes, and are thus reproducible between the runs.
    union_diff_names = sorted(
        union_map[cls_id].__name__
        for cls_id in set(union_map).difference(subclass_map)
    )
    subclass_diff_names = sorted(
        subclass_map[cls_id].__name__
        for cls_id in set(subclass_map).difference(union_map)
    )

    if not union_diff_names and not subclass_diff_names:
        return

    if not union_diff_names:
        raise AssertionError(
            f"The following concrete subclasses of {base_class.__name__!r} were "
            f"not listed in the union: {subclass_diff_names}"
        )

    if not subclass_diff_names:
        raise AssertionError(
            f"The following classes were listed in the union, "
            f"but they are not sub-classes "
            f"of {base_class.__name__!r}: {union_diff_names}"
        )

    raise AssertionError(
        f"The following classes were listed in the union, "
        f"but they are not sub-classes "
        f"of {base_class.__name__!r}: {union_diff_names}.\n\n"
        f"The following concrete sub-classes of {base_class.__name__!r} were "
        f"not listed in the union: {subclass_diff_names}"
    )
305

306

307
def assert_union_without_excluded(
    original_union: Any, subset_union: Any, excluded: Iterable[Any]
) -> None:
    """
    Verify that ``subset_union`` and ``excluded`` partition the ``original_union``.

    The two must be disjoint, both must be contained in the ``original_union``, and
    together they must give exactly the ``original_union``. An
    :py:class:`AssertionError` naming the offending types is raised otherwise.

    Make sure you put the assertion at the end of the module where no new classes are
    defined.

    See also for more details: https://hakibenita.com/python-mypy-exhaustive-checking
    """
    # region Map the identifiers of the inputs to their objects

    union_maps = []
    for label, union in (
        ("original_union", original_union),
        ("subset_union", subset_union),
    ):
        if inspect.isclass(union):
            union_maps.append({id(union): union})
        elif hasattr(union, "__args__"):
            union_maps.append({id(a_type): a_type for a_type in union.__args__})
        else:
            raise NotImplementedError(
                f"We do not know how to handle the {label}: {union}"
            )

    original_union_map, subset_union_map = union_maps

    excluded_map = {id(a_type): a_type for a_type in excluded}

    name_map = dict(original_union_map)
    name_map.update(subset_union_map)
    name_map.update(excluded_map)

    def sorted_names(type_ids: Iterable[int]) -> List[str]:
        """Look up the names of the types and sort them for stable messages."""
        return sorted(name_map[type_id].__name__ for type_id in type_ids)

    # endregion

    # region Compute the sets of the identifiers

    original_ids = set(original_union_map)
    subset_ids = set(subset_union_map)
    excluded_ids = set(excluded_map)

    # endregion

    # region Check that it all fits

    overlapping = subset_ids & excluded_ids
    if overlapping:
        names = sorted_names(overlapping)
        raise AssertionError(
            f"The following types were listed both "
            f"in the subset_union and the excluded: {names}"
        )

    surplus = subset_ids - original_ids
    if surplus:
        names = sorted_names(surplus)
        raise AssertionError(
            f"The following types were listed in the subset_union, "
            f"but not in the original_union: {names}"
        )

    surplus = excluded_ids - original_ids
    if surplus:
        names = sorted_names(surplus)
        raise AssertionError(
            f"The following types were listed in the excluded, "
            f"but not in the original_union: {names}"
        )

    reconstructed_ids = subset_ids | excluded_ids

    missing = original_ids - reconstructed_ids
    if missing:
        names = sorted_names(missing)
        raise AssertionError(
            f"The following types were listed in the original_union, "
            f"but not in the subset_union ∪ excluded: {names}"
        )

    # This is a purely defensive check: the two containment checks above already
    # guarantee that the reconstructed set lies within the original union.
    surplus = reconstructed_ids - original_ids
    if surplus:
        names = sorted_names(surplus)
        raise AssertionError(
            f"The following types were listed in the subset_union ∪ excluded, "
            f"but not in the original_union: {names}"
        )

    # endregion
397

398

399
@ensure(lambda text, result: text == "".join(result))
def wrap_text_into_lines(text: str, line_width: int = 60) -> List[str]:
    """
    Wrap the ``text`` into multiple lines.

    The tokens are split based on the spaces. We make sure the articles are not
    left hanging between the lines. A line should observe the ``line_width``,
    if possible; a single token longer than ``line_width`` is put on a line of its
    own.

    No new lines are added — the description should be given to the user in the original
    formatting. We merely split it in string literals for better code readability.
    Hence concatenating the result always gives back the ``text``.

    :param text: to be split
    :param line_width: maximum number of characters in a line, where possible
    :return: non-empty segments of the ``text``, or ``[text]`` if there is no space
    """
    parts = text.split(" ")
    if len(parts) == 1:
        return [text]

    # NOTE (mristin, 2022-04-08):
    # We do not want to cut out "the", "a" and "an" on separate lines, so we split
    # the text once more in tokens where the articles are kept in the same token as
    # the word.
    articles = ("a", "an", "the")

    tokens = []  # type: List[str]

    # Article observed just before the current part and not yet put in a token
    article = None  # type: Optional[str]
    for part in parts:
        if part in articles:
            if article is not None:
                # Two articles in a row: the previous one stands on its own,
                # the ``part`` becomes the new pending article.
                tokens.append(article)

            article = part
        elif article is not None:
            tokens.append(f"{article} {part}")
            article = None
        else:
            tokens.append(part)

    if article is not None:
        tokens.append(article)

    # NOTE (mristin, 2022-04-08):
    # We add space to the tokens so that it is easier to re-flow them.
    last_index = len(tokens) - 1
    tokens = [
        token if i == last_index else f"{token} " for i, token in enumerate(tokens)
    ]
    assert "".join(tokens) == text

    segments = []  # type: List[str]

    accumulation_len = 0
    accumulation = []  # type: List[str]

    for token in tokens:
        if accumulation_len + len(token) <= line_width:
            accumulation_len += len(token)
            accumulation.append(token)
            continue

        # The token does not fit any more, so the accumulated line is closed.
        # We skip an empty accumulation; otherwise an over-long token at the start
        # of the text or right after another over-long token would produce
        # an empty segment.
        if accumulation_len > 0:
            segments.append("".join(accumulation))

        if len(token) > line_width:
            # The token can not fit any line, so it is put on a line of its own.
            segments.append(token)
            accumulation_len = 0
            accumulation = []
        else:
            accumulation_len = len(token)
            accumulation = [token]

    if accumulation_len > 0:
        segments.append("".join(accumulation))

    return segments
4✔
474

475

476
# Generic type of the items in an iterable
T = TypeVar("T")
4✔
477

478

479
def pairwise(iterable: Iterable[T]) -> Iterator[Tuple[T, T]]:
    """
    Go over the ``iterable`` in overlapping pairs of consecutive items.

    Nothing is produced if there are less than two items.

    >>> list(pairwise("ABCDE"))
    [('A', 'B'), ('B', 'C'), ('C', 'D'), ('D', 'E')]
    """
    current_it, following_it = itertools.tee(iterable, 2)

    # Put the second iterator one item ahead of the first one
    next(following_it, None)

    return zip(current_it, following_it)
4✔
489

490

491
def iterate_except_first(iterable: Iterable[T]) -> Iterator[T]:
    """
    Yield all the items of ``iterable`` but the very first one.

    Nothing is yielded if the ``iterable`` has less than two items.

    >>> list(iterate_except_first("ABCD"))
    ['B', 'C', 'D']
    """
    yield from itertools.islice(iterable, 1, None)
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc