• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

aas-core-works / aas-core-codegen / 17361139636

31 Aug 2025 07:00PM UTC coverage: 82.088% (-0.02%) from 82.105%
17361139636

push

github

web-flow
Handle list of primitives in JSON Schema, Python (#547)

This change makes it possible to handle lists of primitives, so that we can
generate their definitions in JSON Schema as well as the de/serialization
code in Python.

This is the first change that includes integration tests to test
the pipeline end-to-end: from code generation to testing of
the *generated* code isolated from AAS or any other specific meta-model.
We included two targets (JSON schema and Python) in one pull request
so that we can specifically tailor the integration tests for these
two different use cases.

5 of 12 new or added lines in 1 file covered. (41.67%)

1 existing line in 1 file now uncovered.

28501 of 34720 relevant lines covered (82.09%)

3.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.44
/aas_core_codegen/python/xmlization/_generate.py
1
"""Generate Python code for XML-ization based on the intermediate representation."""
2

3
import io
4✔
4
import textwrap
4✔
5
from typing import Tuple, Optional, List, Union
4✔
6

7
from icontract import ensure, require
4✔
8

9
from aas_core_codegen import intermediate, specific_implementations, naming
4✔
10
from aas_core_codegen.common import (
4✔
11
    Error,
12
    Stripped,
13
    assert_never,
14
    Identifier,
15
    indent_but_first_line,
16
)
17
from aas_core_codegen.python import common as python_common, naming as python_naming
4✔
18
from aas_core_codegen.python.common import (
4✔
19
    INDENT as I,
20
    INDENT2 as II,
21
    INDENT3 as III,
22
    INDENT4 as IIII,
23
)
24

25

26
def _generate_module_docstring(
    symbol_table: intermediate.SymbolTable,
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the docstring of the whole module.

    The docstring always starts with an overview of the reading and writing
    functions. If the symbol table contains at least one concrete class, two usage
    examples based on the first concrete class are appended.

    :param symbol_table:
        symbol table; only its ``concrete_classes`` are read here
    :param aas_module:
        qualified name of the generated package, interpolated into the references
        and the import statements of the examples
    :return:
        source text of the docstring, including the enclosing triple quotes
    """
    first_cls = (
        symbol_table.concrete_classes[0]
        if len(symbol_table.concrete_classes) > 0
        else None
    )

    docstring_blocks = [
        Stripped(
            f"""\
Read and write AAS models as XML.

For reading, we provide different reading functions, each handling a different kind
of input. All the reading functions operate in one pass, *i.e.*, the source is read
incrementally and the complete XML is not held in memory.

We provide the following four reading functions (where ``X`` represents the name of
the class):

1) ``X_from_iterparse`` reads from a stream of ``(event, element)`` tuples coming from
   :py:func:`xml.etree.ElementTree.iterparse` with the argument
   ``events=["start", "end"]``. If you do not trust the source, please consider
   using `defusedxml.ElementTree`_.
2) ``X_from_stream`` reads from the given text stream.
3) ``X_from_file`` reads from a file on disk.
4) ``X_from_str`` reads from the given string.

The functions ``X_from_stream``, ``X_from_file`` and ``X_from_str`` provide
an extra parameter, ``has_iterparse``, which allows you to use a parsing library
different from :py:mod:`xml.etree.ElementTree`. For example, you can pass in
`defusedxml.ElementTree`_.

.. _defusedxml.ElementTree: https://pypi.org/project/defusedxml/#defusedxml-elementtree

All XML elements are expected to live in the :py:attr:`~NAMESPACE`.

For writing, use the function :py:func:`{aas_module}.xmlization.write` which
translates the instance of the model into an XML document and writes it in one pass
to the stream."""
        )
    ]

    if first_cls is not None:
        # NOTE:
        # The per-class file readers are generated under the name
        # ``{cls.name}_from_file`` (see ``_generate_read_cls_from_file``), so
        # the example must not prepend ``read_`` to the function name.
        first_cls_from_file = python_naming.function_name(
            Identifier(f"{first_cls.name}_from_file")
        )

        first_cls_name = python_naming.class_name(first_cls.name)

        # NOTE:
        # The serialization example imports the types module as ``aas_types``,
        # hence the class has to be qualified with that alias in the example.
        docstring_blocks.append(
            Stripped(
                f"""\
Here is an example usage how to de-serialize from a file:

.. code-block::

    import pathlib
    import xml.etree.ElementTree as ET

    import {aas_module}.xmlization as aas_xmlization

    path = pathlib.Path(...)
    instance = aas_xmlization.{first_cls_from_file}(
        path
    )

    # Do something with the ``instance``

Here is another code example where we serialize the instance:

.. code-block::

    import pathlib

    import {aas_module}.types as aas_types
    import {aas_module}.xmlization as aas_xmlization

    instance = aas_types.{first_cls_name}(
       ... # some constructor arguments
    )

    pth = pathlib.Path(...)
    with pth.open("wt") as fid:
        aas_xmlization.write(instance, fid)"""
            )
        )

    # Triple quotes inside the text would terminate the generated docstring
    # prematurely, so we escape them before wrapping the text in triple quotes.
    escaped_text = "\n\n".join(docstring_blocks).replace('"""', '\\"\\"\\"')
    return Stripped(
        f"""\
\"\"\"
{escaped_text}
\"\"\""""
    )
124

125

126
def _generate_read_enum_from_element_text(
    enumeration: intermediate.Enumeration,
) -> Stripped:
    """
    Generate the function which reads a literal of ``enumeration`` from element text.

    The generated function reads the text with ``_read_text_from_element``,
    converts it with the ``*_from_str`` function of ``aas_stringification`` and
    raises a ``DeserializationException`` if the conversion gives ``None``.

    :param enumeration: enumeration for which the reader is generated
    :return: source code of the private reading function
    """
    # The three names are independent of each other; we derive them up-front so
    # that the template below contains only plain substitutions.
    from_str_name = python_naming.function_name(
        Identifier(f"{enumeration.name}_from_str")
    )
    reader_name = python_naming.private_function_name(
        Identifier(f"read_{enumeration.name}_from_element_text")
    )
    enum_cls = python_naming.enum_name(identifier=enumeration.name)

    code = f"""\
def {reader_name}(
{I}element: Element,
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.{enum_cls}:
{I}\"\"\"
{I}Parse the text of :paramref:`element` as a literal of
{I}:py:class:`.types.{enum_cls}`, and read the corresponding
{I}end element from :paramref:`iterator`.

{I}:param element: start element
{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return: parsed value
{I}\"\"\"
{I}text = _read_text_from_element(
{II}element,
{II}iterator
{I})

{I}literal = aas_stringification.{from_str_name}(text)
{I}if literal is None:
{II}raise DeserializationException(
{III}f"Not a valid string representation of "
{III}f"a literal of {enum_cls}: {{text}}"
{II})

{I}return literal"""

    return Stripped(code)
173

174

175
def _generate_read_cls_from_iterparse(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass],
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the public function which reads a ``cls`` from an iterparse stream.

    The generated function takes the first ``(event, element)`` pair from
    the iterator, checks that it is a start event and delegates the actual parsing
    to the ``_read_*_as_element`` function of the class.

    :param cls: class for which the reader is generated
    :param aas_module: qualified name of the generated package, used in the example
    :return: source code of the public reading function
    """
    type_name = python_naming.class_name(cls.name)

    public_name = python_naming.function_name(
        Identifier(f"{cls.name}_from_iterparse")
    )

    delegate_name = python_naming.function_name(
        Identifier(f"_read_{cls.name}_as_element")
    )

    code = f"""\
def {public_name}(
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.{type_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{type_name}` from
{I}the :paramref:`iterator`.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import xml.etree.ElementTree as ET

{I}    import {aas_module}.xmlization as aas_xmlization

{I}    path = pathlib.Path(...)
{I}    with path.open("rt") as fid:
{I}        iterator = ET.iterparse(
{I}            source=fid,
{I}            events=['start', 'end']
{I}        )
{I}        instance = aas_xmlization.{public_name}(
{I}            iterator
{I}        )

{I}    # Do something with the ``instance``

{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.{type_name}` read from
{II}:paramref:`iterator`
{I}\"\"\"
{I}next_event_element = next(iterator, None)
{I}if next_event_element is None:
{II}raise DeserializationException(
{III}# fmt: off
{III}"Expected the start element for {type_name}, "
{III}"but got the end-of-input"
{III}# fmt: on
{II})

{I}next_event, next_element = next_event_element
{I}if next_event != 'start':
{II}raise DeserializationException(
{III}f"Expected the start element for {type_name}, "
{III}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{II})

{I}try:
{II}return {delegate_name}(
{III}next_element,
{III}iterator
{II})
{I}except DeserializationException as exception:
{II}exception.path._prepend(ElementSegment(next_element))
{II}raise exception"""

    return Stripped(code)
254

255

256
def _generate_read_cls_from_stream(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass],
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the public function which reads a ``cls`` from a text stream.

    The generated function calls ``iterparse`` of the given module on the stream
    and passes the iterator, wrapped in ``_with_elements_cleared_after_yield``,
    to the ``*_from_iterparse`` function of the class.

    :param cls: class for which the reader is generated
    :param aas_module: qualified name of the generated package, used in the example
    :return: source code of the public reading function
    """
    type_name = python_naming.class_name(cls.name)

    iterparse_reader = python_naming.function_name(
        Identifier(f"{cls.name}_from_iterparse")
    )

    public_name = python_naming.function_name(Identifier(f"{cls.name}_from_stream"))

    code = f"""\
def {public_name}(
{I}stream: TextIO,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.{type_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{type_name}` from
{I}the :paramref:`stream`.

{I}Example usage:

{I}.. code-block::

{I}    import {aas_module}.xmlization as aas_xmlization

{I}    with open_some_stream_over_network(...) as stream:
{I}        instance = aas_xmlization.{public_name}(
{I}            stream
{I}        )

{I}    # Do something with the ``instance``

{I}:param stream:
{II}representing an instance of
{II}:py:class:`.types.{type_name}` in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.{type_name}` read from
{II}:paramref:`stream`
{I}\"\"\"
{I}iterator = has_iterparse.iterparse(
{II}stream,
{II}['start', 'end']
{I})
{I}return {iterparse_reader}(
{II}_with_elements_cleared_after_yield(iterator)
{I})"""

    return Stripped(code)
314

315

316
def _generate_read_cls_from_file(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass],
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the public function which reads a ``cls`` from a file on disk.

    The generated function opens the path as UTF-8 text, calls ``iterparse`` of
    the given module on the file and passes the iterator, wrapped in
    ``_with_elements_cleared_after_yield``, to the ``*_from_iterparse`` function
    of the class.

    :param cls: class for which the reader is generated
    :param aas_module: qualified name of the generated package, used in the example
    :return: source code of the public reading function
    """
    type_name = python_naming.class_name(cls.name)

    iterparse_reader = python_naming.function_name(
        Identifier(f"{cls.name}_from_iterparse")
    )

    public_name = python_naming.function_name(Identifier(f"{cls.name}_from_file"))

    code = f"""\
def {public_name}(
{I}path: PathLike,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.{type_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{type_name}` from
{I}the :paramref:`path`.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import {aas_module}.xmlization as aas_xmlization

{I}    path = pathlib.Path(...)
{I}    instance = aas_xmlization.{public_name}(
{I}        path
{I}    )

{I}    # Do something with the ``instance``

{I}:param path:
{II}to the file representing an instance of
{II}:py:class:`.types.{type_name}` in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.{type_name}` read from
{II}:paramref:`path`
{I}\"\"\"
{I}with open(os.fspath(path), "rt", encoding='utf-8') as fid:
{II}iterator = has_iterparse.iterparse(
{III}fid,
{III}['start', 'end']
{II})
{II}return {iterparse_reader}(
{III}_with_elements_cleared_after_yield(iterator)
{II})"""

    return Stripped(code)
376

377

378
def _generate_read_cls_from_str(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass],
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the public function which reads a ``cls`` from a string.

    The generated function wraps the text in an ``io.StringIO``, calls
    ``iterparse`` of the given module on it and passes the iterator, wrapped in
    ``_with_elements_cleared_after_yield``, to the ``*_from_iterparse`` function
    of the class.

    :param cls: class for which the reader is generated
    :param aas_module: qualified name of the generated package, used in the example
    :return: source code of the public reading function
    """
    type_name = python_naming.class_name(cls.name)

    iterparse_reader = python_naming.function_name(
        Identifier(f"{cls.name}_from_iterparse")
    )

    public_name = python_naming.function_name(Identifier(f"{cls.name}_from_str"))

    code = f"""\
def {public_name}(
{I}text: str,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.{type_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{type_name}` from
{I}the :paramref:`text`.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import {aas_module}.xmlization as aas_xmlization

{I}    text = "<...>...</...>"
{I}    instance = aas_xmlization.{public_name}(
{I}        text
{I}    )

{I}    # Do something with the ``instance``

{I}:param text:
{II}representing an instance of
{II}:py:class:`.types.{type_name}` in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.{type_name}` read from
{II}:paramref:`text`
{I}\"\"\"
{I}iterator = has_iterparse.iterparse(
{II}io.StringIO(text),
{II}['start', 'end']
{I})
{I}return {iterparse_reader}(
{II}_with_elements_cleared_after_yield(iterator)
{I})"""

    return Stripped(code)
437

438

439
# fmt: off
@require(
    lambda cls:
    not isinstance(cls, intermediate.AbstractClass)
    or len(cls.concrete_descendants) > 0,
    "All abstract classes must have concrete descendants; otherwise we can not dispatch"
)
# fmt: on
def _generate_read_cls_as_element(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass]
) -> Stripped:
    """
    Generate the function which reads a ``cls`` given its start element.

    For a class without concrete descendants, the generated function checks that
    the tag of the element equals the XML name of the class and delegates to
    the ``_read_*_as_sequence`` function of the class. Otherwise, it looks up
    the reader by the tag in the dispatch map of the class.

    :param cls: class for which the reader is generated
    :return: source code of the private reading function
    """
    type_name = python_naming.class_name(cls.name)

    statements: Stripped

    if len(cls.concrete_descendants) == 0:
        # No descendants: exactly one tag is acceptable.
        expected_tag = naming.xml_class_name(cls.name)
        expected_tag_literal = python_common.string_literal(expected_tag)

        sequence_reader = python_naming.function_name(
            Identifier(f"_read_{cls.name}_as_sequence")
        )

        statements = Stripped(
            f"""\
tag_wo_ns = _parse_element_tag(element)

if tag_wo_ns != {expected_tag_literal}:
{I}raise DeserializationException(
{II}f"Expected the element with the tag '{expected_tag}', "
{II}f"but got tag: {{tag_wo_ns}}"
{I})

return {sequence_reader}(
{I}element,
{I}iterator
)"""
        )
    else:
        # There are descendants: pick the reader based on the tag.
        dispatch_table = python_naming.private_constant_name(
            Identifier(f"dispatch_for_{cls.name}")
        )

        statements = Stripped(
            f"""\
tag_wo_ns = _parse_element_tag(element)
read_as_sequence = {dispatch_table}.get(
{I}tag_wo_ns,
{I}None
)

if read_as_sequence is None:
{I}raise DeserializationException(
{II}f"Expected the element tag to be a valid model type "
{II}f"of a concrete instance of '{type_name}', "
{II}f"but got tag {{tag_wo_ns!r}}"
{I})

return read_as_sequence(
{I}element,
{I}iterator
)"""
        )

    element_reader = python_naming.function_name(
        Identifier(f"_read_{cls.name}_as_element")
    )

    indented_statements = indent_but_first_line(statements, I)

    code = f"""\
def {element_reader}(
{I}element: Element,
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.{type_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{type_name}` from
{I}:paramref:`iterator`, including the end element.

{I}:param element: start element
{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return: parsed instance
{I}\"\"\"
{I}{indented_statements}"""

    return Stripped(code)
529

530

531
def _generate_read_from_iterparse(
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the general function which reads any instance from an iterparse stream.

    The generated function takes the first ``(event, element)`` pair from
    the iterator, checks that it is a start event and delegates to
    ``_read_as_element``.

    :param aas_module: qualified name of the generated package, used in the example
    :return: source code of the public reading function
    """
    public_name = "from_iterparse"

    code = f"""\
def {public_name}(
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from the :paramref:`iterator`.

{I}The type of the instance is determined by the very first start element.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import xml.etree.ElementTree as ET

{I}    import {aas_module}.xmlization as aas_xmlization

{I}    path = pathlib.Path(...)
{I}    with path.open("rt") as fid:
{I}        iterator = ET.iterparse(
{I}            source=fid,
{I}            events=['start', 'end']
{I}        )
{I}        instance = aas_xmlization.{public_name}(
{I}            iterator
{I}        )

{I}    # Do something with the ``instance``

{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.Class` read from the :paramref:`iterator`
{I}\"\"\"
{I}next_event_element = next(iterator, None)
{I}if next_event_element is None:
{II}raise DeserializationException(
{III}# fmt: off
{III}"Expected the start element of an instance, "
{III}"but got the end-of-input"
{III}# fmt: on
{II})

{I}next_event, next_element = next_event_element
{I}if next_event != 'start':
{II}raise DeserializationException(
{III}f"Expected the start element of an instance, "
{III}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{II})

{I}try:
{II}return _read_as_element(
{III}next_element,
{III}iterator
{II})
{I}except DeserializationException as exception:
{II}exception.path._prepend(ElementSegment(next_element))
{II}raise exception"""

    return Stripped(code)
601

602

603
def _generate_read_from_stream(
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the general function which reads any instance from a text stream.

    The generated function calls ``iterparse`` of the given module on the stream
    and passes the iterator, wrapped in ``_with_elements_cleared_after_yield``,
    to ``from_iterparse``.

    :param aas_module: qualified name of the generated package, used in the example
    :return: source code of the public reading function
    """
    public_name = python_naming.function_name(Identifier("from_stream"))

    code = f"""\
def {public_name}(
{I}stream: TextIO,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from the :paramref:`stream`.

{I}The type of the instance is determined by the very first start element.

{I}Example usage:

{I}.. code-block::

{I}    import {aas_module}.xmlization as aas_xmlization

{I}    with open_some_stream_over_network(...) as stream:
{I}        instance = aas_xmlization.{public_name}(
{I}            stream
{I}        )

{I}    # Do something with the ``instance``

{I}:param stream:
{II}representing an instance in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance read from :paramref:`stream`
{I}\"\"\"
{I}iterator = has_iterparse.iterparse(
{II}stream,
{II}['start', 'end']
{I})
{I}return from_iterparse(
{II}_with_elements_cleared_after_yield(iterator)
{I})"""

    return Stripped(code)
653

654

655
def _generate_read_from_file(
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the general function which reads any instance from a file on disk.

    The generated function opens the path as UTF-8 text, calls ``iterparse`` of
    the given module on the file and passes the iterator, wrapped in
    ``_with_elements_cleared_after_yield``, to ``from_iterparse``.

    :param aas_module: qualified name of the generated package, used in the example
    :return: source code of the public reading function
    """
    public_name = python_naming.function_name(Identifier("from_file"))

    code = f"""\
def {public_name}(
{I}path: PathLike,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from the file at the :paramref:`path`.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import {aas_module}.xmlization as aas_xmlization

{I}    path = pathlib.Path(...)
{I}    instance = aas_xmlization.{public_name}(
{I}        path
{I}    )

{I}    # Do something with the ``instance``

{I}:param path:
{II}to the file representing an instance in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance read from the file at :paramref:`path`
{I}\"\"\"
{I}with open(os.fspath(path), "rt", encoding='utf-8') as fid:
{II}iterator = has_iterparse.iterparse(
{III}fid,
{III}['start', 'end']
{II})
{II}return from_iterparse(
{III}_with_elements_cleared_after_yield(iterator)
{II})"""

    return Stripped(code)
703

704

705
def _generate_read_from_str(
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the general function which reads any instance from a string.

    The generated function wraps the text in an ``io.StringIO``, calls
    ``iterparse`` of the given module on it and passes the iterator, wrapped in
    ``_with_elements_cleared_after_yield``, to ``from_iterparse``.

    :param aas_module: qualified name of the generated package, used in the example
    :return: source code of the public reading function
    """
    public_name = python_naming.function_name(Identifier("from_str"))

    code = f"""\
def {public_name}(
{I}text: str,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from the :paramref:`text`.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import {aas_module}.xmlization as aas_xmlization

{I}    text = "<...>...</...>"
{I}    instance = aas_xmlization.{public_name}(
{I}        text
{I}    )

{I}    # Do something with the ``instance``

{I}:param text:
{II}representing an instance in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance read from :paramref:`text`
{I}\"\"\"
{I}iterator = has_iterparse.iterparse(
{II}io.StringIO(text),
{II}['start', 'end']
{I})
{I}return from_iterparse(
{II}_with_elements_cleared_after_yield(iterator)
{I})"""

    return Stripped(code)
752

753

754
def _generate_general_read_as_element(
    symbol_table: intermediate.SymbolTable,
) -> Stripped:
    """
    Generate ``_read_as_element`` which dispatches on the tag of the start element.

    The generated function looks up the reader in the general dispatch map by
    the tag of the element and raises a ``DeserializationException`` if there is
    no reader for the tag.

    :param symbol_table: not read in this function
    :return: source code of the private reading function
    """
    dispatch_table = python_naming.private_constant_name(
        Identifier("general_dispatch")
    )

    statements = Stripped(
        f"""\
tag_wo_ns = _parse_element_tag(element)
read_as_sequence = {dispatch_table}.get(
{I}tag_wo_ns,
{I}None
)

if read_as_sequence is None:
{I}raise DeserializationException(
{II}f"Expected the element tag to be a valid model type "
{II}f"of a concrete instance, "
{II}f"but got tag {{tag_wo_ns!r}}"
{I})

return read_as_sequence(
{I}element,
{I}iterator
)"""
    )

    indented_statements = indent_but_first_line(statements, I)

    code = f"""\
def _read_as_element(
{I}element: Element,
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from :paramref:`iterator`, including the end element.

{I}:param element: start element
{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return: parsed instance
{I}\"\"\"
{I}{indented_statements}"""

    return Stripped(code)
800

801

802
# Map each primitive type to the name of the function which the generated code
# calls to read a value of that type from the text of an element.
_READ_FUNCTION_BY_PRIMITIVE_TYPE = {
    intermediate.PrimitiveType.BOOL: "_read_bool_from_element_text",
    intermediate.PrimitiveType.INT: "_read_int_from_element_text",
    intermediate.PrimitiveType.FLOAT: "_read_float_from_element_text",
    intermediate.PrimitiveType.STR: "_read_str_from_element_text",
    intermediate.PrimitiveType.BYTEARRAY: "_read_bytes_from_element_text",
}
# Fail already at import time if a primitive type lacks a reading function.
assert set(intermediate.PrimitiveType).issubset(_READ_FUNCTION_BY_PRIMITIVE_TYPE)
813

814

815
def _generate_reader_and_setter(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the ``ReaderAndSetterFor{cls}``.

    The generated class declares one attribute per property of ``cls``, each
    initialized to ``None``, and one method per property (named after
    ``read_and_set_{prop.name}``) which reads the property from the stream of
    XML events and stores it in the corresponding attribute.

    :param cls: concrete class whose properties are to be read
    :return: source code of the generated class
    :raise:
        :py:class:`AssertionError` if a property is a list of something else
        than class instances or primitive values
    """
    methods = []  # type: List[Stripped]

    cls_name = python_naming.class_name(cls.name)

    attribute_declarations = []  # type: List[str]
    for prop in cls.properties:
        prop_name = python_naming.property_name(prop.name)
        prop_type = python_common.generate_type(
            prop.type_annotation, types_module=Identifier("aas_types")
        )

        # NOTE (mristin, 2022-07-22):
        # We make all the properties optional since we switch over the properties
        # during the de-serialization.
        if not isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation):
            prop_type = Stripped(f"Optional[{prop_type}]")

        attribute_declarations.append(f"self.{prop_name}: {prop_type} = None")

    init_body = "\n".join(attribute_declarations)

    methods.append(
        Stripped(
            f"""\
def __init__(self) -> None:
{I}\"\"\"Initialize with all the properties unset.\"\"\"
{I}{indent_but_first_line(init_body, I)}"""
        )
    )

    for prop in cls.properties:
        type_anno = intermediate.beneath_optional(prop.type_annotation)

        prop_name = python_naming.property_name(prop.name)

        method_body: Stripped

        if isinstance(type_anno, intermediate.PrimitiveTypeAnnotation) or (
            isinstance(type_anno, intermediate.OurTypeAnnotation)
            and isinstance(type_anno.our_type, intermediate.ConstrainedPrimitive)
        ):
            # Primitives and constrained primitives are both read from
            # the element text with the reader of the underlying primitive type.
            primitive_type = intermediate.try_primitive_type(type_anno)
            assert primitive_type is not None

            read_function = _READ_FUNCTION_BY_PRIMITIVE_TYPE[primitive_type]

            method_body = Stripped(
                f"""\
self.{prop_name} = {read_function}(
{I}element,
{I}iterator
)"""
            )

        elif isinstance(type_anno, intermediate.OurTypeAnnotation):
            our_type = type_anno.our_type
            if isinstance(our_type, intermediate.Enumeration):
                read_function = python_naming.private_function_name(
                    Identifier(f"read_{our_type.name}_from_element_text")
                )

                method_body = Stripped(
                    f"""\
self.{prop_name} = {read_function}(
{I}element,
{I}iterator
)"""
                )

            elif isinstance(our_type, intermediate.ConstrainedPrimitive):
                raise AssertionError(
                    f"Expected {intermediate.ConstrainedPrimitive.__name__} "
                    f"to have been handled before"
                )

            elif isinstance(
                our_type, (intermediate.AbstractClass, intermediate.ConcreteClass)
            ):
                prop_cls_name = python_naming.class_name(our_type.name)

                if len(our_type.concrete_descendants) > 0:
                    # The value is nested in a discriminator element, so we
                    # read that element first and then the end of the property.
                    read_prop_cls_as_element = python_naming.function_name(
                        Identifier(f"_read_{our_type.name}_as_element")
                    )

                    method_body = Stripped(
                        f"""\
next_event_element = next(iterator, None)
if next_event_element is None:
{I}raise DeserializationException(
{II}"Expected a discriminator start element corresponding "
{II}"to {prop_cls_name}, but got end-of-input"
{I})

next_event, next_element = next_event_element
if next_event != 'start':
{I}raise DeserializationException(
{II}f"Expected a discriminator start element corresponding "
{II}f"to {prop_cls_name}, "
{II}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{I})

try:
{I}result = {read_prop_cls_as_element}(
{II}next_element,
{II}iterator
{I})
except DeserializationException as exception:
{I}exception.path._prepend(ElementSegment(next_element))
{I}raise

_read_end_element(element, iterator)

self.{prop_name} = result"""
                    )
                else:
                    # No descendants, hence no discriminator: the properties of
                    # the value are read directly as a sequence.
                    read_prop_cls_as_sequence = python_naming.function_name(
                        Identifier(f"_read_{our_type.name}_as_sequence")
                    )

                    method_body = Stripped(
                        f"""\
self.{prop_name} = {read_prop_cls_as_sequence}(
{I}element,
{I}iterator
)"""
                    )

            else:
                # Without this branch, ``method_body`` would be silently left
                # unbound if a new kind of our types were ever introduced.
                assert_never(our_type)

        elif isinstance(type_anno, intermediate.ListTypeAnnotation):
            if isinstance(
                type_anno.items, intermediate.OurTypeAnnotation
            ) and isinstance(
                type_anno.items.our_type,
                (intermediate.AbstractClass, intermediate.ConcreteClass),
            ):
                read_item_cls_as_element = python_naming.function_name(
                    Identifier(f"_read_{type_anno.items.our_type.name}_as_element")
                )
            elif isinstance(type_anno.items, intermediate.PrimitiveTypeAnnotation):
                read_item_cls_as_element = Identifier(
                    _READ_FUNCTION_BY_PRIMITIVE_TYPE[type_anno.items.a_type]
                )
            else:
                raise AssertionError(
                    "(mristin, 2022-10-09) We handle only lists of classes and "
                    "primitive types in the XML de-serialization at the moment. "
                    "The meta-model does not contain any other lists, so we wanted "
                    "to keep the code as simple as possible, and avoid unrolling. "
                    "Please contact the developers if you need this feature."
                )

            # We generate the item type in the same way as the type of the
            # attribute in ``__init__`` so that the annotation of ``result``
            # is a valid Python type and matches the attribute it is assigned to.
            items_type = python_common.generate_type(
                type_anno.items, types_module=Identifier("aas_types")
            )

            method_body = Stripped(
                f"""\
if element.text is not None and len(element.text.strip()) != 0:
{I}raise DeserializationException(
{II}f"Expected only item elements and whitespace text, "
{II}f"but got text: {{element.text!r}}"
{I})

result: List[
{I}{items_type}
] = []

item_i = 0

while True:
{I}next_event_element = next(iterator, None)
{I}if next_event_element is None:
{II}raise DeserializationException(
{III}"Expected one or more items from a list or the end element, "
{III}"but got end-of-input"
{II})

{I}next_event, next_element = next_event_element
{I}if next_event == 'end' and next_element.tag == element.tag:
{II}# We reached the end of the list.
{II}break

{I}if next_event != 'start':
{II}raise DeserializationException(
{III}"Expected a start element corresponding to an item, "
{III}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{II})

{I}try:
{II}item = {read_item_cls_as_element}(
{III}next_element,
{III}iterator
{II})
{I}except DeserializationException as exception:
{II}exception.path._prepend(IndexSegment(next_element, item_i))
{II}raise

{I}result.append(item)
{I}item_i += 1

self.{prop_name} = result"""
            )

        else:
            assert_never(type_anno)

        method_name = python_naming.method_name(Identifier(f"read_and_set_{prop.name}"))
        methods.append(
            Stripped(
                f"""\
def {method_name}(
{I}self,
{I}element: Element,
{I}iterator: Iterator[Tuple[str, Element]]
) -> None:
{I}\"\"\"
{I}Read :paramref:`element` as the property
{I}:py:attr:`.types.{cls_name}.{prop_name}` and set it.
{I}\"\"\"
{I}{indent_but_first_line(method_body, I)}"""
            )
        )

    reader_and_setter_name = python_naming.private_class_name(
        Identifier(f"Reader_and_setter_for_{cls.name}")
    )

    writer = io.StringIO()
    writer.write(
        f"""\
class {reader_and_setter_name}:
{I}\"\"\"
{I}Provide a buffer for reading and setting the properties for the class
{I}:py:class:`{cls_name}`.

{I}The properties correspond to the constructor arguments of
{I}:py:class:`{cls_name}`. We use this buffer to facilitate dispatching when
{I}parsing the properties in a streaming fashion.
{I}\"\"\""""
    )

    for method in methods:
        writer.write("\n\n")
        writer.write(textwrap.indent(method, I))

    return Stripped(writer.getvalue())
4✔
1064

1065

1066
def _generate_read_as_sequence(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the method to read the instance as sequence of XML-encoded properties.

    This function performs no dispatch! The dispatch is expected to have been
    performed already based on the discriminator element.

    The properties are expected to correspond to the constructor arguments of
    the ``cls``.

    :param cls: concrete class to be read
    :return: source code of the generated ``read_{cls.name}_as_sequence`` function
    """
    # fmt: off
    assert (
            sorted(
                (arg.name, str(arg.type_annotation))
                for arg in cls.constructor.arguments
            ) == sorted(
                (prop.name, str(prop.type_annotation))
                for prop in cls.properties
            )
    ), (
        "(mristin, 2022-10-11) We assume that the properties and constructor arguments "
        "are identical at this point. If this is not the case, we have to re-write the "
        "logic substantially! Please contact the developers if you see this."
    )
    # fmt: on

    blocks = [
        Stripped(
            f"""\
if element.text is not None and len(element.text.strip()) != 0:
{I}raise DeserializationException(
{II}f"Expected only XML elements representing the properties and whitespace text, "
{II}f"but got text: {{element.text!r}}"
{I})"""
        ),
        Stripped("_raise_if_has_tail_or_attrib(element)"),
    ]  # type: List[Stripped]

    # region Body

    cls_name = python_naming.class_name(cls.name)

    if len(cls.constructor.arguments) == 0:
        # There is nothing to read but the end element. The event must be
        # an ``end`` *and* its tag must match the tag of the start element;
        # otherwise the input is malformed.
        blocks.append(
            Stripped(
                f"""\
next_event_element = next(iterator, None)
if next_event_element is None:
{I}raise DeserializationException(
{II}f"Expected the end element corresponding to {{element.tag}}, "
{II}f"but got the end-of-input"
{I})

next_event, next_element = next_event_element
if next_event != 'end' or next_element.tag != element.tag:
{I}raise DeserializationException(
{II}f"Expected the end element corresponding to {{element.tag}}, "
{II}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{I})"""
            )
        )

        blocks.append(Stripped(f"return aas_types.{cls_name}()"))
    else:
        reader_and_setter_name = python_naming.private_class_name(
            Identifier(f"Reader_and_setter_for_{cls.name}")
        )

        read_and_set_dispatch_name = python_naming.private_constant_name(
            Identifier(f"read_and_set_dispatch_for_{cls.name}")
        )

        blocks.append(
            Stripped(
                f"""\
reader_and_setter = (
{I}{reader_and_setter_name}()
)

while True:
{I}next_event_element = next(iterator, None)
{I}if next_event_element is None:
{II}raise DeserializationException(
{III}"Expected one or more XML-encoded properties or the end element, "
{III}"but got the end-of-input"
{II})

{I}next_event, next_element = next_event_element
{I}if next_event == 'end' and next_element.tag == element.tag:
{II}# We reached the end element enclosing the sequence.
{II}break

{I}if next_event != 'start':
{II}raise DeserializationException(
{III}"Expected a start element corresponding to a property, "
{III}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{II})

{I}try:
{II}tag_wo_ns = _parse_element_tag(next_element)
{I}except DeserializationException as exception:
{II}exception.path._prepend(ElementSegment(next_element))
{II}raise

{I}read_and_set_method = {read_and_set_dispatch_name}.get(
{II}tag_wo_ns,
{II}None
{I})
{I}if read_and_set_method is None:
{II}an_exception = DeserializationException(
{III}f"Expected an element representing a property, "
{III}f"but got an element with unexpected tag: {{tag_wo_ns!r}}"
{II})
{II}an_exception.path._prepend(ElementSegment(next_element))
{II}raise an_exception

{I}try:
{II}read_and_set_method(
{III}reader_and_setter,
{III}next_element,
{III}iterator
{II})
{I}except DeserializationException as exception:
{II}exception.path._prepend(ElementSegment(next_element))
{II}raise"""
            )
        )

        # Only the required properties need to be checked for presence once
        # the whole sequence has been consumed.
        for prop in cls.properties:
            if isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation):
                continue

            prop_name = python_naming.property_name(prop.name)

            cause_literal = python_common.string_literal(
                f"The required property {naming.xml_property(prop.name)!r} is missing"
            )
            blocks.append(
                Stripped(
                    f"""\
if reader_and_setter.{prop_name} is None:
{I}raise DeserializationException(
{II}{cause_literal}
{I})"""
                )
            )

        # Pass the buffered values positionally in the order of
        # the constructor arguments.
        init_writer = io.StringIO()
        init_writer.write(f"return aas_types.{cls_name}(\n")

        for i, arg in enumerate(cls.constructor.arguments):
            prop = cls.properties_by_name[arg.name]

            prop_name = python_naming.property_name(prop.name)

            init_writer.write(f"{I}reader_and_setter.{prop_name}")

            if i < len(cls.constructor.arguments) - 1:
                init_writer.write(",\n")
            else:
                init_writer.write("\n")

        init_writer.write(")")

        blocks.append(Stripped(init_writer.getvalue()))

    # endregion

    function_name = python_naming.private_function_name(
        Identifier(f"read_{cls.name}_as_sequence")
    )

    writer = io.StringIO()
    writer.write(
        f"""\
def {function_name}(
{II}element: Element,
{II}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.{cls_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{cls_name}`
{I}as a sequence of XML-encoded properties.

{I}The end element corresponding to the :paramref:`element` will be
{I}read as well.

{I}:param element: start element, parent of the sequence
{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return: parsed instance
{I}\"\"\"
"""
    )

    for i, block in enumerate(blocks):
        if i > 0:
            writer.write("\n\n")
        writer.write(textwrap.indent(block, I))

    return Stripped(writer.getvalue())
4✔
1269

1270

1271
@require(
    lambda cls: len(cls.concrete_descendants) > 0,
    "Expected the class to have concrete descendants; "
    "otherwise it makes no sense to dispatch",
)
def _generate_dispatch_map_for_class(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass]
) -> Stripped:
    """
    Generate the mapping from XML class names to read-as-sequence functions.

    The mapping covers the concrete descendants of ``cls`` and, if ``cls`` is
    itself concrete, ``cls`` as the first entry.

    :param cls: class with at least one concrete descendant
    :return: source code of the generated mapping
    """
    constant_name = python_naming.private_constant_name(
        Identifier(f"dispatch_for_{cls.name}")
    )
    cls_name = python_naming.class_name(cls.name)

    parts = []  # type: List[str]

    if isinstance(cls, intermediate.AbstractClass):
        parts.append(
            f"""\
#: Dispatch XML class names to read-as-sequence functions
#: corresponding to concrete descendants of {cls_name}
"""
        )
    else:
        parts.append(
            f"""\
#: Dispatch XML class names to read-as-sequence functions
#: corresponding to {cls_name} and its concrete descendants
"""
        )

    parts.append(
        f"""\
{constant_name}: Mapping[
{I}str,
{I}Callable[
{II}[
{III}Element,
{III}Iterator[Tuple[str, Element]]
{II}],
{II}aas_types.{cls_name}
{I}]
] = {{
"""
    )

    targets = list(cls.concrete_descendants)

    # NOTE (mristin, 2022-10-11):
    # ``concrete_descendants`` *exclude* the class itself. A concrete class can
    # be instantiated as well, so we put its own read function first.
    if isinstance(cls, intermediate.ConcreteClass):
        targets.insert(0, cls)

    for target in targets:
        reader_name = python_naming.private_function_name(
            Identifier(f"read_{target.name}_as_sequence")
        )
        key_literal = python_common.string_literal(
            naming.xml_class_name(target.name)
        )

        parts.append(f"{I}{key_literal}: {reader_name},\n")

    parts.append("}")

    return Stripped("".join(parts))
4✔
1346

1347

1348
def _generate_general_dispatch_map(symbol_table: intermediate.SymbolTable) -> Stripped:
    """
    Generate the mapping from XML class names to read-as-sequence functions.

    The mapping has one entry for every concrete class of the ``symbol_table``.

    :param symbol_table: symbol table listing the concrete classes
    :return: source code of the generated mapping
    """
    constant_name = python_naming.private_constant_name(Identifier("general_dispatch"))

    parts = [
        """\
#: Dispatch XML class names to read-as-sequence functions
#: corresponding to the concrete classes
""",
        f"""\
{constant_name}: Mapping[
{I}str,
{I}Callable[
{II}[
{III}Element,
{III}Iterator[Tuple[str, Element]]
{II}],
{II}aas_types.Class
{I}]
] = {{
""",
    ]  # type: List[str]

    for concrete_cls in symbol_table.concrete_classes:
        reader_name = python_naming.private_function_name(
            Identifier(f"read_{concrete_cls.name}_as_sequence")
        )
        key_literal = python_common.string_literal(
            naming.xml_class_name(concrete_cls.name)
        )

        parts.append(f"{I}{key_literal}: {reader_name},\n")

    parts.append("}")

    return Stripped("".join(parts))
4✔
1394

1395

1396
def _generate_reader_and_setter_map(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the mapping from XML property names to read-and-set methods.

    The methods live in the reader-and-setter class generated for ``cls``.

    :param cls: concrete class whose properties are dispatched
    :return: source code of the generated mapping
    """
    # fmt: off
    assert (
            sorted(
                (arg.name, str(arg.type_annotation))
                for arg in cls.constructor.arguments
            ) == sorted(
                (prop.name, str(prop.type_annotation))
                for prop in cls.properties
            )
    ), (
        "(mristin, 2022-10-11) We assume that the properties and constructor arguments "
        "are identical at this point. If this is not the case, we have to re-write the "
        "logic substantially! Please contact the developers if you see this."
    )
    # fmt: on

    buffer_cls_name = python_naming.private_class_name(
        Identifier(f"Reader_and_setter_for_{cls.name}")
    )

    # Pairs of (XML property name, expression referring to the method)
    entries = []  # type: List[Tuple[Identifier, Stripped]]
    for prop in cls.properties:
        xml_prop_name = naming.xml_property(prop.name)
        read_and_set = python_naming.method_name(
            Identifier(f"read_and_set_{prop.name}")
        )
        entries.append((xml_prop_name, Stripped(f"{buffer_cls_name}.{read_and_set}")))

    constant_name = python_naming.private_constant_name(
        Identifier(f"read_and_set_dispatch_for_{cls.name}")
    )

    parts = [
        f"""\
#: Dispatch XML property name to read & set method in
#: :py:class:`{buffer_cls_name}`
{constant_name}: Mapping[
{I}str,
{I}Callable[
{II}[
{III}{buffer_cls_name},
{III}Element,
{III}Iterator[Tuple[str, Element]]
{II}],
{II}None
{I}]
] = {{
"""
    ]  # type: List[str]

    for xml_prop_name, method_expr in entries:
        key_literal = python_common.string_literal(xml_prop_name)
        parts.append(
            f"{I}{key_literal}:\n" f"{II}{indent_but_first_line(method_expr, II)},\n"
        )

    parts.append("}")

    return Stripped("".join(parts))
4✔
1460

1461

1462
# Name of the method in the generated serializer which writes a property of
# the given primitive type.
_WRITE_METHOD_BY_PRIMITIVE_TYPE = {
    intermediate.PrimitiveType.BOOL: "_write_bool_property",
    intermediate.PrimitiveType.INT: "_write_int_property",
    intermediate.PrimitiveType.FLOAT: "_write_float_property",
    intermediate.PrimitiveType.STR: "_write_str_property",
    intermediate.PrimitiveType.BYTEARRAY: "_write_bytes_property",
}
# Make sure that no primitive type has been forgotten in the mapping.
assert set(intermediate.PrimitiveType).issubset(_WRITE_METHOD_BY_PRIMITIVE_TYPE)
1472

1473

1474
def _count_required_properties(cls: intermediate.Class) -> int:
    """
    Count the properties of ``cls`` whose type annotation is not optional.

    :param cls: class whose properties are inspected
    :return: number of the non-optional properties
    """
    count = 0
    for prop in cls.properties:
        if isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation):
            continue

        count += 1

    return count
1481

1482

1483
# fmt: off
1484
@require(
    lambda prop:
    (
        type_anno := intermediate.beneath_optional(prop.type_annotation),
        isinstance(type_anno, intermediate.OurTypeAnnotation)
        and isinstance(type_anno.our_type, intermediate.ConcreteClass)
        and len(type_anno.our_type.concrete_descendants) == 0
    )[1],
    "We expect the property to be of a concrete class with no descendants so that "
    "its value can be represented as a sequence of XML elements, each corresponding "
    "to a property of the value."
)
# fmt: on
def _generate_snippet_for_writing_concrete_cls_prop(
    prop: intermediate.Property,
) -> Stripped:
    """
    Generate the snippet which writes a class property as a sequence.

    If the class of the value has at least one required property, the value is
    always written between a start and an end element. Otherwise, the snippet
    first checks whether all the properties of the value are ``None`` and, if so,
    writes only an empty element.

    :param prop: property typed as a concrete class without descendants
    :return: source code of the snippet
    """
    type_anno = intermediate.beneath_optional(prop.type_annotation)
    assert isinstance(type_anno, intermediate.OurTypeAnnotation)

    value_cls = type_anno.our_type
    assert isinstance(value_cls, intermediate.ConcreteClass)

    xml_prop_literal = python_common.string_literal(naming.xml_property(prop.name))

    write_cls_as_sequence = python_naming.private_method_name(
        Identifier(f"write_{value_cls.name}_as_sequence")
    )

    prop_name = python_naming.property_name(prop.name)

    if _count_required_properties(value_cls) > 0:
        # At least one nested element is always written, so an empty element
        # can never be the result.
        return Stripped(
            f"""\
self._write_start_element({xml_prop_literal})
self.{write_cls_as_sequence}(
{I}that.{prop_name}
)
self._write_end_element({xml_prop_literal})"""
        )

    # NOTE (mristin, 2022-10-14):
    # Prefix with "the" so that we avoid naming conflicts.
    variable = python_naming.variable_name(Identifier(f"the_{prop.name}"))

    parts = [
        f"{variable} = that.{prop_name}\n",
        """\
# We optimize for the case where all the optional properties are not set,
# so that we can simply output an empty element.
if (
""",
    ]  # type: List[str]

    for index, nested_prop in enumerate(value_cls.properties):
        nested_name = python_naming.property_name(nested_prop.name)
        check = f"{variable}.{nested_name} is None"

        # All but the first check are chained with ``and``.
        conjunct = "" if index == 0 else "and "
        parts.append(f"{II}{conjunct}{indent_but_first_line(check, II)}\n")

    parts.append(
        f"""\
):
{I}self._write_empty_element(
{II}{xml_prop_literal}
{I})
else:
{I}self._write_start_element({xml_prop_literal})
{I}self.{write_cls_as_sequence}(
{II}{variable}
{I})
{I}self._write_end_element({xml_prop_literal})"""
    )

    return Stripped("".join(parts))
4✔
1565

1566

1567
def _generate_write_cls_as_sequence(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the serializer method writing ``cls`` as a sequence of XML elements.

    Every property of the ``cls`` is written as one element of the sequence.

    The generated method is meant to live in the ``_Serializer`` class.

    :param cls: concrete class to be serialized
    :return: source code of the generated method
    :raise:
        :py:class:`NotImplementedError` if a property is a list of something
        else than class instances or primitive values
    """
    # fmt: off
    assert (
            sorted(
                (arg.name, str(arg.type_annotation))
                for arg in cls.constructor.arguments
            ) == sorted(
        (prop.name, str(prop.type_annotation))
        for prop in cls.properties
    )
    ), (
        "(mristin, 2022-10-14) We assume that the properties and constructor arguments "
        "are identical at this point. If this is not the case, we have to re-write the "
        "logic substantially! Please contact the developers if you see this."
    )
    # fmt: on

    # NOTE (mristin, 2022-10-14):
    # Python scopes the variables to the function and not to the block, so every
    # generated loop gets its own fresh loop variable.
    loop_variables = python_common.GeneratorForLoopVariables()

    snippets = []  # type: List[Stripped]

    if len(cls.properties) > 0:
        for prop in cls.properties:
            prop_name = python_naming.property_name(prop.name)
            xml_prop_literal = python_common.string_literal(
                naming.xml_property(prop.name)
            )

            type_anno = intermediate.beneath_optional(prop.type_annotation)

            primitive_type = intermediate.try_primitive_type(type_anno)

            snippet: Stripped

            if primitive_type is not None:
                write_method = _WRITE_METHOD_BY_PRIMITIVE_TYPE[primitive_type]

                snippet = Stripped(
                    f"""\
self.{write_method}(
{I}{xml_prop_literal},
{I}that.{prop_name}
)"""
                )
            else:
                assert not isinstance(type_anno, intermediate.PrimitiveTypeAnnotation)

                if isinstance(type_anno, intermediate.OurTypeAnnotation):
                    our_type = type_anno.our_type
                    if isinstance(our_type, intermediate.Enumeration):
                        # Enumeration literals are written as strings.
                        snippet = Stripped(
                            f"""\
self._write_str_property(
{I}{xml_prop_literal},
{I}that.{prop_name}.value
)"""
                        )

                    elif isinstance(our_type, intermediate.ConstrainedPrimitive):
                        raise AssertionError("Expected to be handled before")

                    elif isinstance(
                        our_type,
                        (intermediate.AbstractClass, intermediate.ConcreteClass),
                    ):
                        if len(our_type.concrete_descendants) > 0:
                            snippet = Stripped(
                                f"""\
self._write_start_element({xml_prop_literal})
self.visit(that.{prop_name})
self._write_end_element({xml_prop_literal})"""
                            )
                        else:
                            assert isinstance(our_type, intermediate.ConcreteClass), (
                                f"Unexpected abstract class with no concrete "
                                f"descendants: {our_type.name!r}"
                            )

                            # NOTE (mristin, 2022-10-14):
                            # The generation lives in a separate function since
                            # it was barely readable at *this* indention.
                            snippet = _generate_snippet_for_writing_concrete_cls_prop(
                                prop=prop
                            )
                    else:
                        assert_never(our_type)

                elif isinstance(type_anno, intermediate.ListTypeAnnotation):
                    variable = next(loop_variables)

                    if isinstance(
                        type_anno.items, intermediate.OurTypeAnnotation
                    ) and isinstance(
                        type_anno.items.our_type,
                        (intermediate.AbstractClass, intermediate.ConcreteClass),
                    ):
                        snippet = Stripped(
                            f"""\
if len(that.{prop_name}) == 0:
{I}self._write_empty_element({xml_prop_literal})
else:
{I}self._write_start_element({xml_prop_literal})
{I}for {variable} in that.{prop_name}:
{II}self.visit({variable})
{I}self._write_end_element({xml_prop_literal})"""
                        )

                    elif isinstance(
                        type_anno.items, intermediate.PrimitiveTypeAnnotation
                    ):
                        write_method = _WRITE_METHOD_BY_PRIMITIVE_TYPE[
                            type_anno.items.a_type
                        ]
                        snippet = Stripped(
                            f"""\
if len(that.{prop_name}) == 0:
{I}self._write_empty_element({xml_prop_literal})
else:
{I}self._write_start_element({xml_prop_literal})
{I}for {variable} in that.{prop_name}:
{II}self.{write_method}('v', {variable})
{I}self._write_end_element({xml_prop_literal})"""
                        )

                    else:
                        raise NotImplementedError(
                            f"We only handle lists of class instances and primitive values, "
                            f"but you supplied the following type: {type_anno}. Please contact the developers "
                            f"if you need this feature."
                        )

                else:
                    assert_never(type_anno)

            # Optional properties are written only if they are set.
            if isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation):
                snippet = Stripped(
                    f"""\
if that.{prop_name} is not None:
{I}{indent_but_first_line(snippet, I)}"""
                )

            snippets.append(snippet)
    else:
        snippets.append(
            Stripped(
                """\
# There are no properties specified for this class, so nothing can be written.
return"""
            )
        )

    cls_name = python_naming.class_name(cls.name)
    method_name = python_naming.private_method_name(
        Identifier(f"write_{cls.name}_as_sequence")
    )

    header = f"""\
def {method_name}(
{I}self,
{I}that: aas_types.{cls_name}
) -> None:
{I}\"\"\"
{I}Serialize :paramref:`that` to :py:attr:`~stream` as a sequence of
{I}XML elements.

{I}Each element in the sequence corresponds to a property. If no properties
{I}are set, nothing is written to the :py:attr:`~stream`.

{I}:param that: instance to be serialized
{I}\"\"\"
"""

    # The snippets make up the body of the method, separated by empty lines.
    body = "\n\n".join(textwrap.indent(snippet, I) for snippet in snippets)

    return Stripped(header + body)
4✔
1760

1761

1762
def _generate_visit_cls(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the ``visit_*`` method which serializes ``cls`` as one XML element.

    The generated method is meant to live in the ``_Serializer`` class.

    Depending on ``cls``, one of three method bodies is produced:

    * If ``cls`` has no properties, a single empty element is written.
    * If ``_count_required_properties`` reports at least one property,
      the start element, the properties as a sequence and the end element are
      written unconditionally.
    * Otherwise, the generated code checks at run time whether all
      the properties are ``None`` and writes an empty element in that case.

    :param cls: concrete class for which we generate the method
    :return: code of the method, not yet indented for the class body
    """
    # fmt: off
    constructor_signature = sorted(
        (arg.name, str(arg.type_annotation))
        for arg in cls.constructor.arguments
    )
    property_signature = sorted(
        (prop.name, str(prop.type_annotation))
        for prop in cls.properties
    )
    assert constructor_signature == property_signature, (
        "(mristin, 2022-10-11) We assume that the properties and constructor arguments "
        "are identical at this point. If this is not the case, we have to re-write the "
        "logic substantially! Please contact the developers if you see this."
    )
    # fmt: on

    xml_cls_literal = python_common.string_literal(naming.xml_class_name(cls.name))

    if len(cls.properties) == 0:
        # Nothing to nest, so the class is represented by an empty element.
        body = Stripped(
            f"""\
self._write_empty_element(
{I}{xml_cls_literal}
)"""
        )
    else:
        write_cls_as_sequence = python_naming.private_method_name(
            Identifier(f"write_{cls.name}_as_sequence")
        )

        if _count_required_properties(cls) > 0:
            body = Stripped(
                f"""\
self._write_start_element({xml_cls_literal})
self.{write_cls_as_sequence}(
{I}that
)
self._write_end_element({xml_cls_literal})"""
            )
        else:
            # NOTE (mristin, 2022-10-14):
            # We optimize for the case where all the optional properties are not set,
            # so that we can simply output an empty element.
            none_checks = [
                f"that.{python_naming.property_name(prop.name)} is None"
                for prop in cls.properties
            ]

            # Every check goes on its own line; all but the first one are chained
            # with ``and``.
            condition_lines = []  # type: List[str]
            for position, none_check in enumerate(none_checks):
                conjunct = "" if position == 0 else "and "
                condition_lines.append(
                    f"{II}{conjunct}{indent_but_first_line(none_check, II)}\n"
                )

            body = Stripped(
                """\
# We optimize for the case where all the optional properties are not set,
# so that we can simply output an empty element.
if (
"""
                + "".join(condition_lines)
                + f"""\
):
{I}self._write_empty_element(
{II}{xml_cls_literal}
{I})
else:
{I}self._write_start_element({xml_cls_literal})
{I}self.{write_cls_as_sequence}(
{II}that
{I})
{I}self._write_end_element({xml_cls_literal})"""
            )

    cls_name = python_naming.class_name(cls.name)
    visit_name = python_naming.method_name(Identifier(f"visit_{cls.name}"))

    signature_and_docstring = f"""\
def {visit_name}(
{I}self,
{I}that: aas_types.{cls_name}
) -> None:
{I}\"\"\"
{I}Serialize :paramref:`that` to :py:attr:`~stream` as an XML element.

{I}The enclosing XML element designates the class of the instance, where its
{I}children correspond to the properties of the instance.

{I}:param that: instance to be serialized
{I}\"\"\"
"""

    return Stripped(signature_and_docstring + textwrap.indent(body, I))
1881

1882

1883
# fmt: off
@require(
    lambda symbol_table:
    '"' not in symbol_table.meta_model.xml_namespace,
    "No double quotes expected in the XML namespace so that we can directly "
    "write the namespace as-is"
)
# fmt: on
def _generate_serializer(symbol_table: intermediate.SymbolTable) -> Stripped:
    """
    Generate the serializer as a visitor which writes to a stream on visits.

    The generated ``_Serializer`` class consists of:

    * the attribute declarations (``stream`` and the two method pointers),
    * the low-level helpers for writing start, end and empty elements,
    * one ``_write_*_property`` helper per primitive type, each of which writes
      the start element, the text of the value and the end element,
    * the constructor, and
    * for every concrete class of the ``symbol_table``, the methods generated
      by ``_generate_write_cls_as_sequence`` and ``_generate_visit_cls``.

    The XML namespace is written verbatim between double quotes in the generated
    code, hence the pre-condition on the namespace.

    :param symbol_table: whose concrete classes should be serialized
    :return: code of the ``_Serializer`` class
    """
    body_blocks = [
        Stripped(
            """\
#: Stream to be written to when we visit the instances
stream: Final[TextIO]"""
        ),
        Stripped(
            f"""\
#: Method pointer to be invoked for writing the start element with or without
#: specifying a namespace (depending on the state of the serializer)
_write_start_element: Callable[
{I}[str],
{I}None
]"""
        ),
        Stripped(
            f"""\
#: Method pointer to be invoked for writing an empty element with or without
#: specifying a namespace (depending on the state of the serializer)
_write_empty_element: Callable[
{I}[str],
{I}None
]"""
        ),
        Stripped(
            """\
# NOTE (mristin, 2022-10-14):
# The serialization procedure is quite rigid. We leverage the specifics of
# the serialization procedure to optimize the code a bit.
#
# Namely, we model the writing of the XML elements as a state machine.
# The namespace is only specified for the very first element. All the subsequent
# elements will *not* have the namespace specified. We implement that behavior by
# using pointers to methods, as Python treats the methods as first-class citizens.
#
# The ``_write_start_element`` will point to
# ``_write_first_start_element_with_namespace`` on the *first* invocation.
# Afterwards, it will be redirected to ``_write_start_element_without_namespace``.
#
# Analogously for ``_write_empty_element``.
#
# Please see the implementation for the details, but this should give you at least
# a rough overview."""
        ),
        Stripped(
            f"""\
def _write_first_start_element_with_namespace(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the start element with the tag name :paramref:`name` and specify
{I}its namespace.

{I}The :py:attr:`~_write_start_element` is set to
{I}:py:meth:`~_write_start_element_without_namespace` after the first invocation
{I}of this method.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'<{{name}} xmlns="{{NAMESPACE}}">')

{I}# NOTE (mristin, 2022-10-14):
{I}# Any subsequence call to `_write_start_element` or `_write_empty_element`
{I}# should not specify the namespace of the element as we specified now already
{I}# specified it.
{I}self._write_start_element = self._write_start_element_without_namespace
{I}self._write_empty_element = self._write_empty_element_without_namespace"""
        ),
        Stripped(
            f"""\
def _write_start_element_without_namespace(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the start element with the tag name :paramref:`name`.

{I}The first element, written *before* this one, is expected to have been
{I}already written with the namespace specified.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'<{{name}}>')"""
        ),
        Stripped(
            f"""\
def _escape_and_write_text(
{II}self,
{II}text: str
) -> None:
{I}\"\"\"
{I}Escape :paramref:`text` for XML and write it.

{I}:param text: to be escaped and written
{I}\"\"\"
{I}# NOTE (mristin, 2022-10-14):
{I}# We ran ``timeit`` on manual code which escaped XML special characters with
{I}# a dictionary, and on another snippet which called three ``.replace()``.
{I}# The code with ``.replace()`` was an order of magnitude faster on our computers.
{I}self.stream.write(
{II}text.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
{I})"""
        ),
        Stripped(
            f"""\
def _write_end_element(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the end element with the tag name :paramref:`name`.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'</{{name}}>')"""
        ),
        Stripped(
            f"""\
def _write_first_empty_element_with_namespace(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the first (and only) empty element with the tag name :paramref:`name`.

{I}No elements are expected to be written to the stream afterwards. The element
{I}includes the namespace specification.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'<{{name}} xmlns="{{NAMESPACE}}"/>')
{I}self._write_empty_element = self._rase_if_write_element_called_again
{I}self._write_start_element = self._rase_if_write_element_called_again"""
        ),
        Stripped(
            f"""\
def _rase_if_write_element_called_again(
{II}self,
{II}name: str
) -> None:
{I}raise AssertionError(
{II}f"We expected to call ``_write_first_empty_element_with_namespace`` "
{II}f"only once. This is an unexpected second call for writing "
{II}f"an (empty or non-empty) element with the tag name: {{name!r}}"
{I})"""
        ),
        Stripped(
            f"""\
def _write_empty_element_without_namespace(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the empty element with the tag name :paramref:`name`.

{I}The call to this method is expected to occur *after* the enclosing element with
{I}a specified namespace has been written.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'<{{name}}/>')"""
        ),
        Stripped(
            f"""\
def _write_bool_property(
{II}self,
{II}name: str,
{II}value: bool
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of a boolean property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)
{I}self.stream.write('true' if value else 'false')
{I}self._write_end_element(name)"""
        ),
        Stripped(
            f"""\
def _write_int_property(
{II}self,
{II}name: str,
{II}value: int
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of an integer property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)
{I}self.stream.write(str(value))
{I}self._write_end_element(name)"""
        ),
        # NOTE:
        # The end element must be written after the value, exactly as in the other
        # ``_write_*_property`` methods. Otherwise, the element of a floating-point
        # property is never closed and the resulting XML is malformed.
        Stripped(
            f"""\
def _write_float_property(
{II}self,
{II}name: str,
{II}value: float
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of a floating-point property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)

{I}if value == math.inf:
{II}self.stream.write('INF')
{I}elif value == -math.inf:
{II}self.stream.write('-INF')
{I}elif math.isnan(value):
{II}self.stream.write('NaN')
{I}elif value == 0:
{II}if math.copysign(1.0, value) < 0.0:
{III}self.stream.write('-0.0')
{II}else:
{III}self.stream.write('0.0')
{I}else:
{II}self.stream.write(str(value))

{I}self._write_end_element(name)"""
        ),
        Stripped(
            f"""\
def _write_str_property(
{II}self,
{II}name: str,
{II}value: str
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of a string property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)
{I}self._escape_and_write_text(value)
{I}self._write_end_element(name)"""
        ),
        Stripped(
            f"""\
def _write_bytes_property(
{II}self,
{II}name: str,
{II}value: bytes
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of a binary-content property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)

{I}# NOTE (mristin, 2022-10-14):
{I}# We need to decode the result of the base64-encoding to ASCII since we are
{I}# writing to an XML *text* stream. ``base64.b64encode(.)`` gives us bytes,
{I}# not a string.
{I}encoded = base64.b64encode(value).decode('ascii')

{I}# NOTE (mristin, 2022-10-14):
{I}# Base64 alphabet excludes ``<``, ``>`` and ``&``, so we can directly
{I}# write the ``encoded`` content to the stream as XML text.
{I}#
{I}# See: https://datatracker.ietf.org/doc/html/rfc4648#section-4
{I}self.stream.write(encoded)
{I}self._write_end_element(name)"""
        ),
        Stripped(
            f"""\
def __init__(
{I}self,
{I}stream: TextIO
) -> None:
{I}\"\"\"
{I}Initialize the visitor to write to :paramref:`stream`.

{I}The first element will include the :py:attr:`~.NAMESPACE`. Every other
{I}element will not have the namespace specified.

{I}:param stream: where to write to
{I}\"\"\"
{I}self.stream = stream
{I}self._write_start_element = (
{II}self._write_first_start_element_with_namespace
{I})
{I}self._write_empty_element = (
{II}self._write_first_empty_element_with_namespace
{I})"""
        ),
    ]

    # Every concrete class contributes two methods: one which writes its properties
    # as a sequence of elements, and one which wraps that sequence in the element
    # designating the class.
    for cls in symbol_table.concrete_classes:
        body_blocks.append(_generate_write_cls_as_sequence(cls=cls))
        body_blocks.append(_generate_visit_cls(cls=cls))

    writer = io.StringIO()
    writer.write(
        Stripped(
            f"""\
class _Serializer(aas_types.AbstractVisitor):
{I}\"\"\"Encode instances as XML and write them to :py:attr:`~stream`.\"\"\""""
        )
    )

    # The blocks are nested in the class body and separated by an empty line.
    for body_block in body_blocks:
        writer.write("\n\n")
        writer.write(textwrap.indent(body_block, I))

    return Stripped(writer.getvalue())
2212

2213

2214
def _generate_write_to_stream(
    symbol_table: intermediate.SymbolTable,
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the function to write an instance as XML to a stream.

    The docstring of the generated ``write`` function includes a usage example
    based on the first concrete class of the ``symbol_table``; the example is
    omitted if there are no concrete classes.

    :param symbol_table: whose first concrete class is used in the example
    :param aas_module: fully-qualified name of the base module for the imports
        in the example
    :return: code of the ``write`` function
    """
    docstring_blocks = [
        Stripped(
            """\
Write the XML representation of :paramref:`instance` to :paramref:`stream`."""
        )
    ]

    first_cls = (
        symbol_table.concrete_classes[0]
        if len(symbol_table.concrete_classes) > 0
        else None
    )

    if first_cls is not None:
        first_cls_name = python_naming.class_name(first_cls.name)

        # NOTE:
        # The example imports the types only as ``aas_types``, so the class must be
        # referenced through that module; a bare class name would not be defined.
        docstring_blocks.append(
            Stripped(
                f"""\
Example usage:

.. code-block::

    import pathlib

    import {aas_module}.types as aas_types
    import {aas_module}.xmlization as aas_xmlization

    instance = aas_types.{first_cls_name}(
       ... # some constructor arguments
    )

    pth = pathlib.Path(...)
    with pth.open("wt") as fid:
        aas_xmlization.write(instance, fid)"""
            )
        )

    docstring_blocks.append(
        Stripped(
            """\
:param instance: to be serialized
:param stream: to write to"""
        )
    )

    # The text is nested in a triple-quoted string in the generated code, so any
    # triple quotes in the text itself need to be escaped.
    escaped_text = "\n\n".join(docstring_blocks).replace('"""', '\\"\\"\\"')
    docstring = Stripped(
        f"""\
\"\"\"
{escaped_text}
\"\"\""""
    )

    return Stripped(
        f"""\
def write(instance: aas_types.Class, stream: TextIO) -> None:
{I}{indent_but_first_line(docstring, I)}
{I}serializer = _Serializer(stream)
{I}serializer.visit(instance)"""
    )
2280

2281

2282
# fmt: off
2283
@ensure(lambda result: (result[0] is not None) ^ (result[1] is not None))
4✔
2284
@ensure(
4✔
2285
    lambda result:
2286
    not (result[0] is not None) or result[0].endswith('\n'),
2287
    "Trailing newline mandatory for valid end-of-files"
2288
)
2289
# fmt: on
2290
def generate(
4✔
2291
    symbol_table: intermediate.SymbolTable,
2292
    aas_module: python_common.QualifiedModuleName,
2293
    spec_impls: specific_implementations.SpecificImplementations,
2294
) -> Tuple[Optional[str], Optional[List[Error]]]:
2295
    """
2296
    Generate the Python code for the general XML de/serialization.
2297

2298
    The ``aas_module`` indicates the fully-qualified name of the base module.
2299
    """
2300
    xml_namespace_literal = python_common.string_literal(
4✔
2301
        symbol_table.meta_model.xml_namespace
2302
    )
2303

2304
    blocks = [
2305
        _generate_module_docstring(symbol_table=symbol_table, aas_module=aas_module),
2306
        python_common.WARNING,
2307
        # pylint: disable=line-too-long
2308
        Stripped(
2309
            f"""\
2310
import base64
2311
import io
2312
import math
2313
import os
2314
import sys
2315
from typing import (
2316
{I}Any,
2317
{I}Callable,
2318
{I}Iterator,
2319
{I}List,
2320
{I}Mapping,
2321
{I}Optional,
2322
{I}Sequence,
2323
{I}TextIO,
2324
{I}Tuple,
2325
{I}Union,
2326
{I}TYPE_CHECKING
2327
)
2328
import xml.etree.ElementTree
2329

2330
if sys.version_info >= (3, 8):
2331
{I}from typing import (
2332
{II}Final,
2333
{II}Protocol
2334
{I})
2335
else:
2336
{I}from typing_extensions import (
2337
{II}Final,
2338
{II}Protocol
2339
{I})
2340

2341
import {aas_module}.stringification as aas_stringification
2342
import {aas_module}.types as aas_types
2343

2344
# See: https://stackoverflow.com/questions/55076778/why-isnt-this-function-type-annotated-correctly-error-missing-type-parameters
2345
if TYPE_CHECKING:
2346
    PathLike = os.PathLike[Any]
2347
else:
2348
    PathLike = os.PathLike"""
2349
        ),
2350
        Stripped(
2351
            f"""\
2352
#: XML namespace in which all the elements are expected to reside
2353
NAMESPACE = {xml_namespace_literal}"""
2354
        ),
2355
        Stripped("# region De-serialization"),
2356
        Stripped(
2357
            """\
2358
#: XML namespace as a prefix specially tailored for
2359
#: :py:mod:`xml.etree.ElementTree`
2360
_NAMESPACE_IN_CURLY_BRACKETS = f'{{{NAMESPACE}}}'"""
2361
        ),
2362
        Stripped(
2363
            f"""\
2364
class Element(Protocol):
2365
{I}\"\"\"Behave like :py:meth:`xml.etree.ElementTree.Element`.\"\"\"
2366

2367
{I}@property
2368
{I}def attrib(self) -> Optional[Mapping[str, str]]:
2369
{II}\"\"\"Attributes of the element\"\"\"
2370
{II}raise NotImplementedError()
2371

2372
{I}@property
2373
{I}def text(self) -> Optional[str]:
2374
{II}\"\"\"Text content of the element\"\"\"
2375
{II}raise NotImplementedError()
2376

2377
{I}@property
2378
{I}def tail(self) -> Optional[str]:
2379
{II}\"\"\"Tail text of the element\"\"\"
2380
{II}raise NotImplementedError()
2381

2382
{I}@property
2383
{I}def tag(self) -> str:
2384
{II}\"\"\"Tag of the element; with a namespace provided as a ``{{...}}`` prefix\"\"\"
2385
{II}raise NotImplementedError()
2386

2387
{I}def clear(self) -> None:
2388
{II}\"\"\"Behave like :py:meth:`xml.etree.ElementTree.Element.clear`.\"\"\"
2389
{II}raise NotImplementedError()"""
2390
        ),
2391
        # pylint: disable=line-too-long
2392
        Stripped(
2393
            f"""\
2394
class HasIterparse(Protocol):
2395
{I}\"\"\"Parse an XML document incrementally.\"\"\"
2396

2397
{I}# NOTE (mristin, 2022-10-26):
2398
{I}# ``self`` is not used in this context, but is necessary for Mypy,
2399
{I}# see: https://github.com/python/mypy/issues/5018 and
2400
{I}# https://github.com/python/mypy/commit/3efbc5c5e910296a60ed5b9e0e7eb11dd912c3ed#diff-e165eb7aed9dca0a5ebd93985c8cd263a6462d36ac185f9461348dc5a1396d76R9937
2401

2402
{I}def iterparse(
2403
{III}self,
2404
{III}source: TextIO,
2405
{III}events: Optional[Sequence[str]] = None
2406
{I}) -> Iterator[Tuple[str, Element]]:
2407
{II}\"\"\"Behave like :py:func:`xml.etree.ElementTree.iterparse`.\"\"\""""
2408
        ),
2409
        Stripped(
2410
            f"""\
2411
class ElementSegment:
2412
{I}\"\"\"Represent an element on a path to the erroneous value.\"\"\"
2413
{I}#: Erroneous element
2414
{I}element: Final[Element]
2415

2416
{I}def __init__(
2417
{III}self,
2418
{III}element: Element
2419
{I}) -> None:
2420
{II}\"\"\"Initialize with the given values.\"\"\"
2421
{II}self.element = element
2422

2423
{I}def __str__(self) -> str:
2424
{II}\"\"\"
2425
{II}Render the segment as a tag without the namespace.
2426

2427
{II}We deliberately omit the namespace in the tag names. If you want to actually
2428
{II}query with the resulting XPath, you have to insert the namespaces manually.
2429
{II}We did not know how to include the namespace in a meaningful way, as XPath
2430
{II}assumes namespace prefixes to be defined *outside* of the document. At least
2431
{II}the path thus rendered is informative, and you should be able to descend it
2432
{II}manually.
2433
{II}\"\"\"
2434
{II}_, has_namespace, tag_wo_ns = self.element.tag.rpartition('}}')
2435
{II}if not has_namespace:
2436
{III}return self.element.tag
2437
{II}else:
2438
{III}return tag_wo_ns"""
2439
        ),
2440
        Stripped(
2441
            f"""\
2442
class IndexSegment:
2443
{I}\"\"\"Represent an element in a sequence on a path to the erroneous value.\"\"\"
2444
{I}#: Erroneous element
2445
{I}element: Final[Element]
2446

2447
{I}#: Index of the element in the sequence
2448
{I}index: Final[int]
2449

2450
{I}def __init__(
2451
{III}self,
2452
{III}element: Element,
2453
{III}index: int
2454
{I}) -> None:
2455
{II}\"\"\"Initialize with the given values.\"\"\"
2456
{II}self.element = element
2457
{II}self.index = index
2458

2459
{I}def __str__(self) -> str:
2460
{II}\"\"\"Render the segment as an element wildcard with the index.\"\"\"
2461
{II}return f'*[{{self.index}}]'"""
2462
        ),
2463
        Stripped(
2464
            """\
2465
Segment = Union[ElementSegment, IndexSegment]"""
2466
        ),
2467
        Stripped(
2468
            f"""\
2469
class Path:
2470
{I}\"\"\"Represent the relative path to the erroneous element.\"\"\"
2471

2472
{I}def __init__(self) -> None:
2473
{II}\"\"\"Initialize as an empty path.\"\"\"
2474
{II}self._segments = []  # type: List[Segment]
2475

2476
{I}@property
2477
{I}def segments(self) -> Sequence[Segment]:
2478
{II}\"\"\"Get the segments of the path.\"\"\"
2479
{II}return self._segments
2480

2481
{I}def _prepend(self, segment: Segment) -> None:
2482
{II}\"\"\"Insert the :paramref:`segment` in front of other segments.\"\"\"
2483
{II}self._segments.insert(0, segment)
2484

2485
{I}def __str__(self) -> str:
2486
{II}\"\"\"Render the path as a relative XPath.
2487

2488
{II}We omit the leading ``/`` so that you can easily prefix it as you need.
2489
{II}\"\"\"
2490
{II}return "/".join(str(segment) for segment in self._segments)"""
2491
        ),
2492
        Stripped(
2493
            f"""\
2494
class DeserializationException(Exception):
2495
{I}\"\"\"Signal that the XML de-serialization could not be performed.\"\"\"
2496

2497
{I}#: Human-readable explanation of the exception's cause
2498
{I}cause: Final[str]
2499

2500
{I}#: Relative path to the erroneous value
2501
{I}path: Final[Path]
2502

2503
{I}def __init__(
2504
{III}self,
2505
{III}cause: str
2506
{I}) -> None:
2507
{II}\"\"\"Initialize with the given :paramref:`cause` and an empty path.\"\"\"
2508
{II}self.cause = cause
2509
{II}self.path = Path()"""
2510
        ),
2511
        Stripped(
2512
            f"""\
2513
def _with_elements_cleared_after_yield(
2514
{II}iterator: Iterator[Tuple[str, Element]]
2515
) -> Iterator[Tuple[str, Element]]:
2516
{I}\"\"\"
2517
{I}Map the :paramref:`iterator` such that the element is ``clear()``'ed
2518
{I}*after* every ``yield``.
2519

2520
{I}:param iterator: to be mapped
2521
{I}:yield: event and element from :paramref:`iterator`
2522
{I}\"\"\"
2523
{I}for event, element in iterator:
2524
{II}yield event, element
2525
{II}element.clear()"""
2526
        ),
2527
    ]  # type: List[Stripped]
2528

2529
    errors = []  # type: List[Error]
4✔
2530

2531
    # NOTE (mristin, 2022-10-08):
2532
    # We generate first the public methods so that the reader can jump straight
2533
    # to the most important part of the code.
2534
    for cls in symbol_table.classes:
4✔
2535
        blocks.append(_generate_read_cls_from_iterparse(cls=cls, aas_module=aas_module))
4✔
2536

2537
        blocks.append(_generate_read_cls_from_stream(cls=cls, aas_module=aas_module))
4✔
2538

2539
        blocks.append(_generate_read_cls_from_file(cls=cls, aas_module=aas_module))
4✔
2540

2541
        blocks.append(_generate_read_cls_from_str(cls=cls, aas_module=aas_module))
4✔
2542

2543
    blocks.extend(
4✔
2544
        [
2545
            _generate_read_from_iterparse(aas_module=aas_module),
2546
            _generate_read_from_stream(aas_module=aas_module),
2547
            _generate_read_from_file(aas_module=aas_module),
2548
            _generate_read_from_str(aas_module=aas_module),
2549
        ]
2550
    )
2551

2552
    blocks.extend(
4✔
2553
        [
2554
            Stripped(
2555
                """\
2556
# NOTE (mristin, 2022-10-08):
2557
# Directly using the iterator turned out to result in very complex function
2558
# designs. The design became much simpler as soon as we considered one look-ahead
2559
# element. We came up finally with the following pattern which all the protected
2560
# reading functions below roughly follow:
2561
#
2562
# ..code-block::
2563
#
2564
#    _read_*(
2565
#       look-ahead element,
2566
#       iterator
2567
#    ) -> result
2568
#
2569
# The reading functions all read from the ``iterator`` coming from
2570
# :py:func:`xml.etree.ElementTree.iterparse` with the argument
2571
# ``events=["start", "end"]``. The exception :py:class:`.DeserializationException`
2572
# is raised in case of unexpected input.
2573
#
2574
# The reading functions are responsible to read the end element corresponding to the
2575
# start look-ahead element.
2576
#
2577
# When it comes to error reporting, we use exceptions. The exceptions are raised in
2578
# the *callee*, as usual. However, the context of the exception, such as the error path,
2579
# is added in the *caller*, as only the caller knows the context of
2580
# the lookahead-element. In particular, prepending the path segment corresponding to
2581
# the lookahead-element is the responsibility of the *caller*, and not of
2582
# the *callee*."""
2583
            ),
2584
            Stripped(
2585
                f"""\
2586
def _parse_element_tag(element: Element) -> str:
2587
{I}\"\"\"
2588
{I}Extract the tag name without the namespace prefix from :paramref:`element`.
2589

2590
{I}:param element: whose tag without namespace we want to extract
2591
{I}:return: tag name without the namespace prefix
2592
{I}:raise: :py:class:`DeserializationException` if unexpected :paramref:`element`
2593
{I}\"\"\"
2594
{I}if not element.tag.startswith(_NAMESPACE_IN_CURLY_BRACKETS):
2595
{II}namespace, got_namespace, tag_wo_ns = (
2596
{III}element.tag.rpartition('}}')
2597
{II})
2598
{II}if got_namespace:
2599
{III}if namespace.startswith('{{'):
2600
{IIII}namespace = namespace[1:]
2601

2602
{III}raise DeserializationException(
2603
{IIII}f"Expected the element in the namespace {{NAMESPACE!r}}, "
2604
{IIII}f"but got the element {{tag_wo_ns!r}} in the namespace {{namespace!r}}"
2605
{III})
2606
{II}else:
2607
{III}raise DeserializationException(
2608
{IIII}f"Expected the element in the namespace {{NAMESPACE!r}}, "
2609
{IIII}f"but got the element {{tag_wo_ns!r}} without the namespace prefix"
2610
{III})
2611

2612
{I}return element.tag[len(_NAMESPACE_IN_CURLY_BRACKETS):]"""
2613
            ),
2614
            Stripped(
2615
                f"""\
2616
def _raise_if_has_tail_or_attrib(
2617
{II}element: Element
2618
) -> None:
2619
{I}\"\"\"
2620
{I}Check that :paramref:`element` has no trailing text and no attributes.
2621

2622
{I}:param element: to be verified
2623
{I}:raise:
2624
{II}:py:class:`.DeserializationException` if trailing text or attributes;
2625
{II}conforming to the convention about handling error paths,
2626
{II}the exception path is left empty.
2627
{I}\"\"\"
2628
{I}if element.tail is not None and len(element.tail.strip()) != 0:
2629
{II}raise DeserializationException(
2630
{III}f"Expected no trailing text, but got: {{element.tail!r}}"
2631
{II})
2632

2633
{I}if element.attrib is not None and len(element.attrib) > 0:
2634
{II}raise DeserializationException(
2635
{III}f"Expected no attributes, but got: {{element.attrib}}"
2636
{II})"""
2637
            ),
2638
            Stripped(
2639
                f"""\
2640
def _read_end_element(
2641
{II}element: Element,
2642
{II}iterator: Iterator[Tuple[str, Element]]
2643
) -> Element:
2644
{I}\"\"\"
2645
{I}Read the end element corresponding to the start :paramref:`element`
2646
{I}from :paramref:`iterator`.
2647

2648
{I}:param element: corresponding start element
2649
{I}:param iterator:
2650
{II}Input stream of ``(event, element)`` coming from
2651
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2652
{II}``events=["start", "end"]``
2653
{I}:raise: :py:class:`DeserializationException` if unexpected input
2654
{I}\"\"\"
2655
{I}next_event_element = next(iterator, None)
2656
{I}if next_event_element is None:
2657
{II}raise DeserializationException(
2658
{III}f"Expected the end element for {{element.tag}}, "
2659
{III}f"but got the end-of-input"
2660
{II})
2661

2662
{I}next_event, next_element = next_event_element
2663
{I}if next_event != "end" or next_element.tag != element.tag:
2664
{II}raise DeserializationException(
2665
{III}f"Expected the end element for {{element.tag!r}}, "
2666
{III}f"but got the event {{next_event!r}} and element {{next_element.tag!r}}"
2667
{II})
2668

2669
{I}_raise_if_has_tail_or_attrib(next_element)
2670

2671
{I}return next_element"""
2672
            ),
2673
            Stripped(
2674
                f"""\
2675
def _read_text_from_element(
2676
{I}element: Element,
2677
{I}iterator: Iterator[Tuple[str, Element]]
2678
) -> str:
2679
{I}\"\"\"
2680
{I}Extract the text from the :paramref:`element`, and read
2681
{I}the end element from :paramref:`iterator`.
2682

2683
{I}The :paramref:`element` is expected to contain text. Otherwise,
2684
{I}it is considered as unexpected input.
2685

2686
{I}:param element: start element enclosing the text
2687
{I}:param iterator:
2688
{II}Input stream of ``(event, element)`` coming from
2689
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2690
{II}``events=["start", "end"]``
2691
{I}:raise: :py:class:`DeserializationException` if unexpected input
2692
{I}\"\"\"
2693
{I}_raise_if_has_tail_or_attrib(element)
2694

2695
{I}text = element.text
2696

2697
{I}end_element = _read_end_element(
2698
{II}element,
2699
{II}iterator,
2700
{I})
2701

2702
{I}if text is None:
2703
{II}if end_element.text is None:
2704
{III}raise DeserializationException(
2705
{IIII}"Expected an element with text, but got an element with no text."
2706
{III})
2707

2708
{II}text = end_element.text
2709

2710
{I}return text"""
2711
            ),
2712
            Stripped(
2713
                f"""\
2714
_XS_BOOLEAN_LITERAL_SET = {{
2715
{I}"1",
2716
{I}"true",
2717
{I}"0",
2718
{I}"false",
2719
}}"""
2720
            ),
2721
            Stripped(
2722
                f"""\
2723
def _read_bool_from_element_text(
2724
{I}element: Element,
2725
{I}iterator: Iterator[Tuple[str, Element]]
2726
) -> bool:
2727
{I}\"\"\"
2728
{I}Parse the text of :paramref:`element` as a boolean, and
2729
{I}read the corresponding end element from :paramref:`iterator`.
2730

2731
{I}:param element: start element
2732
{I}:param iterator:
2733
{II}Input stream of ``(event, element)`` coming from
2734
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2735
{II}``events=["start", "end"]``
2736
{I}:raise: :py:class:`DeserializationException` if unexpected input
2737
{I}:return: parsed value
2738
{I}\"\"\"
2739
{I}text = _read_text_from_element(
2740
{II}element,
2741
{II}iterator
2742
{I})
2743

2744
{I}if text not in _XS_BOOLEAN_LITERAL_SET:
2745
{II}raise DeserializationException(
2746
{III}f"Expected a boolean, "
2747
{III}f"but got an element with text: {{text!r}}"
2748
{II})
2749

2750
{I}return text in ('1', 'true')"""
2751
            ),
2752
            Stripped(
2753
                f"""\
2754
def _read_int_from_element_text(
2755
{I}element: Element,
2756
{I}iterator: Iterator[Tuple[str, Element]]
2757
) -> int:
2758
{I}\"\"\"
2759
{I}Parse the text of :paramref:`element` as an integer, and
2760
{I}read the corresponding end element from :paramref:`iterator`.
2761

2762
{I}:param element: start element
2763
{I}:param iterator:
2764
{II}Input stream of ``(event, element)`` coming from
2765
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2766
{II}``events=["start", "end"]``
2767
{I}:raise: :py:class:`DeserializationException` if unexpected input
2768
{I}:return: parsed value
2769
{I}\"\"\"
2770
{I}text = _read_text_from_element(
2771
{II}element,
2772
{II}iterator
2773
{I})
2774

2775
{I}try:
2776
{II}value = int(text)
2777
{I}except ValueError:
2778
{II}# pylint: disable=raise-missing-from
2779
{II}raise DeserializationException(
2780
{III}f"Expected an integer, "
2781
{III}f"but got an element with text: {{text!r}}"
2782
{II})
2783

2784
{I}return value"""
2785
            ),
2786
            Stripped(
2787
                f"""\
2788
_TEXT_TO_XS_DOUBLE_LITERALS = {{
2789
{I}"NaN": math.nan,
2790
{I}"INF": math.inf,
2791
{I}"-INF": -math.inf,
2792
}}"""
2793
            ),
2794
            Stripped(
2795
                f"""\
2796
def _read_float_from_element_text(
2797
{I}element: Element,
2798
{I}iterator: Iterator[Tuple[str, Element]]
2799
) -> float:
2800
{I}\"\"\"
2801
{I}Parse the text of :paramref:`element` as a floating-point number, and
2802
{I}read the corresponding end element from :paramref:`iterator`.
2803

2804
{I}:param element: start element
2805
{I}:param iterator:
2806
{II}Input stream of ``(event, element)`` coming from
2807
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2808
{II}``events=["start", "end"]``
2809
{I}:raise: :py:class:`DeserializationException` if unexpected input
2810
{I}:return: parsed value
2811
{I}\"\"\"
2812
{I}text = _read_text_from_element(
2813
{II}element,
2814
{II}iterator
2815
{I})
2816

2817
{I}value = _TEXT_TO_XS_DOUBLE_LITERALS.get(text, None)
2818
{I}if value is None:
2819
{II}try:
2820
{III}value = float(text)
2821
{II}except ValueError:
2822
{III}# pylint: disable=raise-missing-from
2823
{III}raise DeserializationException(
2824
{IIII}f"Expected a floating-point number, "
2825
{IIII}f"but got an element with text: {{text!r}}"
2826
{III})
2827

2828
{I}return value"""
2829
            ),
2830
            Stripped(
2831
                f"""\
2832
def _read_str_from_element_text(
2833
{I}element: Element,
2834
{I}iterator: Iterator[Tuple[str, Element]]
2835
) -> str:
2836
{I}\"\"\"
2837
{I}Parse the text of :paramref:`element` as a string, and
2838
{I}read the corresponding end element from :paramref:`iterator`.
2839

2840
{I}If there is no text, empty string is returned.
2841

2842
{I}:param element: start element
2843
{I}:param iterator:
2844
{II}Input stream of ``(event, element)`` coming from
2845
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2846
{II}``events=["start", "end"]``
2847
{I}:raise: :py:class:`DeserializationException` if unexpected input
2848
{I}:return: parsed value
2849
{I}\"\"\"
2850
{I}# NOTE (mristin, 2022-10-26):
2851
{I}# We do not use ``_read_text_from_element`` as that function expects
2852
{I}# the ``element`` to contain *some* text. In contrast, this function
2853
{I}# can also deal with empty text, in which case it returns an empty string.
2854

2855
{I}text = element.text
2856

2857
{I}end_element = _read_end_element(
2858
{II}element,
2859
{II}iterator
2860
{I})
2861

2862
{I}if text is None:
2863
{II}text = end_element.text
2864

2865
{I}_raise_if_has_tail_or_attrib(element)
2866
{I}result = (
2867
{II}text
2868
{II}if text is not None
2869
{II}else ""
2870
{I})
2871

2872
{I}return result"""
2873
            ),
2874
            Stripped(
2875
                f"""\
2876
def _read_bytes_from_element_text(
2877
{I}element: Element,
2878
{I}iterator: Iterator[Tuple[str, Element]]
2879
) -> bytes:
2880
{I}\"\"\"
2881
{I}Parse the text of :paramref:`element` as base64-encoded bytes, and
2882
{I}read the corresponding end element from :paramref:`iterator`.
2883

2884
{I}:param element: look-ahead element
2885
{I}:param iterator:
2886
{II}Input stream of ``(event, element)`` coming from
2887
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2888
{II}``events=["start", "end"]``
2889
{I}:raise: :py:class:`DeserializationException` if unexpected input
2890
{I}:return: parsed value
2891
{I}\"\"\"
2892
{I}text = _read_text_from_element(
2893
{II}element,
2894
{II}iterator
2895
{I})
2896

2897
{I}try:
2898
{II}value = base64.b64decode(text)
2899
{I}except Exception:
2900
{II}# pylint: disable=raise-missing-from
2901
{II}raise DeserializationException(
2902
{III}f"Expected a text as base64-encoded bytes, "
2903
{III}f"but got an element with text: {{text!r}}"
2904
{II})
2905

2906
{I}return value"""
2907
            ),
2908
        ]
2909
    )
2910

2911
    for our_type in symbol_table.our_types:
4✔
2912
        if isinstance(our_type, intermediate.Enumeration):
4✔
2913
            blocks.append(_generate_read_enum_from_element_text(enumeration=our_type))
4✔
2914
        elif isinstance(our_type, intermediate.ConstrainedPrimitive):
4✔
2915
            continue
4✔
2916
        elif isinstance(our_type, intermediate.AbstractClass):
4✔
2917
            blocks.append(_generate_read_cls_as_element(cls=our_type))
4✔
2918

2919
        elif isinstance(our_type, intermediate.ConcreteClass):
4✔
2920
            if our_type.is_implementation_specific:
4✔
2921
                implementation_key = specific_implementations.ImplementationKey(
×
2922
                    f"Xmlization/read_{our_type.name}.py"
2923
                )
2924

2925
                implementation = spec_impls.get(implementation_key, None)
×
2926
                if implementation is None:
×
2927
                    errors.append(
×
2928
                        Error(
2929
                            our_type.parsed.node,
2930
                            f"The xmlization snippet is missing "
2931
                            f"for the implementation-specific "
2932
                            f"class {our_type.name}: {implementation_key}",
2933
                        )
2934
                    )
2935
                    continue
×
2936
            else:
2937
                blocks.extend(
4✔
2938
                    [
2939
                        _generate_reader_and_setter(cls=our_type),
2940
                        _generate_read_as_sequence(cls=our_type),
2941
                    ]
2942
                )
2943

2944
                blocks.append(_generate_read_cls_as_element(cls=our_type))
4✔
2945

2946
        else:
2947
            assert_never(our_type)
×
2948

2949
    blocks.append(_generate_general_read_as_element(symbol_table=symbol_table))
4✔
2950

2951
    for cls in symbol_table.classes:
4✔
2952
        if isinstance(cls, intermediate.AbstractClass):
4✔
2953
            blocks.append(_generate_dispatch_map_for_class(cls=cls))
4✔
2954
        elif isinstance(cls, intermediate.ConcreteClass):
4✔
2955
            if len(cls.concrete_descendants) > 0:
4✔
2956
                blocks.append(_generate_dispatch_map_for_class(cls=cls))
4✔
2957

2958
            if not cls.is_implementation_specific:
4✔
2959
                blocks.append(_generate_reader_and_setter_map(cls=cls))
4✔
2960

2961
        else:
2962
            assert_never(cls)
×
2963

2964
    blocks.append(_generate_general_dispatch_map(symbol_table=symbol_table))
4✔
2965

2966
    blocks.append(Stripped("# endregion"))
4✔
2967

2968
    blocks.append(Stripped("# region Serialization"))
4✔
2969

2970
    blocks.append(_generate_serializer(symbol_table=symbol_table))
4✔
2971

2972
    blocks.append(
4✔
2973
        _generate_write_to_stream(symbol_table=symbol_table, aas_module=aas_module)
2974
    )
2975

2976
    blocks.append(
4✔
2977
        Stripped(
2978
            f"""\
2979
def to_str(that: aas_types.Class) -> str:
2980
{I}\"\"\"
2981
{I}Serialize :paramref:`that` to an XML-encoded text.
2982

2983
{I}:param that: instance to be serialized
2984
{I}:return: :paramref:`that` serialized to XML serialized to text
2985
{I}\"\"\"
2986
{I}writer = io.StringIO()
2987
{I}write(that, writer)
2988
{I}return writer.getvalue()"""
2989
        )
2990
    )
2991

2992
    blocks.append(Stripped("# endregion"))
4✔
2993

2994
    writer = io.StringIO()
4✔
2995
    for i, block in enumerate(blocks):
4✔
2996
        if i > 0:
4✔
2997
            writer.write("\n\n\n")
4✔
2998

2999
        writer.write(block)
4✔
3000

3001
    writer.write("\n")
4✔
3002

3003
    return writer.getvalue(), None
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc