• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

aas-core-works / aas-core-codegen / 23051946462

13 Mar 2026 12:59PM UTC coverage: 83.657% (+1.2%) from 82.428%
23051946462

push

github

web-flow
Turn on coveralls (#596)

The site coveralls.io was down so we had to turn off the coverage
upload temporarily.

30831 of 36854 relevant lines covered (83.66%)

3.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.44
/aas_core_codegen/python/lib/_generate_xmlization.py
1
"""Generate Python code for XML-ization based on the intermediate representation."""
2

3
import io
4✔
4
import textwrap
4✔
5
from typing import Tuple, Optional, List, Union
4✔
6

7
from icontract import ensure, require
4✔
8

9
from aas_core_codegen import intermediate, specific_implementations, naming
4✔
10
from aas_core_codegen.common import (
4✔
11
    Error,
12
    Stripped,
13
    assert_never,
14
    Identifier,
15
    indent_but_first_line,
16
)
17
from aas_core_codegen.python import common as python_common, naming as python_naming
4✔
18
from aas_core_codegen.python.common import (
4✔
19
    INDENT as I,
20
    INDENT2 as II,
21
    INDENT3 as III,
22
    INDENT4 as IIII,
23
)
24

25

26
def _generate_module_docstring(
    symbol_table: intermediate.SymbolTable,
    qualified_module_name: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the docstring of the whole XML-ization module.

    The general description of the reading and writing functions is always
    included. If the ``symbol_table`` contains at least one concrete class, usage
    examples based on the first concrete class are appended.

    :param symbol_table: meta-model providing the class used in the examples
    :param qualified_module_name: base module name used in the example imports
    :return: docstring as Python code, including the enclosing triple quotes
    """
    first_cls = (
        symbol_table.concrete_classes[0]
        if len(symbol_table.concrete_classes) > 0
        else None
    )

    docstring_blocks = [
        Stripped(
            f"""\
Read and write AAS models as XML.

For reading, we provide different reading functions, each handling a different kind
of input. All the reading functions operate in one pass, *i.e.*, the source is read
incrementally and the complete XML is not held in memory.

We provide the following four reading functions (where ``X`` represents the name of
the class):

1) ``X_from_iterparse`` reads from a stream of ``(event, element)`` tuples coming from
   :py:func:`xml.etree.ElementTree.iterparse` with the argument
   ``events=["start", "end"]``. If you do not trust the source, please consider
   using `defusedxml.ElementTree`_.
2) ``X_from_stream`` reads from the given text stream.
3) ``X_from_file`` reads from a file on disk.
4) ``X_from_str`` reads from the given string.

The functions ``X_from_stream``, ``X_from_file`` and ``X_from_str`` provide
an extra parameter, ``has_iterparse``, which allows you to use a parsing library
different from :py:mod:`xml.etree.ElementTree`. For example, you can pass in
`defusedxml.ElementTree`_.

.. _defusedxml.ElementTree: https://pypi.org/project/defusedxml/#defusedxml-elementtree

All XML elements are expected to live in the :py:attr:`~NAMESPACE`.

For writing, use the function :py:func:`{qualified_module_name}.xmlization.write` which
translates the instance of the model into an XML document and writes it in one pass
to the stream."""
        )
    ]

    if first_cls is not None:
        # NOTE: The name must follow the same scheme as the per-class reading
        # functions generated in this module (``X_from_file``); otherwise
        # the example refers to a function which does not exist.
        first_cls_from_file = python_naming.function_name(
            Identifier(f"{first_cls.name}_from_file")
        )

        first_cls_name = python_naming.class_name(first_cls.name)

        # NOTE: The class is referenced through ``aas_types`` in the second example
        # since that is the only import which brings the class into scope.
        docstring_blocks.append(
            Stripped(
                f"""\
Here is an example usage how to de-serialize from a file:

.. code-block::

    import pathlib
    import xml.etree.ElementTree as ET

    import {qualified_module_name}.xmlization as aas_xmlization

    path = pathlib.Path(...)
    instance = aas_xmlization.{first_cls_from_file}(
        path
    )

    # Do something with the ``instance``

Here is another code example where we serialize the instance:

.. code-block::

    import pathlib

    import {qualified_module_name}.types as aas_types
    import {qualified_module_name}.xmlization as aas_xmlization

    instance = aas_types.{first_cls_name}(
       ... # some constructor arguments
    )

    pth = pathlib.Path(...)
    with pth.open("wt") as fid:
        aas_xmlization.write(instance, fid)"""
            )
        )

    # Escape the triple quotes so that the text can not terminate the docstring early.
    escaped_text = "\n\n".join(docstring_blocks).replace('"""', '\\"\\"\\"')
    return Stripped(
        f"""\
\"\"\"
{escaped_text}
\"\"\""""
    )
124

125

126
def _generate_read_enum_from_element_text(
    enumeration: intermediate.Enumeration,
) -> Stripped:
    """
    Generate the private function parsing an element's text as an enum literal.

    The generated function reads the text of the element, converts it with
    the corresponding ``*_from_str`` function from ``aas_stringification`` and
    raises a ``DeserializationException`` if the text is not a valid literal.

    :param enumeration: for which the reading function is generated
    :return: generated code of the function
    """
    original_name = enumeration.name

    enum_name = python_naming.enum_name(identifier=original_name)

    function_name = python_naming.private_function_name(
        Identifier(f"read_{original_name}_from_element_text")
    )

    enum_from_str = python_naming.function_name(
        Identifier(f"{original_name}_from_str")
    )

    code = f"""\
def {function_name}(
{I}element: Element,
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.{enum_name}:
{I}\"\"\"
{I}Parse the text of :paramref:`element` as a literal of
{I}:py:class:`.types.{enum_name}`, and read the corresponding
{I}end element from :paramref:`iterator`.

{I}:param element: start element
{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return: parsed value
{I}\"\"\"
{I}text = _read_text_from_element(
{II}element,
{II}iterator
{I})

{I}literal = aas_stringification.{enum_from_str}(text)
{I}if literal is None:
{II}raise DeserializationException(
{III}f"Not a valid string representation of "
{III}f"a literal of {enum_name}: {{text}}"
{II})

{I}return literal"""

    return Stripped(code)
173

174

175
def _generate_read_cls_from_iterparse(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass],
    qualified_module_name: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the public function reading an instance of ``cls`` from iterparse.

    The generated function consumes the first ``start`` event from the iterator
    and delegates the rest to the ``_read_{cls}_as_element`` function.

    :param cls: class for which the reading function is generated
    :param qualified_module_name: base module name used in the usage example
    :return: generated code of the function
    """
    original_name = cls.name

    function_name = python_naming.function_name(
        Identifier(f"{original_name}_from_iterparse")
    )

    cls_name = python_naming.class_name(original_name)

    read_as_element_name = python_naming.function_name(
        Identifier(f"_read_{original_name}_as_element")
    )

    code = f"""\
def {function_name}(
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.{cls_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{cls_name}` from
{I}the :paramref:`iterator`.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import xml.etree.ElementTree as ET

{I}    import {qualified_module_name}.xmlization as aas_xmlization

{I}    path = pathlib.Path(...)
{I}    with path.open("rt") as fid:
{I}        iterator = ET.iterparse(
{I}            source=fid,
{I}            events=['start', 'end']
{I}        )
{I}        instance = aas_xmlization.{function_name}(
{I}            iterator
{I}        )

{I}    # Do something with the ``instance``

{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.{cls_name}` read from
{II}:paramref:`iterator`
{I}\"\"\"
{I}next_event_element = next(iterator, None)
{I}if next_event_element is None:
{II}raise DeserializationException(
{III}# fmt: off
{III}"Expected the start element for {cls_name}, "
{III}"but got the end-of-input"
{III}# fmt: on
{II})

{I}next_event, next_element = next_event_element
{I}if next_event != 'start':
{II}raise DeserializationException(
{III}f"Expected the start element for {cls_name}, "
{III}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{II})

{I}try:
{II}return {read_as_element_name}(
{III}next_element,
{III}iterator
{II})
{I}except DeserializationException as exception:
{II}exception.path._prepend(ElementSegment(next_element))
{II}raise exception"""

    return Stripped(code)
254

255

256
def _generate_read_cls_from_stream(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass],
    qualified_module_name: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the public function reading an instance of ``cls`` from a stream.

    The generated function wraps the stream with ``iterparse`` and delegates
    to the corresponding ``*_from_iterparse`` function.

    :param cls: class for which the reading function is generated
    :param qualified_module_name: base module name used in the usage example
    :return: generated code of the function
    """
    original_name = cls.name

    function_name = python_naming.function_name(
        Identifier(f"{original_name}_from_stream")
    )

    delegate_name = python_naming.function_name(
        Identifier(f"{original_name}_from_iterparse")
    )

    cls_name = python_naming.class_name(original_name)

    code = f"""\
def {function_name}(
{I}stream: TextIO,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.{cls_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{cls_name}` from
{I}the :paramref:`stream`.

{I}Example usage:

{I}.. code-block::

{I}    import {qualified_module_name}.xmlization as aas_xmlization

{I}    with open_some_stream_over_network(...) as stream:
{I}        instance = aas_xmlization.{function_name}(
{I}            stream
{I}        )

{I}    # Do something with the ``instance``

{I}:param stream:
{II}representing an instance of
{II}:py:class:`.types.{cls_name}` in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.{cls_name}` read from
{II}:paramref:`stream`
{I}\"\"\"
{I}iterator = has_iterparse.iterparse(
{II}stream,
{II}['start', 'end']
{I})
{I}return {delegate_name}(
{II}_with_elements_cleared_after_yield(iterator)
{I})"""

    return Stripped(code)
314

315

316
def _generate_read_cls_from_file(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass],
    qualified_module_name: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the public function reading an instance of ``cls`` from a file.

    The generated function opens the file as UTF-8 text, wraps it with
    ``iterparse`` and delegates to the corresponding ``*_from_iterparse`` function.

    :param cls: class for which the reading function is generated
    :param qualified_module_name: base module name used in the usage example
    :return: generated code of the function
    """
    original_name = cls.name

    function_name = python_naming.function_name(
        Identifier(f"{original_name}_from_file")
    )

    delegate_name = python_naming.function_name(
        Identifier(f"{original_name}_from_iterparse")
    )

    cls_name = python_naming.class_name(original_name)

    code = f"""\
def {function_name}(
{I}path: PathLike,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.{cls_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{cls_name}` from
{I}the :paramref:`path`.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import {qualified_module_name}.xmlization as aas_xmlization

{I}    path = pathlib.Path(...)
{I}    instance = aas_xmlization.{function_name}(
{I}        path
{I}    )

{I}    # Do something with the ``instance``

{I}:param path:
{II}to the file representing an instance of
{II}:py:class:`.types.{cls_name}` in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.{cls_name}` read from
{II}:paramref:`path`
{I}\"\"\"
{I}with open(os.fspath(path), "rt", encoding='utf-8') as fid:
{II}iterator = has_iterparse.iterparse(
{III}fid,
{III}['start', 'end']
{II})
{II}return {delegate_name}(
{III}_with_elements_cleared_after_yield(iterator)
{II})"""

    return Stripped(code)
376

377

378
def _generate_read_cls_from_str(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass],
    qualified_module_name: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the public function for the reading of a ``cls`` from a string.

    The generated function wraps the text in an ``io.StringIO``, passes it to
    ``iterparse`` and delegates to the corresponding ``*_from_iterparse`` function.

    :param cls: class for which the reading function is generated
    :param qualified_module_name: base module name used in the usage example
    :return: generated code of the function
    """
    function_name = python_naming.function_name(Identifier(f"{cls.name}_from_str"))

    from_iterparse_name = python_naming.function_name(
        Identifier(f"{cls.name}_from_iterparse")
    )

    cls_name = python_naming.class_name(cls.name)

    # NOTE: The usage example deliberately does not import ``pathlib`` as no path
    # is involved when reading from a string.
    return Stripped(
        f"""\
def {function_name}(
{I}text: str,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.{cls_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{cls_name}` from
{I}the :paramref:`text`.

{I}Example usage:

{I}.. code-block::

{I}    import {qualified_module_name}.xmlization as aas_xmlization

{I}    text = "<...>...</...>"
{I}    instance = aas_xmlization.{function_name}(
{I}        text
{I}    )

{I}    # Do something with the ``instance``

{I}:param text:
{II}representing an instance of
{II}:py:class:`.types.{cls_name}` in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.{cls_name}` read from
{II}:paramref:`text`
{I}\"\"\"
{I}iterator = has_iterparse.iterparse(
{II}io.StringIO(text),
{II}['start', 'end']
{I})
{I}return {from_iterparse_name}(
{II}_with_elements_cleared_after_yield(iterator)
{I})"""
    )
437

438

439
# fmt: off
@require(
    lambda cls:
    not isinstance(cls, intermediate.AbstractClass)
    or len(cls.concrete_descendants) > 0,
    "All abstract classes must have concrete descendants; otherwise we can not dispatch"
)
# fmt: on
def _generate_read_cls_as_element(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass]
) -> Stripped:
    """
    Generate the function reading an instance of ``cls`` given its start element.

    If the class has no concrete descendants, the generated function checks
    the element tag against the XML name of the class and reads the instance as
    a sequence. Otherwise, it looks the tag up in the dispatch map of the class.

    :param cls: class for which the reading function is generated
    :return: generated code of the function
    """
    cls_name = python_naming.class_name(cls.name)

    if len(cls.concrete_descendants) == 0:
        # No dispatch is necessary; only a single tag is admissible.
        xml_cls = naming.xml_class_name(cls.name)
        xml_cls_literal = python_common.string_literal(xml_cls)

        read_as_sequence_function_name = python_naming.function_name(
            Identifier(f"_read_{cls.name}_as_sequence")
        )

        body = Stripped(
            f"""\
tag_wo_ns = _parse_element_tag(element)

if tag_wo_ns != {xml_cls_literal}:
{I}raise DeserializationException(
{II}f"Expected the element with the tag '{xml_cls}', "
{II}f"but got tag: {{tag_wo_ns}}"
{I})

return {read_as_sequence_function_name}(
{I}element,
{I}iterator
)"""
        )
    else:
        # The tag determines the concrete class to be read.
        dispatch_map = python_naming.private_constant_name(
            Identifier(f"dispatch_for_{cls.name}")
        )

        body = Stripped(
            f"""\
tag_wo_ns = _parse_element_tag(element)
read_as_sequence = {dispatch_map}.get(
{I}tag_wo_ns,
{I}None
)

if read_as_sequence is None:
{I}raise DeserializationException(
{II}f"Expected the element tag to be a valid model type "
{II}f"of a concrete instance of '{cls_name}', "
{II}f"but got tag {{tag_wo_ns!r}}"
{I})

return read_as_sequence(
{I}element,
{I}iterator
)"""
        )

    function_name = python_naming.function_name(
        Identifier(f"_read_{cls.name}_as_element")
    )

    code = f"""\
def {function_name}(
{I}element: Element,
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.{cls_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{cls_name}` from
{I}:paramref:`iterator`, including the end element.

{I}:param element: start element
{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return: parsed instance
{I}\"\"\"
{I}{indent_but_first_line(body, I)}"""

    return Stripped(code)
529

530

531
def _generate_read_from_iterparse(
    qualified_module_name: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the general function reading an instance of any class from iterparse.

    The generated function consumes the first ``start`` event from the iterator
    and delegates the rest to ``_read_as_element``.

    :param qualified_module_name: base module name used in the usage example
    :return: generated code of the function
    """
    function_name = "from_iterparse"

    code = f"""\
def {function_name}(
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from the :paramref:`iterator`.

{I}The type of the instance is determined by the very first start element.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import xml.etree.ElementTree as ET

{I}    import {qualified_module_name}.xmlization as aas_xmlization

{I}    path = pathlib.Path(...)
{I}    with path.open("rt") as fid:
{I}        iterator = ET.iterparse(
{I}            source=fid,
{I}            events=['start', 'end']
{I}        )
{I}        instance = aas_xmlization.{function_name}(
{I}            iterator
{I}        )

{I}    # Do something with the ``instance``

{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.Class` read from the :paramref:`iterator`
{I}\"\"\"
{I}next_event_element = next(iterator, None)
{I}if next_event_element is None:
{II}raise DeserializationException(
{III}# fmt: off
{III}"Expected the start element of an instance, "
{III}"but got the end-of-input"
{III}# fmt: on
{II})

{I}next_event, next_element = next_event_element
{I}if next_event != 'start':
{II}raise DeserializationException(
{III}f"Expected the start element of an instance, "
{III}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{II})

{I}try:
{II}return _read_as_element(
{III}next_element,
{III}iterator
{II})
{I}except DeserializationException as exception:
{II}exception.path._prepend(ElementSegment(next_element))
{II}raise exception"""

    return Stripped(code)
601

602

603
def _generate_read_from_stream(
    qualified_module_name: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the general function reading an instance of any class from a stream.

    The generated function wraps the stream with ``iterparse`` and delegates
    to ``from_iterparse``.

    :param qualified_module_name: base module name used in the usage example
    :return: generated code of the function
    """
    identifier = Identifier("from_stream")
    function_name = python_naming.function_name(identifier)

    code = f"""\
def {function_name}(
{I}stream: TextIO,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from the :paramref:`stream`.

{I}The type of the instance is determined by the very first start element.

{I}Example usage:

{I}.. code-block::

{I}    import {qualified_module_name}.xmlization as aas_xmlization

{I}    with open_some_stream_over_network(...) as stream:
{I}        instance = aas_xmlization.{function_name}(
{I}            stream
{I}        )

{I}    # Do something with the ``instance``

{I}:param stream:
{II}representing an instance in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance read from :paramref:`stream`
{I}\"\"\"
{I}iterator = has_iterparse.iterparse(
{II}stream,
{II}['start', 'end']
{I})
{I}return from_iterparse(
{II}_with_elements_cleared_after_yield(iterator)
{I})"""

    return Stripped(code)
653

654

655
def _generate_read_from_file(
    qualified_module_name: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the general function reading an instance of any class from a file.

    The generated function opens the file as UTF-8 text, wraps it with
    ``iterparse`` and delegates to ``from_iterparse``.

    :param qualified_module_name: base module name used in the usage example
    :return: generated code of the function
    """
    identifier = Identifier("from_file")
    function_name = python_naming.function_name(identifier)

    code = f"""\
def {function_name}(
{I}path: PathLike,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from the file at the :paramref:`path`.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import {qualified_module_name}.xmlization as aas_xmlization

{I}    path = pathlib.Path(...)
{I}    instance = aas_xmlization.{function_name}(
{I}        path
{I}    )

{I}    # Do something with the ``instance``

{I}:param path:
{II}to the file representing an instance in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance read from the file at :paramref:`path`
{I}\"\"\"
{I}with open(os.fspath(path), "rt", encoding='utf-8') as fid:
{II}iterator = has_iterparse.iterparse(
{III}fid,
{III}['start', 'end']
{II})
{II}return from_iterparse(
{III}_with_elements_cleared_after_yield(iterator)
{II})"""

    return Stripped(code)
705

706

707
def _generate_read_from_str(
    qualified_module_name: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the general read function to parse an instance from a string.

    The generated function wraps the text in an ``io.StringIO``, passes it to
    ``iterparse`` and delegates to ``from_iterparse``.

    :param qualified_module_name: base module name used in the usage example
    :return: generated code of the function
    """
    function_name = python_naming.function_name(Identifier("from_str"))

    # NOTE: The usage example deliberately does not import ``pathlib`` as no path
    # is involved when reading from a string.
    return Stripped(
        f"""\
def {function_name}(
{I}text: str,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from the :paramref:`text`.

{I}Example usage:

{I}.. code-block::

{I}    import {qualified_module_name}.xmlization as aas_xmlization

{I}    text = "<...>...</...>"
{I}    instance = aas_xmlization.{function_name}(
{I}        text
{I}    )

{I}    # Do something with the ``instance``

{I}:param text:
{II}representing an instance in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance read from :paramref:`text`
{I}\"\"\"
{I}iterator = has_iterparse.iterparse(
{II}io.StringIO(text),
{II}['start', 'end']
{I})
{I}return from_iterparse(
{II}_with_elements_cleared_after_yield(iterator)
{I})"""
    )
756

757

758
def _generate_general_read_as_element(
    symbol_table: intermediate.SymbolTable,
) -> Stripped:
    """
    Generate ``_read_as_element`` which dispatches over all the concrete classes.

    The generated function looks up the element tag in the general dispatch map
    and calls the reading function found there.

    :param symbol_table: meta-model; not inspected by this generator
    :return: generated code of the function
    """
    dispatch_identifier = Identifier("general_dispatch")
    dispatch_map = python_naming.private_constant_name(dispatch_identifier)

    body = Stripped(
        f"""\
tag_wo_ns = _parse_element_tag(element)
read_as_sequence = {dispatch_map}.get(
{I}tag_wo_ns,
{I}None
)

if read_as_sequence is None:
{I}raise DeserializationException(
{II}f"Expected the element tag to be a valid model type "
{II}f"of a concrete instance, "
{II}f"but got tag {{tag_wo_ns!r}}"
{I})

return read_as_sequence(
{I}element,
{I}iterator
)"""
    )

    indented_body = indent_but_first_line(body, I)

    code = f"""\
def _read_as_element(
{I}element: Element,
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from :paramref:`iterator`, including the end element.

{I}:param element: start element
{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return: parsed instance
{I}\"\"\"
{I}{indented_body}"""

    return Stripped(code)
804

805

806
# Map each primitive type to the name of the function in the generated code which
# reads a value of that type from the text of an element.
_READ_FUNCTION_BY_PRIMITIVE_TYPE = {
    intermediate.PrimitiveType.BOOL: "_read_bool_from_element_text",
    intermediate.PrimitiveType.INT: "_read_int_from_element_text",
    intermediate.PrimitiveType.FLOAT: "_read_float_from_element_text",
    intermediate.PrimitiveType.STR: "_read_str_from_element_text",
    intermediate.PrimitiveType.BYTEARRAY: "_read_bytes_from_element_text",
}
# Make sure that no primitive type has been forgotten in the mapping.
assert set(intermediate.PrimitiveType).issubset(_READ_FUNCTION_BY_PRIMITIVE_TYPE)
817

818

819
def _generate_reader_and_setter(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the private ``ReaderAndSetterFor{cls}`` buffer class.

    The generated class has one attribute per property of ``cls``, all initialized
    to ``None``, and one ``read_and_set_{property}`` method per property which
    reads the property from the XML events and stores it in the attribute.

    :param cls: concrete class for which the buffer class is generated
    :return: generated code of the buffer class
    :raise: :py:class:`AssertionError` if a property is a list of something other
        than classes or primitive types
    """
    methods = []  # type: List[Stripped]

    cls_name = python_naming.class_name(cls.name)

    init_lines = []  # type: List[str]
    for prop in cls.properties:
        prop_name = python_naming.property_name(prop.name)
        prop_type = python_common.generate_type(
            prop.type_annotation, types_module=Identifier("aas_types")
        )

        # NOTE (mristin, 2022-07-22):
        # We make all the properties optional since we switch over the properties
        # during the de-serialization.
        if not isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation):
            prop_type = Stripped(f"Optional[{prop_type}]")

        init_lines.append(f"self.{prop_name}: {prop_type} = None")

    init_body = "\n".join(init_lines)

    methods.append(
        Stripped(
            f"""\
def __init__(self) -> None:
{I}\"\"\"Initialize with all the properties unset.\"\"\"
{I}{indent_but_first_line(init_body, I)}"""
        )
    )

    for prop in cls.properties:
        type_anno = intermediate.beneath_optional(prop.type_annotation)

        prop_name = python_naming.property_name(prop.name)

        method_body: Stripped

        if isinstance(type_anno, intermediate.PrimitiveTypeAnnotation) or (
            isinstance(type_anno, intermediate.OurTypeAnnotation)
            and isinstance(type_anno.our_type, intermediate.ConstrainedPrimitive)
        ):
            # Primitives and constrained primitives are both encoded as element text.
            primitive_type = intermediate.try_primitive_type(type_anno)
            assert primitive_type is not None

            read_function = _READ_FUNCTION_BY_PRIMITIVE_TYPE[primitive_type]

            method_body = Stripped(
                f"""\
self.{prop_name} = {read_function}(
{I}element,
{I}iterator
)"""
            )

        elif isinstance(type_anno, intermediate.OurTypeAnnotation):
            our_type = type_anno.our_type
            if isinstance(our_type, intermediate.Enumeration):
                read_function = python_naming.private_function_name(
                    Identifier(f"read_{our_type.name}_from_element_text")
                )

                method_body = Stripped(
                    f"""\
self.{prop_name} = {read_function}(
{I}element,
{I}iterator
)"""
                )

            elif isinstance(our_type, intermediate.ConstrainedPrimitive):
                raise AssertionError(
                    f"Expected {intermediate.ConstrainedPrimitive.__name__} "
                    f"to have been handled before"
                )

            elif isinstance(
                our_type, (intermediate.AbstractClass, intermediate.ConcreteClass)
            ):
                prop_cls_name = python_naming.class_name(our_type.name)

                if len(our_type.concrete_descendants) > 0:
                    # The property element wraps a discriminator element whose tag
                    # determines the concrete class.
                    read_prop_cls_as_element = python_naming.function_name(
                        Identifier(f"_read_{our_type.name}_as_element")
                    )

                    method_body = Stripped(
                        f"""\
next_event_element = next(iterator, None)
if next_event_element is None:
{I}raise DeserializationException(
{II}"Expected a discriminator start element corresponding "
{II}"to {prop_cls_name}, but got end-of-input"
{I})

next_event, next_element = next_event_element
if next_event != 'start':
{I}raise DeserializationException(
{II}f"Expected a discriminator start element corresponding "
{II}f"to {prop_cls_name}, "
{II}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{I})

try:
{I}result = {read_prop_cls_as_element}(
{II}next_element,
{II}iterator
{I})
except DeserializationException as exception:
{I}exception.path._prepend(ElementSegment(next_element))
{I}raise

_read_end_element(element, iterator)

self.{prop_name} = result"""
                    )
                else:
                    # No dispatch is necessary, so the property element itself
                    # contains the sequence of the properties.
                    read_prop_cls_as_sequence = python_naming.function_name(
                        Identifier(f"_read_{our_type.name}_as_sequence")
                    )

                    method_body = Stripped(
                        f"""\
self.{prop_name} = {read_prop_cls_as_sequence}(
{I}element,
{I}iterator
)"""
                    )

            else:
                # Fail explicitly instead of leaving ``method_body`` unbound.
                assert_never(our_type)

        elif isinstance(type_anno, intermediate.ListTypeAnnotation):
            if isinstance(
                type_anno.items, intermediate.OurTypeAnnotation
            ) and isinstance(
                type_anno.items.our_type,
                (intermediate.AbstractClass, intermediate.ConcreteClass),
            ):
                read_item_cls_as_element = python_naming.function_name(
                    Identifier(f"_read_{type_anno.items.our_type.name}_as_element")
                )
            elif isinstance(type_anno.items, intermediate.PrimitiveTypeAnnotation):
                read_item_cls_as_element = Identifier(
                    _READ_FUNCTION_BY_PRIMITIVE_TYPE[type_anno.items.a_type]
                )
            else:
                raise AssertionError(
                    "(mristin, 2022-10-09) We handle only lists of classes and primitive types "
                    "in the XML de-serialization at the moment. The meta-model does not contain "
                    "any other lists, so we wanted to keep the code as simple as "
                    "possible, and avoid unrolling. Please contact the developers "
                    "if you need this feature."
                )

            # NOTE: We generate the item type with the same function as for
            # the properties. The string representation of the primitive type
            # literal is not a Python type and must not be used in the annotation.
            items_type = python_common.generate_type(
                type_anno.items, types_module=Identifier("aas_types")
            )

            method_body = Stripped(
                f"""\
if element.text is not None and len(element.text.strip()) != 0:
{I}raise DeserializationException(
{II}f"Expected only item elements and whitespace text, "
{II}f"but got text: {{element.text!r}}"
{I})

result: List[
{I}{items_type}
] = []

item_i = 0

while True:
{I}next_event_element = next(iterator, None)
{I}if next_event_element is None:
{II}raise DeserializationException(
{III}"Expected one or more items from a list or the end element, "
{III}"but got end-of-input"
{II})

{I}next_event, next_element = next_event_element
{I}if next_event == 'end' and next_element.tag == element.tag:
{II}# We reached the end of the list.
{II}break

{I}if next_event != 'start':
{II}raise DeserializationException(
{III}"Expected a start element corresponding to an item, "
{III}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{II})

{I}try:
{II}item = {read_item_cls_as_element}(
{III}next_element,
{III}iterator
{II})
{I}except DeserializationException as exception:
{II}exception.path._prepend(IndexSegment(next_element, item_i))
{II}raise

{I}result.append(item)
{I}item_i += 1

self.{prop_name} = result"""
            )

        else:
            assert_never(type_anno)

        method_name = python_naming.method_name(Identifier(f"read_and_set_{prop.name}"))
        methods.append(
            Stripped(
                f"""\
def {method_name}(
{I}self,
{I}element: Element,
{I}iterator: Iterator[Tuple[str, Element]]
) -> None:
{I}\"\"\"
{I}Read :paramref:`element` as the property
{I}:py:attr:`.types.{cls_name}.{prop_name}` and set it.
{I}\"\"\"
{I}{indent_but_first_line(method_body, I)}"""
            )
        )

    reader_and_setter_name = python_naming.private_class_name(
        Identifier(f"Reader_and_setter_for_{cls.name}")
    )

    writer = io.StringIO()
    writer.write(
        f"""\
class {reader_and_setter_name}:
{I}\"\"\"
{I}Provide a buffer for reading and setting the properties for the class
{I}:py:class:`{cls_name}`.

{I}The properties correspond to the constructor arguments of
{I}:py:class:`{cls_name}`. We use this buffer to facilitate dispatching when
{I}parsing the properties in a streaming fashion.
{I}\"\"\""""
    )

    # The methods are separated by an empty line and indented as class members.
    for method in methods:
        writer.write("\n\n")
        writer.write(textwrap.indent(method, I))

    return Stripped(writer.getvalue())
4✔
1068

1069

1070
def _generate_read_as_sequence(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the function to read the instance as sequence of XML-encoded properties.

    This function performs no dispatch! The dispatch is expected to have been
    performed already based on the discriminator element.

    The properties are expected to correspond to the constructor arguments of
    the ``cls``.

    :param cls: concrete class for which the reading function is generated
    :return: source code of the generated reading function
    """
    # fmt: off
    assert (
            sorted(
                (arg.name, str(arg.type_annotation))
                for arg in cls.constructor.arguments
            ) == sorted(
                (prop.name, str(prop.type_annotation))
                for prop in cls.properties
            )
    ), (
        "(mristin, 2022-10-11) We assume that the properties and constructor arguments "
        "are identical at this point. If this is not the case, we have to re-write the "
        "logic substantially! Please contact the developers if you see this."
    )
    # fmt: on

    # Every generated function starts by rejecting non-whitespace text as well as
    # any tail or attributes on the enclosing element.
    blocks = [
        Stripped(
            f"""\
if element.text is not None and len(element.text.strip()) != 0:
{I}raise DeserializationException(
{II}f"Expected only XML elements representing the properties and whitespace text, "
{II}f"but got text: {{element.text!r}}"
{I})"""
        ),
        Stripped("_raise_if_has_tail_or_attrib(element)"),
    ]  # type: List[Stripped]

    # region Body

    cls_name = python_naming.class_name(cls.name)

    if len(cls.constructor.arguments) == 0:
        # NOTE:
        # The class has no properties, so the very next event must be the end
        # element matching the start element. The unpacking and the check have to
        # live at the top level of the generated body (not inside the preceding
        # ``if``), and the tag has to *differ* for the input to be invalid.
        blocks.append(
            Stripped(
                f"""\
next_event_element = next(iterator, None)
if next_event_element is None:
{I}raise DeserializationException(
{II}f"Expected the end element corresponding to {{element.tag}}, "
{II}f"but got the end-of-input"
{I})

next_event, next_element = next_event_element
if next_event != 'end' or next_element.tag != element.tag:
{I}raise DeserializationException(
{II}f"Expected the end element corresponding to {{element.tag}}, "
{II}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{I})"""
            )
        )

        blocks.append(Stripped(f"return aas_types.{cls_name}()"))
    else:
        reader_and_setter_name = python_naming.private_class_name(
            Identifier(f"Reader_and_setter_for_{cls.name}")
        )

        read_and_set_dispatch_name = python_naming.private_constant_name(
            Identifier(f"read_and_set_dispatch_for_{cls.name}")
        )

        # Read the child elements one by one and dispatch each of them to
        # the corresponding read-and-set method based on its tag.
        blocks.append(
            Stripped(
                f"""\
reader_and_setter = (
{I}{reader_and_setter_name}()
)

while True:
{I}next_event_element = next(iterator, None)
{I}if next_event_element is None:
{II}raise DeserializationException(
{III}"Expected one or more XML-encoded properties or the end element, "
{III}"but got the end-of-input"
{II})

{I}next_event, next_element = next_event_element
{I}if next_event == 'end' and next_element.tag == element.tag:
{II}# We reached the end element enclosing the sequence.
{II}break

{I}if next_event != 'start':
{II}raise DeserializationException(
{III}"Expected a start element corresponding to a property, "
{III}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{II})

{I}try:
{II}tag_wo_ns = _parse_element_tag(next_element)
{I}except DeserializationException as exception:
{II}exception.path._prepend(ElementSegment(next_element))
{II}raise

{I}read_and_set_method = {read_and_set_dispatch_name}.get(
{II}tag_wo_ns,
{II}None
{I})
{I}if read_and_set_method is None:
{II}an_exception = DeserializationException(
{III}f"Expected an element representing a property, "
{III}f"but got an element with unexpected tag: {{tag_wo_ns!r}}"
{II})
{II}an_exception.path._prepend(ElementSegment(next_element))
{II}raise an_exception

{I}try:
{II}read_and_set_method(
{III}reader_and_setter,
{III}next_element,
{III}iterator
{II})
{I}except DeserializationException as exception:
{II}exception.path._prepend(ElementSegment(next_element))
{II}raise"""
            )
        )

        # Verify in the generated code that all the required properties were set.
        for prop in cls.properties:
            if isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation):
                continue

            prop_name = python_naming.property_name(prop.name)

            cause_literal = python_common.string_literal(
                f"The required property {naming.xml_property(prop.name)!r} is missing"
            )
            blocks.append(
                Stripped(
                    f"""\
if reader_and_setter.{prop_name} is None:
{I}raise DeserializationException(
{II}{cause_literal}
{I})"""
                )
            )

        # Pass the buffered properties to the constructor in the order of
        # the constructor arguments.
        init_writer = io.StringIO()
        init_writer.write(f"return aas_types.{cls_name}(\n")

        for i, arg in enumerate(cls.constructor.arguments):
            prop = cls.properties_by_name[arg.name]

            prop_name = python_naming.property_name(prop.name)

            init_writer.write(f"{I}reader_and_setter.{prop_name}")

            if i < len(cls.constructor.arguments) - 1:
                init_writer.write(",\n")
            else:
                init_writer.write("\n")

        init_writer.write(")")

        blocks.append(Stripped(init_writer.getvalue()))

    # endregion

    function_name = python_naming.private_function_name(
        Identifier(f"read_{cls.name}_as_sequence")
    )

    writer = io.StringIO()
    writer.write(
        f"""\
def {function_name}(
{II}element: Element,
{II}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.{cls_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{cls_name}`
{I}as a sequence of XML-encoded properties.

{I}The end element corresponding to the :paramref:`element` will be
{I}read as well.

{I}:param element: start element, parent of the sequence
{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return: parsed instance
{I}\"\"\"
"""
    )

    # The blocks make up the function body, separated by an empty line.
    for i, block in enumerate(blocks):
        if i > 0:
            writer.write("\n\n")
        writer.write(textwrap.indent(block, I))

    return Stripped(writer.getvalue())
4✔
1273

1274

1275
@require(
    lambda cls: len(cls.concrete_descendants) > 0,
    "Expected the class to have concrete descendants; "
    "otherwise it makes no sense to dispatch",
)
def _generate_dispatch_map_for_class(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass]
) -> Stripped:
    """
    Generate the mapping from XML class names to read-as-sequence functions.

    The mapping covers all the concrete descendants of ``cls`` and, if ``cls`` is
    itself concrete, also the ``cls``.

    :param cls: class whose descendants are the targets of the dispatch
    :return: source code of the generated module-level constant
    """
    mapping_name = python_naming.private_constant_name(
        Identifier(f"dispatch_for_{cls.name}")
    )
    cls_name = python_naming.class_name(cls.name)

    # The documentation comment differs depending on whether the class itself
    # is one of the dispatch targets.
    if isinstance(cls, intermediate.AbstractClass):
        targets_description = f"concrete descendants of {cls_name}"
    else:
        targets_description = f"{cls_name} and its concrete descendants"

    parts = [
        f"""\
#: Dispatch XML class names to read-as-sequence functions
#: corresponding to {targets_description}
""",
        f"""\
{mapping_name}: Mapping[
{I}str,
{I}Callable[
{II}[
{III}Element,
{III}Iterator[Tuple[str, Element]]
{II}],
{II}aas_types.{cls_name}
{I}]
] = {{
""",
    ]

    # NOTE (mristin, 2022-10-11):
    # ``concrete_descendants`` *exclude* the class itself, so a concrete class
    # has to be put explicitly in front of its descendants.
    targets = list(cls.concrete_descendants)
    if isinstance(cls, intermediate.ConcreteClass):
        targets.insert(0, cls)

    for target in targets:
        key_literal = python_common.string_literal(
            naming.xml_class_name(target.name)
        )
        reader_name = python_naming.private_function_name(
            Identifier(f"read_{target.name}_as_sequence")
        )
        parts.append(f"{I}{key_literal}: {reader_name},\n")

    parts.append("}")

    return Stripped("".join(parts))
4✔
1350

1351

1352
def _generate_general_dispatch_map(symbol_table: intermediate.SymbolTable) -> Stripped:
    """
    Generate the general mapping from XML class names to read-as-sequence functions.

    The mapping contains an entry for every concrete class of the symbol table.

    :param symbol_table: symbol table listing the concrete classes
    :return: source code of the generated module-level constant
    """
    mapping_name = python_naming.private_constant_name(Identifier("general_dispatch"))

    entries = []  # type: List[str]
    for concrete_cls in symbol_table.concrete_classes:
        key_literal = python_common.string_literal(
            naming.xml_class_name(concrete_cls.name)
        )
        reader_name = python_naming.private_function_name(
            Identifier(f"read_{concrete_cls.name}_as_sequence")
        )
        entries.append(f"{I}{key_literal}: {reader_name},\n")

    header = f"""\
#: Dispatch XML class names to read-as-sequence functions
#: corresponding to the concrete classes
{mapping_name}: Mapping[
{I}str,
{I}Callable[
{II}[
{III}Element,
{III}Iterator[Tuple[str, Element]]
{II}],
{II}aas_types.Class
{I}]
] = {{
"""

    return Stripped(header + "".join(entries) + "}")
4✔
1398

1399

1400
def _generate_reader_and_setter_map(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the mapping from XML property names to read-and-set methods.

    The methods are referenced through the reader-and-setter class generated for
    the ``cls``.

    :param cls: concrete class whose properties are dispatched
    :return: source code of the generated module-level constant
    """
    argument_signatures = sorted(
        (arg.name, str(arg.type_annotation)) for arg in cls.constructor.arguments
    )
    property_signatures = sorted(
        (prop.name, str(prop.type_annotation)) for prop in cls.properties
    )
    assert argument_signatures == property_signatures, (
        "(mristin, 2022-10-11) We assume that the properties and constructor arguments "
        "are identical at this point. If this is not the case, we have to re-write the "
        "logic substantially! Please contact the developers if you see this."
    )

    reader_and_setter_cls_name = python_naming.private_class_name(
        Identifier(f"Reader_and_setter_for_{cls.name}")
    )

    map_name = python_naming.private_constant_name(
        Identifier(f"read_and_set_dispatch_for_{cls.name}")
    )

    # One entry per property: the key is the XML name of the property, the value
    # is the unbound method of the reader-and-setter class.
    entries = []  # type: List[str]
    for prop in cls.properties:
        key_literal = python_common.string_literal(naming.xml_property(prop.name))
        method_name = python_naming.method_name(Identifier(f"read_and_set_{prop.name}"))
        method_reference = Stripped(f"{reader_and_setter_cls_name}.{method_name}")

        entries.append(
            f"{I}{key_literal}:\n"
            f"{II}{indent_but_first_line(method_reference, II)},\n"
        )

    header = f"""\
#: Dispatch XML property name to read & set method in
#: :py:class:`{reader_and_setter_cls_name}`
{map_name}: Mapping[
{I}str,
{I}Callable[
{II}[
{III}{reader_and_setter_cls_name},
{III}Element,
{III}Iterator[Tuple[str, Element]]
{II}],
{II}None
{I}]
] = {{
"""

    return Stripped(header + "".join(entries) + "}")
4✔
1464

1465

1466
# Map each primitive type to the name of the method which is invoked on ``self``
# in the generated serialization code to write a property of that type.
_WRITE_METHOD_BY_PRIMITIVE_TYPE = {
    intermediate.PrimitiveType.BOOL: "_write_bool_property",
    intermediate.PrimitiveType.INT: "_write_int_property",
    intermediate.PrimitiveType.FLOAT: "_write_float_property",
    intermediate.PrimitiveType.STR: "_write_str_property",
    intermediate.PrimitiveType.BYTEARRAY: "_write_bytes_property",
}
# Fail at import time if a primitive type has no entry in the mapping.
assert all(
    literal in _WRITE_METHOD_BY_PRIMITIVE_TYPE for literal in intermediate.PrimitiveType
)
1476

1477

1478
def _count_required_properties(cls: intermediate.Class) -> int:
    """
    Count the properties of the ``cls`` which are not annotated as optional.

    :param cls: class whose properties are inspected
    :return: number of the required (non-optional) properties
    """
    required_props = [
        prop
        for prop in cls.properties
        if not isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation)
    ]
    return len(required_props)
1485

1486

1487
# fmt: off
@require(
    lambda prop:
    (
        type_anno := intermediate.beneath_optional(prop.type_annotation),
        isinstance(type_anno, intermediate.OurTypeAnnotation)
        and isinstance(type_anno.our_type, intermediate.ConcreteClass)
        and len(type_anno.our_type.concrete_descendants) == 0
    )[1],
    "We expect the property to be of a concrete class with no descendants so that "
    "its value can be represented as a sequence of XML elements, each corresponding "
    "to a property of the value."
)
# fmt: on
def _generate_snippet_for_writing_concrete_cls_prop(
    prop: intermediate.Property,
) -> Stripped:
    """
    Generate the snippet which writes a class-valued property as a sequence.

    If the class of the value has no required properties, the snippet first checks
    whether all the properties of the value are unset and, if so, writes only
    an empty element.

    :param prop: property whose value is an instance of a concrete class
    :return: statements to be put in the body of the generated write method
    """
    type_anno = intermediate.beneath_optional(prop.type_annotation)
    assert isinstance(type_anno, intermediate.OurTypeAnnotation)

    our_type = type_anno.our_type
    assert isinstance(our_type, intermediate.ConcreteClass)

    xml_prop_literal = python_common.string_literal(naming.xml_property(prop.name))

    write_cls_as_sequence = python_naming.private_method_name(
        Identifier(f"write_{our_type.name}_as_sequence")
    )

    prop_name = python_naming.property_name(prop.name)

    if _count_required_properties(our_type) > 0:
        # At least one child element is always written, so no check for
        # an empty element is necessary.
        return Stripped(
            "\n".join(
                [
                    f"self._write_start_element({xml_prop_literal})",
                    f"self.{write_cls_as_sequence}(",
                    f"{I}that.{prop_name}",
                    ")",
                    f"self._write_end_element({xml_prop_literal})",
                ]
            )
        )

    # NOTE (mristin, 2022-10-14):
    # Prefix with "the" so that we avoid naming conflicts.
    variable = python_naming.variable_name(Identifier(f"the_{prop.name}"))

    # All the properties of the value are optional at this point; the generated
    # condition checks that none of them is set.
    condition_lines = []  # type: List[str]
    for nested_prop in our_type.properties:
        is_none_expr = (
            f"{variable}.{python_naming.property_name(nested_prop.name)} is None"
        )
        conjunct = indent_but_first_line(is_none_expr, II)

        if len(condition_lines) == 0:
            condition_lines.append(f"{II}{conjunct}")
        else:
            condition_lines.append(f"{II}and {conjunct}")

    lines = [
        f"{variable} = that.{prop_name}",
        "# We optimize for the case where all the optional properties are not set,",
        "# so that we can simply output an empty element.",
        "if (",
        *condition_lines,
        "):",
        f"{I}self._write_empty_element(",
        f"{II}{xml_prop_literal}",
        f"{I})",
        "else:",
        f"{I}self._write_start_element({xml_prop_literal})",
        f"{I}self.{write_cls_as_sequence}(",
        f"{II}{variable}",
        f"{I})",
        f"{I}self._write_end_element({xml_prop_literal})",
    ]

    return Stripped("\n".join(lines))
4✔
1569

1570

1571
def _generate_write_cls_as_sequence(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the method to serialize the ``cls`` as a sequence of XML elements.

    The elements correspond to the properties of the ``cls``.

    The generated method lives in the ``_Serializer`` class.

    :param cls: concrete class whose instances are serialized
    :return: source code of the generated method
    :raise: :py:class:`NotImplementedError` if a property is a list of items
        which are neither class instances nor primitive values
    """
    # fmt: off
    assert (
            sorted(
                (arg.name, str(arg.type_annotation))
                for arg in cls.constructor.arguments
            ) == sorted(
        (prop.name, str(prop.type_annotation))
        for prop in cls.properties
    )
    ), (
        "(mristin, 2022-10-14) We assume that the properties and constructor arguments "
        "are identical at this point. If this is not the case, we have to re-write the "
        "logic substantially! Please contact the developers if you see this."
    )
    # fmt: on

    # NOTE (mristin, 2022-10-14):
    # We need to introduce a new loop variable for each loop since Python tracks
    # the variables in the function scope instead of the block scope.
    generator_for_loop_variables = python_common.GeneratorForLoopVariables()

    # One block of statements per property, in the order of the properties
    body_blocks = []  # type: List[Stripped]

    if len(cls.properties) == 0:
        body_blocks.append(
            Stripped(
                """\
# There are no properties specified for this class, so nothing can be written.
return"""
            )
        )
    else:
        for prop in cls.properties:
            prop_name = python_naming.property_name(prop.name)
            xml_prop_literal = python_common.string_literal(
                naming.xml_property(prop.name)
            )

            # The optionality is handled at the end of the iteration by wrapping
            # the snippet in an ``is not None`` check.
            type_anno = intermediate.beneath_optional(prop.type_annotation)

            primitive_type = intermediate.try_primitive_type(type_anno)

            write_prop: Stripped

            if primitive_type is not None:
                # The value is written by the write method matching the primitive
                # type.
                write_method = _WRITE_METHOD_BY_PRIMITIVE_TYPE[primitive_type]

                write_prop = Stripped(
                    f"""\
self.{write_method}(
{I}{xml_prop_literal},
{I}that.{prop_name}
)"""
                )
            else:
                assert not isinstance(type_anno, intermediate.PrimitiveTypeAnnotation)

                if isinstance(type_anno, intermediate.OurTypeAnnotation):
                    our_type = type_anno.our_type
                    if isinstance(our_type, intermediate.Enumeration):
                        # Enumeration literals are written as text using their
                        # ``value``.
                        write_prop = Stripped(
                            f"""\
self._write_str_property(
{I}{xml_prop_literal},
{I}that.{prop_name}.value
)"""
                        )

                    elif isinstance(our_type, intermediate.ConstrainedPrimitive):
                        raise AssertionError("Expected to be handled before")

                    elif isinstance(
                        our_type,
                        (intermediate.AbstractClass, intermediate.ConcreteClass),
                    ):
                        if len(our_type.concrete_descendants) > 0:
                            # The value is serialized through ``visit`` between
                            # the start and the end element of the property.
                            write_prop = Stripped(
                                f"""\
self._write_start_element({xml_prop_literal})
self.visit(that.{prop_name})
self._write_end_element({xml_prop_literal})"""
                            )
                        else:
                            assert isinstance(our_type, intermediate.ConcreteClass), (
                                f"Unexpected abstract class with no concrete "
                                f"descendants: {our_type.name!r}"
                            )

                            # NOTE (mristin, 2022-10-14):
                            # We have to put the code in a separate function as it
                            # became barely readable *this* indented.
                            write_prop = (
                                _generate_snippet_for_writing_concrete_cls_prop(
                                    prop=prop
                                )
                            )
                    else:
                        assert_never(our_type)

                elif isinstance(type_anno, intermediate.ListTypeAnnotation):
                    # Fresh name for the loop variable of the generated ``for`` loop
                    variable = next(generator_for_loop_variables)

                    if isinstance(
                        type_anno.items, intermediate.OurTypeAnnotation
                    ) and isinstance(
                        type_anno.items.our_type,
                        (intermediate.AbstractClass, intermediate.ConcreteClass),
                    ):
                        # Items which are class instances are serialized through
                        # ``visit``; an empty list gives an empty element.
                        write_prop = Stripped(
                            f"""\
if len(that.{prop_name}) == 0:
{I}self._write_empty_element({xml_prop_literal})
else:
{I}self._write_start_element({xml_prop_literal})
{I}for {variable} in that.{prop_name}:
{II}self.visit({variable})
{I}self._write_end_element({xml_prop_literal})"""
                        )

                    elif isinstance(
                        type_anno.items, intermediate.PrimitiveTypeAnnotation
                    ):
                        write_method = _WRITE_METHOD_BY_PRIMITIVE_TYPE[
                            type_anno.items.a_type
                        ]
                        # Each primitive item is written with the fixed name ``v``.
                        write_prop = Stripped(
                            f"""\
if len(that.{prop_name}) == 0:
{I}self._write_empty_element({xml_prop_literal})
else:
{I}self._write_start_element({xml_prop_literal})
{I}for {variable} in that.{prop_name}:
{II}self.{write_method}('v', {variable})
{I}self._write_end_element({xml_prop_literal})"""
                        )

                    else:
                        raise NotImplementedError(
                            f"We only handle lists of class instances and primitive values, "
                            f"but you supplied the following type: {type_anno}. Please contact the developers "
                            f"if you need this feature."
                        )

                else:
                    assert_never(type_anno)

            # Optional properties are written only if they are set.
            if isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation):
                write_prop = Stripped(
                    f"""\
if that.{prop_name} is not None:
{I}{indent_but_first_line(write_prop, I)}"""
                )

            body_blocks.append(write_prop)

    cls_name = python_naming.class_name(cls.name)
    function_name = python_naming.private_method_name(
        Identifier(f"write_{cls.name}_as_sequence")
    )

    writer = io.StringIO()
    writer.write(
        f"""\
def {function_name}(
{I}self,
{I}that: aas_types.{cls_name}
) -> None:
{I}\"\"\"
{I}Serialize :paramref:`that` to :py:attr:`~stream` as a sequence of
{I}XML elements.

{I}Each element in the sequence corresponds to a property. If no properties
{I}are set, nothing is written to the :py:attr:`~stream`.

{I}:param that: instance to be serialized
{I}\"\"\"
"""
    )

    # The blocks make up the method body, separated by an empty line.
    for i, body_block in enumerate(body_blocks):
        if i > 0:
            writer.write("\n\n")
        writer.write(textwrap.indent(body_block, I))

    return Stripped(writer.getvalue())
4✔
1764

1765

1766
def _generate_visit_cls(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the visit method which serializes the ``cls`` as an XML element.

    The generated method lives in the ``_Serializer`` class.

    :param cls: concrete class whose instances are serialized
    :return: source code of the generated method
    """
    argument_signatures = sorted(
        (arg.name, str(arg.type_annotation)) for arg in cls.constructor.arguments
    )
    property_signatures = sorted(
        (prop.name, str(prop.type_annotation)) for prop in cls.properties
    )
    assert argument_signatures == property_signatures, (
        "(mristin, 2022-10-11) We assume that the properties and constructor arguments "
        "are identical at this point. If this is not the case, we have to re-write the "
        "logic substantially! Please contact the developers if you see this."
    )

    xml_cls_literal = python_common.string_literal(naming.xml_class_name(cls.name))

    body_lines = []  # type: List[str]

    if len(cls.properties) == 0:
        # Nothing to write between the tags, so the element is always empty.
        body_lines.extend(
            [
                "self._write_empty_element(",
                f"{I}{xml_cls_literal}",
                ")",
            ]
        )
    else:
        write_cls_as_sequence = python_naming.private_method_name(
            Identifier(f"write_{cls.name}_as_sequence")
        )

        if _count_required_properties(cls) > 0:
            # At least one child element is always written.
            body_lines.extend(
                [
                    f"self._write_start_element({xml_cls_literal})",
                    f"self.{write_cls_as_sequence}(",
                    f"{I}that",
                    ")",
                    f"self._write_end_element({xml_cls_literal})",
                ]
            )
        else:
            # NOTE (mristin, 2022-10-14):
            # We optimize for the case where all the optional properties are not
            # set, so that we can simply output an empty element.
            body_lines.extend(
                [
                    "# We optimize for the case where all the optional "
                    "properties are not set,",
                    "# so that we can simply output an empty element.",
                    "if (",
                ]
            )

            for i, prop in enumerate(cls.properties):
                is_none_expr = f"that.{python_naming.property_name(prop.name)} is None"
                conjunct = indent_but_first_line(is_none_expr, II)
                prefix = "and " if i > 0 else ""

                body_lines.append(f"{II}{prefix}{conjunct}")

            body_lines.extend(
                [
                    "):",
                    f"{I}self._write_empty_element(",
                    f"{II}{xml_cls_literal}",
                    f"{I})",
                    "else:",
                    f"{I}self._write_start_element({xml_cls_literal})",
                    f"{I}self.{write_cls_as_sequence}(",
                    f"{II}that",
                    f"{I})",
                    f"{I}self._write_end_element({xml_cls_literal})",
                ]
            )

    body = Stripped("\n".join(body_lines))

    cls_name = python_naming.class_name(cls.name)
    visit_name = python_naming.method_name(Identifier(f"visit_{cls.name}"))

    header = f"""\
def {visit_name}(
{I}self,
{I}that: aas_types.{cls_name}
) -> None:
{I}\"\"\"
{I}Serialize :paramref:`that` to :py:attr:`~stream` as an XML element.

{I}The enclosing XML element designates the class of the instance, where its
{I}children correspond to the properties of the instance.

{I}:param that: instance to be serialized
{I}\"\"\"
"""

    return Stripped(header + textwrap.indent(body, I))
4✔
1885

1886

1887
# fmt: off
1888
@require(
    lambda symbol_table:
    '"' not in symbol_table.meta_model.xml_namespace,
    "No double quotes expected in the XML namespace so that we can directly "
    "write the namespace as-is"
)
# fmt: on
def _generate_serializer(symbol_table: intermediate.SymbolTable) -> Stripped:
    """
    Generate the serializer as a visitor which writes to a stream on visits.

    The generated class is called ``_Serializer`` and derives from
    ``aas_types.AbstractVisitor``. Its body consists of the following blocks,
    in this order:

    * the declarations of the attributes (``stream``, ``_write_start_element``
      and ``_write_empty_element``),
    * an explanatory note about the state machine for writing the namespace,
    * the helper methods for writing elements, text and primitive properties,
    * the constructor, and
    * for each concrete class in :paramref:`symbol_table`, the method for writing
      the class as a sequence followed by the corresponding visit method.

    The generated code writes the namespace verbatim between double quotes
    (``xmlns="..."``), hence the pre-condition that the namespace must not contain
    double quotes.

    :param symbol_table: from which the concrete classes are taken
    :return: the code of the ``_Serializer`` class
    """
    body_blocks = [
        # Attribute declarations of the generated class
        Stripped(
            """\
#: Stream to be written to when we visit the instances
stream: Final[TextIO]"""
        ),
        Stripped(
            f"""\
#: Method pointer to be invoked for writing the start element with or without
#: specifying a namespace (depending on the state of the serializer)
_write_start_element: Callable[
{I}[str],
{I}None
]"""
        ),
        Stripped(
            f"""\
#: Method pointer to be invoked for writing an empty element with or without
#: specifying a namespace (depending on the state of the serializer)
_write_empty_element: Callable[
{I}[str],
{I}None
]"""
        ),
        # Explanatory note emitted verbatim into the generated class
        Stripped(
            """\
# NOTE (mristin, 2022-10-14):
# The serialization procedure is quite rigid. We leverage the specifics of
# the serialization procedure to optimize the code a bit.
#
# Namely, we model the writing of the XML elements as a state machine.
# The namespace is only specified for the very first element. All the subsequent
# elements will *not* have the namespace specified. We implement that behavior by
# using pointers to methods, as Python treats the methods as first-class citizens.
#
# The ``_write_start_element`` will point to
# ``_write_first_start_element_with_namespace`` on the *first* invocation.
# Afterwards, it will be redirected to ``_write_start_element_without_namespace``.
#
# Analogously for ``_write_empty_element``.
#
# Please see the implementation for the details, but this should give you at least
# a rough overview."""
        ),
        # Helpers for writing the start, end and empty elements as well as the text
        Stripped(
            f"""\
def _write_first_start_element_with_namespace(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the start element with the tag name :paramref:`name` and specify
{I}its namespace.

{I}The :py:attr:`~_write_start_element` is set to
{I}:py:meth:`~_write_start_element_without_namespace` after the first invocation
{I}of this method.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'<{{name}} xmlns="{{NAMESPACE}}">')

{I}# NOTE (mristin, 2022-10-14):
{I}# Any subsequence call to `_write_start_element` or `_write_empty_element`
{I}# should not specify the namespace of the element as we specified now already
{I}# specified it.
{I}self._write_start_element = self._write_start_element_without_namespace
{I}self._write_empty_element = self._write_empty_element_without_namespace"""
        ),
        Stripped(
            f"""\
def _write_start_element_without_namespace(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the start element with the tag name :paramref:`name`.

{I}The first element, written *before* this one, is expected to have been
{I}already written with the namespace specified.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'<{{name}}>')"""
        ),
        Stripped(
            f"""\
def _escape_and_write_text(
{II}self,
{II}text: str
) -> None:
{I}\"\"\"
{I}Escape :paramref:`text` for XML and write it.

{I}:param text: to be escaped and written
{I}\"\"\"
{I}# NOTE (mristin, 2022-10-14):
{I}# We ran ``timeit`` on manual code which escaped XML special characters with
{I}# a dictionary, and on another snippet which called three ``.replace()``.
{I}# The code with ``.replace()`` was an order of magnitude faster on our computers.
{I}self.stream.write(
{II}text.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
{I})"""
        ),
        Stripped(
            f"""\
def _write_end_element(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the end element with the tag name :paramref:`name`.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'</{{name}}>')"""
        ),
        Stripped(
            f"""\
def _write_first_empty_element_with_namespace(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the first (and only) empty element with the tag name :paramref:`name`.

{I}No elements are expected to be written to the stream afterwards. The element
{I}includes the namespace specification.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'<{{name}} xmlns="{{NAMESPACE}}"/>')
{I}self._write_empty_element = self._rase_if_write_element_called_again
{I}self._write_start_element = self._rase_if_write_element_called_again"""
        ),
        # NOTE (review):
        # The emitted method name below is spelled ``_rase_...`` (sic). The block
        # above refers to it with the same spelling, so the generated code is
        # consistent. We keep the spelling as renaming would alter the generated
        # output.
        Stripped(
            f"""\
def _rase_if_write_element_called_again(
{II}self,
{II}name: str
) -> None:
{I}raise AssertionError(
{II}f"We expected to call ``_write_first_empty_element_with_namespace`` "
{II}f"only once. This is an unexpected second call for writing "
{II}f"an (empty or non-empty) element with the tag name: {{name!r}}"
{I})"""
        ),
        Stripped(
            f"""\
def _write_empty_element_without_namespace(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the empty element with the tag name :paramref:`name`.

{I}The call to this method is expected to occur *after* the enclosing element with
{I}a specified namespace has been written.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'<{{name}}/>')"""
        ),
        # Helpers for writing the properties of primitive types
        Stripped(
            f"""\
def _write_bool_property(
{II}self,
{II}name: str,
{II}value: bool
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of a boolean property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)
{I}self.stream.write('true' if value else 'false')
{I}self._write_end_element(name)"""
        ),
        Stripped(
            f"""\
def _write_int_property(
{II}self,
{II}name: str,
{II}value: int
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of an integer property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)
{I}self.stream.write(str(value))
{I}self._write_end_element(name)"""
        ),
        Stripped(
            f"""\
def _write_float_property(
{II}self,
{II}name: str,
{II}value: float
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of a floating-point property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)

{I}if value == math.inf:
{II}self.stream.write('INF')
{I}elif value == -math.inf:
{II}self.stream.write('-INF')
{I}elif math.isnan(value):
{II}self.stream.write('NaN')
{I}elif value == 0:
{II}if math.copysign(1.0, value) < 0.0:
{III}self.stream.write('-0.0')
{II}else:
{III}self.stream.write('0.0')
{I}else:
{II}self.stream.write(str(value))

{I}self._write_end_element(name)"""
        ),
        Stripped(
            f"""\
def _write_str_property(
{II}self,
{II}name: str,
{II}value: str
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of a string property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)
{I}self._escape_and_write_text(value)
{I}self._write_end_element(name)"""
        ),
        Stripped(
            f"""\
def _write_bytes_property(
{II}self,
{II}name: str,
{II}value: bytes
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of a binary-content property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)

{I}# NOTE (mristin, 2022-10-14):
{I}# We need to decode the result of the base64-encoding to ASCII since we are
{I}# writing to an XML *text* stream. ``base64.b64encode(.)`` gives us bytes,
{I}# not a string.
{I}encoded = base64.b64encode(value).decode('ascii')

{I}# NOTE (mristin, 2022-10-14):
{I}# Base64 alphabet excludes ``<``, ``>`` and ``&``, so we can directly
{I}# write the ``encoded`` content to the stream as XML text.
{I}#
{I}# See: https://datatracker.ietf.org/doc/html/rfc4648#section-4
{I}self.stream.write(encoded)
{I}self._write_end_element(name)"""
        ),
        # Constructor which puts the state machine in its initial state
        Stripped(
            f"""\
def __init__(
{I}self,
{I}stream: TextIO
) -> None:
{I}\"\"\"
{I}Initialize the visitor to write to :paramref:`stream`.

{I}The first element will include the :py:attr:`~.NAMESPACE`. Every other
{I}element will not have the namespace specified.

{I}:param stream: where to write to
{I}\"\"\"
{I}self.stream = stream
{I}self._write_start_element = (
{II}self._write_first_start_element_with_namespace
{I})
{I}self._write_empty_element = (
{II}self._write_first_empty_element_with_namespace
{I})"""
        ),
    ]

    # Append the class-specific methods after the general helpers.
    for cls in symbol_table.concrete_classes:
        body_blocks.append(_generate_write_cls_as_sequence(cls=cls))
        body_blocks.append(_generate_visit_cls(cls=cls))

    writer = io.StringIO()
    writer.write(
        Stripped(
            f"""\
class _Serializer(aas_types.AbstractVisitor):
{I}\"\"\"Encode instances as XML and write them to :py:attr:`~stream`.\"\"\""""
        )
    )

    # Every block is separated by an empty line from the previous content and
    # indented once as it resides in the body of the class.
    for body_block in body_blocks:
        writer.write("\n\n")
        writer.write(textwrap.indent(body_block, I))

    return Stripped(writer.getvalue())
4✔
2218

2219

2220
def _generate_write_to_stream(
    symbol_table: intermediate.SymbolTable,
    qualified_module_name: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the ``write`` function which serializes an instance as XML to a stream.

    The docstring of the generated function consists of a summary, an optional
    usage example and the description of the parameters. The example is included
    only if there is at least one concrete class in :paramref:`symbol_table`;
    the first concrete class is then used for the illustration.

    :param symbol_table: from which the class for the example is picked
    :param qualified_module_name: of the base module, used in the example imports
    :return: the code of the generated function
    """
    summary = Stripped(
        """\
Write the XML representation of :paramref:`instance` to :paramref:`stream`."""
    )

    params = Stripped(
        """\
:param instance: to be serialized
:param stream: to write to"""
    )

    concrete_classes = symbol_table.concrete_classes
    if len(concrete_classes) == 0:
        # There is no class which we could use for the illustration.
        paragraphs = [summary, params]
    else:
        example_cls_name = python_naming.class_name(concrete_classes[0].name)

        example = Stripped(
            f"""\
Example usage:

.. code-block::

    import pathlib

    import {qualified_module_name}.types as aas_types
    import {qualified_module_name}.xmlization as aas_xmlization

    instance = {example_cls_name}(
       ... # some constructor arguments
    )

    pth = pathlib.Path(...)
    with pth.open("wt") as fid:
        aas_xmlization.write(instance, fid)"""
        )

        paragraphs = [summary, example, params]

    # Triple quotes in the text would prematurely terminate the docstring, so
    # we escape them before enclosing the text.
    docstring_text = "\n\n".join(paragraphs).replace('"""', '\\"\\"\\"')
    docstring = Stripped('"""\n' + docstring_text + '\n"""')

    indented_docstring = indent_but_first_line(docstring, I)

    return Stripped(
        f"""\
def write(instance: aas_types.Class, stream: TextIO) -> None:
{I}{indented_docstring}
{I}serializer = _Serializer(stream)
{I}serializer.visit(instance)"""
    )
2286

2287

2288
# fmt: off
2289
@ensure(lambda result: (result[0] is not None) ^ (result[1] is not None))
4✔
2290
@ensure(
4✔
2291
    lambda result:
2292
    not (result[0] is not None) or result[0].endswith('\n'),
2293
    "Trailing newline mandatory for valid end-of-files"
2294
)
2295
# fmt: on
2296
def generate(
4✔
2297
    symbol_table: intermediate.SymbolTable,
2298
    qualified_module_name: python_common.QualifiedModuleName,
2299
    spec_impls: specific_implementations.SpecificImplementations,
2300
) -> Tuple[Optional[str], Optional[List[Error]]]:
2301
    """
2302
    Generate the Python code for the general XML de/serialization.
2303

2304
    The ``qualified_module_name`` indicates the fully-qualified name of the base module.
2305
    """
2306
    xml_namespace_literal = python_common.string_literal(
4✔
2307
        symbol_table.meta_model.xml_namespace
2308
    )
2309

2310
    blocks = [
2311
        _generate_module_docstring(
2312
            symbol_table=symbol_table, qualified_module_name=qualified_module_name
2313
        ),
2314
        python_common.WARNING,
2315
        # pylint: disable=line-too-long
2316
        Stripped(
2317
            f"""\
2318
import base64
2319
import io
2320
import math
2321
import os
2322
import sys
2323
from typing import (
2324
{I}Any,
2325
{I}Callable,
2326
{I}Iterator,
2327
{I}List,
2328
{I}Mapping,
2329
{I}Optional,
2330
{I}Sequence,
2331
{I}TextIO,
2332
{I}Tuple,
2333
{I}Union,
2334
{I}TYPE_CHECKING
2335
)
2336
import xml.etree.ElementTree
2337

2338
if sys.version_info >= (3, 8):
2339
{I}from typing import (
2340
{II}Final,
2341
{II}Protocol
2342
{I})
2343
else:
2344
{I}from typing_extensions import (
2345
{II}Final,
2346
{II}Protocol
2347
{I})
2348

2349
import {qualified_module_name}.stringification as aas_stringification
2350
import {qualified_module_name}.types as aas_types
2351

2352
# See: https://stackoverflow.com/questions/55076778/why-isnt-this-function-type-annotated-correctly-error-missing-type-parameters
2353
if TYPE_CHECKING:
2354
    PathLike = os.PathLike[Any]
2355
else:
2356
    PathLike = os.PathLike"""
2357
        ),
2358
        Stripped(
2359
            f"""\
2360
#: XML namespace in which all the elements are expected to reside
2361
NAMESPACE = {xml_namespace_literal}"""
2362
        ),
2363
        Stripped("# region De-serialization"),
2364
        Stripped(
2365
            """\
2366
#: XML namespace as a prefix specially tailored for
2367
#: :py:mod:`xml.etree.ElementTree`
2368
_NAMESPACE_IN_CURLY_BRACKETS = f'{{{NAMESPACE}}}'"""
2369
        ),
2370
        Stripped(
2371
            f"""\
2372
class Element(Protocol):
2373
{I}\"\"\"Behave like :py:meth:`xml.etree.ElementTree.Element`.\"\"\"
2374

2375
{I}@property
2376
{I}def attrib(self) -> Optional[Mapping[str, str]]:
2377
{II}\"\"\"Attributes of the element\"\"\"
2378
{II}raise NotImplementedError()
2379

2380
{I}@property
2381
{I}def text(self) -> Optional[str]:
2382
{II}\"\"\"Text content of the element\"\"\"
2383
{II}raise NotImplementedError()
2384

2385
{I}@property
2386
{I}def tail(self) -> Optional[str]:
2387
{II}\"\"\"Tail text of the element\"\"\"
2388
{II}raise NotImplementedError()
2389

2390
{I}@property
2391
{I}def tag(self) -> str:
2392
{II}\"\"\"Tag of the element; with a namespace provided as a ``{{...}}`` prefix\"\"\"
2393
{II}raise NotImplementedError()
2394

2395
{I}def clear(self) -> None:
2396
{II}\"\"\"Behave like :py:meth:`xml.etree.ElementTree.Element.clear`.\"\"\"
2397
{II}raise NotImplementedError()"""
2398
        ),
2399
        # pylint: disable=line-too-long
2400
        Stripped(
2401
            f"""\
2402
class HasIterparse(Protocol):
2403
{I}\"\"\"Parse an XML document incrementally.\"\"\"
2404

2405
{I}# NOTE (mristin, 2022-10-26):
2406
{I}# ``self`` is not used in this context, but is necessary for Mypy,
2407
{I}# see: https://github.com/python/mypy/issues/5018 and
2408
{I}# https://github.com/python/mypy/commit/3efbc5c5e910296a60ed5b9e0e7eb11dd912c3ed#diff-e165eb7aed9dca0a5ebd93985c8cd263a6462d36ac185f9461348dc5a1396d76R9937
2409

2410
{I}def iterparse(
2411
{III}self,
2412
{III}source: TextIO,
2413
{III}events: Optional[Sequence[str]] = None
2414
{I}) -> Iterator[Tuple[str, Element]]:
2415
{II}\"\"\"Behave like :py:func:`xml.etree.ElementTree.iterparse`.\"\"\""""
2416
        ),
2417
        Stripped(
2418
            f"""\
2419
class ElementSegment:
2420
{I}\"\"\"Represent an element on a path to the erroneous value.\"\"\"
2421
{I}#: Erroneous element
2422
{I}element: Final[Element]
2423

2424
{I}def __init__(
2425
{III}self,
2426
{III}element: Element
2427
{I}) -> None:
2428
{II}\"\"\"Initialize with the given values.\"\"\"
2429
{II}self.element = element
2430

2431
{I}def __str__(self) -> str:
2432
{II}\"\"\"
2433
{II}Render the segment as a tag without the namespace.
2434

2435
{II}We deliberately omit the namespace in the tag names. If you want to actually
2436
{II}query with the resulting XPath, you have to insert the namespaces manually.
2437
{II}We did not know how to include the namespace in a meaningful way, as XPath
2438
{II}assumes namespace prefixes to be defined *outside* of the document. At least
2439
{II}the path thus rendered is informative, and you should be able to descend it
2440
{II}manually.
2441
{II}\"\"\"
2442
{II}_, has_namespace, tag_wo_ns = self.element.tag.rpartition('}}')
2443
{II}if not has_namespace:
2444
{III}return self.element.tag
2445
{II}else:
2446
{III}return tag_wo_ns"""
2447
        ),
2448
        Stripped(
2449
            f"""\
2450
class IndexSegment:
2451
{I}\"\"\"Represent an element in a sequence on a path to the erroneous value.\"\"\"
2452
{I}#: Erroneous element
2453
{I}element: Final[Element]
2454

2455
{I}#: Index of the element in the sequence
2456
{I}index: Final[int]
2457

2458
{I}def __init__(
2459
{III}self,
2460
{III}element: Element,
2461
{III}index: int
2462
{I}) -> None:
2463
{II}\"\"\"Initialize with the given values.\"\"\"
2464
{II}self.element = element
2465
{II}self.index = index
2466

2467
{I}def __str__(self) -> str:
2468
{II}\"\"\"Render the segment as an element wildcard with the index.\"\"\"
2469
{II}return f'*[{{self.index}}]'"""
2470
        ),
2471
        Stripped(
2472
            """\
2473
Segment = Union[ElementSegment, IndexSegment]"""
2474
        ),
2475
        Stripped(
2476
            f"""\
2477
class Path:
2478
{I}\"\"\"Represent the relative path to the erroneous element.\"\"\"
2479

2480
{I}def __init__(self) -> None:
2481
{II}\"\"\"Initialize as an empty path.\"\"\"
2482
{II}self._segments = []  # type: List[Segment]
2483

2484
{I}@property
2485
{I}def segments(self) -> Sequence[Segment]:
2486
{II}\"\"\"Get the segments of the path.\"\"\"
2487
{II}return self._segments
2488

2489
{I}def _prepend(self, segment: Segment) -> None:
2490
{II}\"\"\"Insert the :paramref:`segment` in front of other segments.\"\"\"
2491
{II}self._segments.insert(0, segment)
2492

2493
{I}def __str__(self) -> str:
2494
{II}\"\"\"Render the path as a relative XPath.
2495

2496
{II}We omit the leading ``/`` so that you can easily prefix it as you need.
2497
{II}\"\"\"
2498
{II}return "/".join(str(segment) for segment in self._segments)"""
2499
        ),
2500
        Stripped(
2501
            f"""\
2502
class DeserializationException(Exception):
2503
{I}\"\"\"Signal that the XML de-serialization could not be performed.\"\"\"
2504

2505
{I}#: Human-readable explanation of the exception's cause
2506
{I}cause: Final[str]
2507

2508
{I}#: Relative path to the erroneous value
2509
{I}path: Final[Path]
2510

2511
{I}def __init__(
2512
{III}self,
2513
{III}cause: str
2514
{I}) -> None:
2515
{II}\"\"\"Initialize with the given :paramref:`cause` and an empty path.\"\"\"
2516
{II}self.cause = cause
2517
{II}self.path = Path()"""
2518
        ),
2519
        Stripped(
2520
            f"""\
2521
def _with_elements_cleared_after_yield(
2522
{II}iterator: Iterator[Tuple[str, Element]]
2523
) -> Iterator[Tuple[str, Element]]:
2524
{I}\"\"\"
2525
{I}Map the :paramref:`iterator` such that the element is ``clear()``'ed
2526
{I}*after* every ``yield``.
2527

2528
{I}:param iterator: to be mapped
2529
{I}:yield: event and element from :paramref:`iterator`
2530
{I}\"\"\"
2531
{I}for event, element in iterator:
2532
{II}yield event, element
2533
{II}element.clear()"""
2534
        ),
2535
    ]  # type: List[Stripped]
2536

2537
    errors = []  # type: List[Error]
4✔
2538

2539
    # NOTE (mristin, 2022-10-08):
2540
    # We generate first the public methods so that the reader can jump straight
2541
    # to the most important part of the code.
2542
    for cls in symbol_table.classes:
4✔
2543
        blocks.append(
4✔
2544
            _generate_read_cls_from_iterparse(
2545
                cls=cls, qualified_module_name=qualified_module_name
2546
            )
2547
        )
2548

2549
        blocks.append(
4✔
2550
            _generate_read_cls_from_stream(
2551
                cls=cls, qualified_module_name=qualified_module_name
2552
            )
2553
        )
2554

2555
        blocks.append(
4✔
2556
            _generate_read_cls_from_file(
2557
                cls=cls, qualified_module_name=qualified_module_name
2558
            )
2559
        )
2560

2561
        blocks.append(
4✔
2562
            _generate_read_cls_from_str(
2563
                cls=cls, qualified_module_name=qualified_module_name
2564
            )
2565
        )
2566

2567
    blocks.extend(
4✔
2568
        [
2569
            _generate_read_from_iterparse(qualified_module_name=qualified_module_name),
2570
            _generate_read_from_stream(qualified_module_name=qualified_module_name),
2571
            _generate_read_from_file(qualified_module_name=qualified_module_name),
2572
            _generate_read_from_str(qualified_module_name=qualified_module_name),
2573
        ]
2574
    )
2575

2576
    blocks.extend(
4✔
2577
        [
2578
            Stripped(
2579
                """\
2580
# NOTE (mristin, 2022-10-08):
2581
# Directly using the iterator turned out to result in very complex function
2582
# designs. The design became much simpler as soon as we considered one look-ahead
2583
# element. We came up finally with the following pattern which all the protected
2584
# reading functions below roughly follow:
2585
#
2586
# ..code-block::
2587
#
2588
#    _read_*(
2589
#       look-ahead element,
2590
#       iterator
2591
#    ) -> result
2592
#
2593
# The reading functions all read from the ``iterator`` coming from
2594
# :py:func:`xml.etree.ElementTree.iterparse` with the argument
2595
# ``events=["start", "end"]``. The exception :py:class:`.DeserializationException`
2596
# is raised in case of unexpected input.
2597
#
2598
# The reading functions are responsible to read the end element corresponding to the
2599
# start look-ahead element.
2600
#
2601
# When it comes to error reporting, we use exceptions. The exceptions are raised in
2602
# the *callee*, as usual. However, the context of the exception, such as the error path,
2603
# is added in the *caller*, as only the caller knows the context of
2604
# the lookahead-element. In particular, prepending the path segment corresponding to
2605
# the lookahead-element is the responsibility of the *caller*, and not of
2606
# the *callee*."""
2607
            ),
2608
            Stripped(
2609
                f"""\
2610
def _parse_element_tag(element: Element) -> str:
2611
{I}\"\"\"
2612
{I}Extract the tag name without the namespace prefix from :paramref:`element`.
2613

2614
{I}:param element: whose tag without namespace we want to extract
2615
{I}:return: tag name without the namespace prefix
2616
{I}:raise: :py:class:`DeserializationException` if unexpected :paramref:`element`
2617
{I}\"\"\"
2618
{I}if not element.tag.startswith(_NAMESPACE_IN_CURLY_BRACKETS):
2619
{II}namespace, got_namespace, tag_wo_ns = (
2620
{III}element.tag.rpartition('}}')
2621
{II})
2622
{II}if got_namespace:
2623
{III}if namespace.startswith('{{'):
2624
{IIII}namespace = namespace[1:]
2625

2626
{III}raise DeserializationException(
2627
{IIII}f"Expected the element in the namespace {{NAMESPACE!r}}, "
2628
{IIII}f"but got the element {{tag_wo_ns!r}} in the namespace {{namespace!r}}"
2629
{III})
2630
{II}else:
2631
{III}raise DeserializationException(
2632
{IIII}f"Expected the element in the namespace {{NAMESPACE!r}}, "
2633
{IIII}f"but got the element {{tag_wo_ns!r}} without the namespace prefix"
2634
{III})
2635

2636
{I}return element.tag[len(_NAMESPACE_IN_CURLY_BRACKETS):]"""
2637
            ),
2638
            Stripped(
2639
                f"""\
2640
def _raise_if_has_tail_or_attrib(
2641
{II}element: Element
2642
) -> None:
2643
{I}\"\"\"
2644
{I}Check that :paramref:`element` has no trailing text and no attributes.
2645

2646
{I}:param element: to be verified
2647
{I}:raise:
2648
{II}:py:class:`.DeserializationException` if trailing text or attributes;
2649
{II}conforming to the convention about handling error paths,
2650
{II}the exception path is left empty.
2651
{I}\"\"\"
2652
{I}if element.tail is not None and len(element.tail.strip()) != 0:
2653
{II}raise DeserializationException(
2654
{III}f"Expected no trailing text, but got: {{element.tail!r}}"
2655
{II})
2656

2657
{I}if element.attrib is not None and len(element.attrib) > 0:
2658
{II}raise DeserializationException(
2659
{III}f"Expected no attributes, but got: {{element.attrib}}"
2660
{II})"""
2661
            ),
2662
            Stripped(
2663
                f"""\
2664
def _read_end_element(
2665
{II}element: Element,
2666
{II}iterator: Iterator[Tuple[str, Element]]
2667
) -> Element:
2668
{I}\"\"\"
2669
{I}Read the end element corresponding to the start :paramref:`element`
2670
{I}from :paramref:`iterator`.
2671

2672
{I}:param element: corresponding start element
2673
{I}:param iterator:
2674
{II}Input stream of ``(event, element)`` coming from
2675
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2676
{II}``events=["start", "end"]``
2677
{I}:raise: :py:class:`DeserializationException` if unexpected input
2678
{I}\"\"\"
2679
{I}next_event_element = next(iterator, None)
2680
{I}if next_event_element is None:
2681
{II}raise DeserializationException(
2682
{III}f"Expected the end element for {{element.tag}}, "
2683
{III}f"but got the end-of-input"
2684
{II})
2685

2686
{I}next_event, next_element = next_event_element
2687
{I}if next_event != "end" or next_element.tag != element.tag:
2688
{II}raise DeserializationException(
2689
{III}f"Expected the end element for {{element.tag!r}}, "
2690
{III}f"but got the event {{next_event!r}} and element {{next_element.tag!r}}"
2691
{II})
2692

2693
{I}_raise_if_has_tail_or_attrib(next_element)
2694

2695
{I}return next_element"""
2696
            ),
2697
            Stripped(
2698
                f"""\
2699
def _read_text_from_element(
2700
{I}element: Element,
2701
{I}iterator: Iterator[Tuple[str, Element]]
2702
) -> str:
2703
{I}\"\"\"
2704
{I}Extract the text from the :paramref:`element`, and read
2705
{I}the end element from :paramref:`iterator`.
2706

2707
{I}The :paramref:`element` is expected to contain text. Otherwise,
2708
{I}it is considered as unexpected input.
2709

2710
{I}:param element: start element enclosing the text
2711
{I}:param iterator:
2712
{II}Input stream of ``(event, element)`` coming from
2713
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2714
{II}``events=["start", "end"]``
2715
{I}:raise: :py:class:`DeserializationException` if unexpected input
2716
{I}\"\"\"
2717
{I}_raise_if_has_tail_or_attrib(element)
2718

2719
{I}text = element.text
2720

2721
{I}end_element = _read_end_element(
2722
{II}element,
2723
{II}iterator,
2724
{I})
2725

2726
{I}if text is None:
2727
{II}if end_element.text is None:
2728
{III}raise DeserializationException(
2729
{IIII}"Expected an element with text, but got an element with no text."
2730
{III})
2731

2732
{II}text = end_element.text
2733

2734
{I}return text"""
2735
            ),
2736
            Stripped(
2737
                f"""\
2738
_XS_BOOLEAN_LITERAL_SET = {{
2739
{I}"1",
2740
{I}"true",
2741
{I}"0",
2742
{I}"false",
2743
}}"""
2744
            ),
2745
            Stripped(
2746
                f"""\
2747
def _read_bool_from_element_text(
2748
{I}element: Element,
2749
{I}iterator: Iterator[Tuple[str, Element]]
2750
) -> bool:
2751
{I}\"\"\"
2752
{I}Parse the text of :paramref:`element` as a boolean, and
2753
{I}read the corresponding end element from :paramref:`iterator`.
2754

2755
{I}:param element: start element
2756
{I}:param iterator:
2757
{II}Input stream of ``(event, element)`` coming from
2758
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2759
{II}``events=["start", "end"]``
2760
{I}:raise: :py:class:`DeserializationException` if unexpected input
2761
{I}:return: parsed value
2762
{I}\"\"\"
2763
{I}text = _read_text_from_element(
2764
{II}element,
2765
{II}iterator
2766
{I})
2767

2768
{I}if text not in _XS_BOOLEAN_LITERAL_SET:
2769
{II}raise DeserializationException(
2770
{III}f"Expected a boolean, "
2771
{III}f"but got an element with text: {{text!r}}"
2772
{II})
2773

2774
{I}return text in ('1', 'true')"""
2775
            ),
2776
            Stripped(
2777
                f"""\
2778
def _read_int_from_element_text(
2779
{I}element: Element,
2780
{I}iterator: Iterator[Tuple[str, Element]]
2781
) -> int:
2782
{I}\"\"\"
2783
{I}Parse the text of :paramref:`element` as an integer, and
2784
{I}read the corresponding end element from :paramref:`iterator`.
2785

2786
{I}:param element: start element
2787
{I}:param iterator:
2788
{II}Input stream of ``(event, element)`` coming from
2789
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2790
{II}``events=["start", "end"]``
2791
{I}:raise: :py:class:`DeserializationException` if unexpected input
2792
{I}:return: parsed value
2793
{I}\"\"\"
2794
{I}text = _read_text_from_element(
2795
{II}element,
2796
{II}iterator
2797
{I})
2798

2799
{I}try:
2800
{II}value = int(text)
2801
{I}except ValueError:
2802
{II}# pylint: disable=raise-missing-from
2803
{II}raise DeserializationException(
2804
{III}f"Expected an integer, "
2805
{III}f"but got an element with text: {{text!r}}"
2806
{II})
2807

2808
{I}return value"""
2809
            ),
2810
            Stripped(
2811
                f"""\
2812
_TEXT_TO_XS_DOUBLE_LITERALS = {{
2813
{I}"NaN": math.nan,
2814
{I}"INF": math.inf,
2815
{I}"-INF": -math.inf,
2816
}}"""
2817
            ),
2818
            Stripped(
2819
                f"""\
2820
def _read_float_from_element_text(
2821
{I}element: Element,
2822
{I}iterator: Iterator[Tuple[str, Element]]
2823
) -> float:
2824
{I}\"\"\"
2825
{I}Parse the text of :paramref:`element` as a floating-point number, and
2826
{I}read the corresponding end element from :paramref:`iterator`.
2827

2828
{I}:param element: start element
2829
{I}:param iterator:
2830
{II}Input stream of ``(event, element)`` coming from
2831
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2832
{II}``events=["start", "end"]``
2833
{I}:raise: :py:class:`DeserializationException` if unexpected input
2834
{I}:return: parsed value
2835
{I}\"\"\"
2836
{I}text = _read_text_from_element(
2837
{II}element,
2838
{II}iterator
2839
{I})
2840

2841
{I}value = _TEXT_TO_XS_DOUBLE_LITERALS.get(text, None)
2842
{I}if value is None:
2843
{II}try:
2844
{III}value = float(text)
2845
{II}except ValueError:
2846
{III}# pylint: disable=raise-missing-from
2847
{III}raise DeserializationException(
2848
{IIII}f"Expected a floating-point number, "
2849
{IIII}f"but got an element with text: {{text!r}}"
2850
{III})
2851

2852
{I}return value"""
2853
            ),
2854
            Stripped(
2855
                f"""\
2856
def _read_str_from_element_text(
2857
{I}element: Element,
2858
{I}iterator: Iterator[Tuple[str, Element]]
2859
) -> str:
2860
{I}\"\"\"
2861
{I}Parse the text of :paramref:`element` as a string, and
2862
{I}read the corresponding end element from :paramref:`iterator`.
2863

2864
{I}If there is no text, empty string is returned.
2865

2866
{I}:param element: start element
2867
{I}:param iterator:
2868
{II}Input stream of ``(event, element)`` coming from
2869
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2870
{II}``events=["start", "end"]``
2871
{I}:raise: :py:class:`DeserializationException` if unexpected input
2872
{I}:return: parsed value
2873
{I}\"\"\"
2874
{I}# NOTE (mristin, 2022-10-26):
2875
{I}# We do not use ``_read_text_from_element`` as that function expects
2876
{I}# the ``element`` to contain *some* text. In contrast, this function
2877
{I}# can also deal with empty text, in which case it returns an empty string.
2878

2879
{I}text = element.text
2880

2881
{I}end_element = _read_end_element(
2882
{II}element,
2883
{II}iterator
2884
{I})
2885

2886
{I}if text is None:
2887
{II}text = end_element.text
2888

2889
{I}_raise_if_has_tail_or_attrib(element)
2890
{I}result = (
2891
{II}text
2892
{II}if text is not None
2893
{II}else ""
2894
{I})
2895

2896
{I}return result"""
2897
            ),
2898
            Stripped(
2899
                f"""\
2900
def _read_bytes_from_element_text(
2901
{I}element: Element,
2902
{I}iterator: Iterator[Tuple[str, Element]]
2903
) -> bytes:
2904
{I}\"\"\"
2905
{I}Parse the text of :paramref:`element` as base64-encoded bytes, and
2906
{I}read the corresponding end element from :paramref:`iterator`.
2907

2908
{I}:param element: look-ahead element
2909
{I}:param iterator:
2910
{II}Input stream of ``(event, element)`` coming from
2911
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2912
{II}``events=["start", "end"]``
2913
{I}:raise: :py:class:`DeserializationException` if unexpected input
2914
{I}:return: parsed value
2915
{I}\"\"\"
2916
{I}text = _read_text_from_element(
2917
{II}element,
2918
{II}iterator
2919
{I})
2920

2921
{I}try:
2922
{II}value = base64.b64decode(text)
2923
{I}except Exception:
2924
{II}# pylint: disable=raise-missing-from
2925
{II}raise DeserializationException(
2926
{III}f"Expected a text as base64-encoded bytes, "
2927
{III}f"but got an element with text: {{text!r}}"
2928
{II})
2929

2930
{I}return value"""
2931
            ),
2932
        ]
2933
    )
2934

2935
    for our_type in symbol_table.our_types:
4✔
2936
        if isinstance(our_type, intermediate.Enumeration):
4✔
2937
            blocks.append(_generate_read_enum_from_element_text(enumeration=our_type))
4✔
2938
        elif isinstance(our_type, intermediate.ConstrainedPrimitive):
4✔
2939
            continue
4✔
2940
        elif isinstance(our_type, intermediate.AbstractClass):
4✔
2941
            blocks.append(_generate_read_cls_as_element(cls=our_type))
4✔
2942

2943
        elif isinstance(our_type, intermediate.ConcreteClass):
4✔
2944
            if our_type.is_implementation_specific:
4✔
2945
                implementation_key = specific_implementations.ImplementationKey(
×
2946
                    f"Xmlization/read_{our_type.name}.py"
2947
                )
2948

2949
                implementation = spec_impls.get(implementation_key, None)
×
2950
                if implementation is None:
×
2951
                    errors.append(
×
2952
                        Error(
2953
                            our_type.parsed.node,
2954
                            f"The xmlization snippet is missing "
2955
                            f"for the implementation-specific "
2956
                            f"class {our_type.name}: {implementation_key}",
2957
                        )
2958
                    )
2959
                    continue
×
2960
            else:
2961
                blocks.extend(
4✔
2962
                    [
2963
                        _generate_reader_and_setter(cls=our_type),
2964
                        _generate_read_as_sequence(cls=our_type),
2965
                    ]
2966
                )
2967

2968
                blocks.append(_generate_read_cls_as_element(cls=our_type))
4✔
2969

2970
        else:
2971
            assert_never(our_type)
×
2972

2973
    blocks.append(_generate_general_read_as_element(symbol_table=symbol_table))
4✔
2974

2975
    for cls in symbol_table.classes:
4✔
2976
        if isinstance(cls, intermediate.AbstractClass):
4✔
2977
            blocks.append(_generate_dispatch_map_for_class(cls=cls))
4✔
2978
        elif isinstance(cls, intermediate.ConcreteClass):
4✔
2979
            if len(cls.concrete_descendants) > 0:
4✔
2980
                blocks.append(_generate_dispatch_map_for_class(cls=cls))
4✔
2981

2982
            if not cls.is_implementation_specific:
4✔
2983
                blocks.append(_generate_reader_and_setter_map(cls=cls))
4✔
2984

2985
        else:
2986
            assert_never(cls)
×
2987

2988
    blocks.append(_generate_general_dispatch_map(symbol_table=symbol_table))
4✔
2989

2990
    blocks.append(Stripped("# endregion"))
4✔
2991

2992
    blocks.append(Stripped("# region Serialization"))
4✔
2993

2994
    blocks.append(_generate_serializer(symbol_table=symbol_table))
4✔
2995

2996
    blocks.append(
4✔
2997
        _generate_write_to_stream(
2998
            symbol_table=symbol_table, qualified_module_name=qualified_module_name
2999
        )
3000
    )
3001

3002
    blocks.append(
4✔
3003
        Stripped(
3004
            f"""\
3005
def to_str(that: aas_types.Class) -> str:
3006
{I}\"\"\"
3007
{I}Serialize :paramref:`that` to an XML-encoded text.
3008

3009
{I}:param that: instance to be serialized
3010
{I}:return: :paramref:`that` serialized to XML serialized to text
3011
{I}\"\"\"
3012
{I}writer = io.StringIO()
3013
{I}write(that, writer)
3014
{I}return writer.getvalue()"""
3015
        )
3016
    )
3017

3018
    blocks.append(Stripped("# endregion"))
4✔
3019

3020
    writer = io.StringIO()
4✔
3021
    for i, block in enumerate(blocks):
4✔
3022
        if i > 0:
4✔
3023
            writer.write("\n\n\n")
4✔
3024

3025
        writer.write(block)
4✔
3026

3027
    writer.write("\n")
4✔
3028

3029
    return writer.getvalue(), None
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc