• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

aas-core-works / aas-core-codegen / 16874751306

11 Aug 2025 08:24AM UTC coverage: 82.012% (-0.008%) from 82.02%
16874751306

Pull #547

github

web-flow
Merge 9f71c3eeb into e58baa57d
Pull Request #547: List of primitives: JSON Schema, Python

3 of 6 new or added lines in 1 file covered. (50.0%)

1 existing line in 1 file now uncovered.

28509 of 34762 relevant lines covered (82.01%)

4.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.23
/aas_core_codegen/python/xmlization/_generate.py
1
"""Generate Python code for XML-ization based on the intermediate representation."""
2

3
import io
5✔
4
import textwrap
5✔
5
from typing import Tuple, Optional, List, Union
5✔
6

7
from icontract import ensure, require
5✔
8

9
from aas_core_codegen import intermediate, specific_implementations, naming
5✔
10
from aas_core_codegen.common import (
5✔
11
    Error,
12
    Stripped,
13
    assert_never,
14
    Identifier,
15
    indent_but_first_line,
16
)
17
from aas_core_codegen.python import common as python_common, naming as python_naming
5✔
18
from aas_core_codegen.python.common import (
5✔
19
    INDENT as I,
20
    INDENT2 as II,
21
    INDENT3 as III,
22
    INDENT4 as IIII,
23
)
24

25

26
def _generate_module_docstring(
    symbol_table: intermediate.SymbolTable,
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the docstring of the whole module.

    The docstring always starts with the general description of the reading
    and writing functions. If the symbol table contains at least one concrete
    class, usage examples based on the first concrete class are appended.

    :param symbol_table: whose first concrete class, if any, drives the examples
    :param aas_module: qualified name of the target module used in the imports
    :return: the docstring including the enclosing triple quotes
    """
    first_cls = next(iter(symbol_table.concrete_classes), None)

    docstring_blocks = [
        Stripped(
            f"""\
Read and write AAS models as XML.

For reading, we provide different reading functions, each handling a different kind
of input. All the reading functions operate in one pass, *i.e.*, the source is read
incrementally and the complete XML is not held in memory.

We provide the following four reading functions (where ``X`` represents the name of
the class):

1) ``X_from_iterparse`` reads from a stream of ``(event, element)`` tuples coming from
   :py:func:`xml.etree.ElementTree.iterparse` with the argument
   ``events=["start", "end"]``. If you do not trust the source, please consider
   using `defusedxml.ElementTree`_.
2) ``X_from_stream`` reads from the given text stream.
3) ``X_from_file`` reads from a file on disk.
4) ``X_from_str`` reads from the given string.

The functions ``X_from_stream``, ``X_from_file`` and ``X_from_str`` provide
an extra parameter, ``has_iterparse``, which allows you to use a parsing library
different from :py:mod:`xml.etree.ElementTree`. For example, you can pass in
`defusedxml.ElementTree`_.

.. _defusedxml.ElementTree: https://pypi.org/project/defusedxml/#defusedxml-elementtree

All XML elements are expected to live in the :py:attr:`~NAMESPACE`.

For writing, use the function :py:func:`{aas_module}.xmlization.write` which
translates the instance of the model into an XML document and writes it in one pass
to the stream."""
        )
    ]

    if first_cls is not None:
        # NOTE: The name must follow the same pattern as the public
        # ``{cls}_from_file`` readers generated in this module; otherwise
        # the example would refer to a function which does not exist.
        first_cls_from_file = python_naming.function_name(
            Identifier(f"{first_cls.name}_from_file")
        )

        first_cls_name = python_naming.class_name(first_cls.name)

        docstring_blocks.append(
            Stripped(
                f"""\
Here is an example usage how to de-serialize from a file:

.. code-block::

    import pathlib

    import {aas_module}.xmlization as aas_xmlization

    path = pathlib.Path(...)
    instance = aas_xmlization.{first_cls_from_file}(
        path
    )

    # Do something with the ``instance``

Here is another code example where we serialize the instance:

.. code-block::

    import pathlib

    import {aas_module}.types as aas_types
    import {aas_module}.xmlization as aas_xmlization

    instance = aas_types.{first_cls_name}(
        ... # some constructor arguments
    )

    pth = pathlib.Path(...)
    with pth.open("wt") as fid:
        aas_xmlization.write(instance, fid)"""
            )
        )

    # Escape the triple quotes so that the text can not terminate the docstring
    # prematurely.
    escaped_text = "\n\n".join(docstring_blocks).replace('"""', '\\"\\"\\"')
    return Stripped(
        f"""\
\"\"\"
{escaped_text}
\"\"\""""
    )
124

125

126
def _generate_read_enum_from_element_text(
    enumeration: intermediate.Enumeration,
) -> Stripped:
    """
    Generate the function which parses an element's text as ``enumeration``.

    The generated function reads the text with ``_read_text_from_element``,
    converts it with the ``*_from_str`` function of ``aas_stringification`` and
    raises a ``DeserializationException`` if the conversion gives ``None``.
    """
    enum_name = python_naming.enum_name(identifier=enumeration.name)

    reader_identifier = Identifier(f"read_{enumeration.name}_from_element_text")
    function_name = python_naming.private_function_name(reader_identifier)

    from_str_identifier = Identifier(f"{enumeration.name}_from_str")
    enum_from_str = python_naming.function_name(from_str_identifier)

    code = f"""\
def {function_name}(
{I}element: Element,
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.{enum_name}:
{I}\"\"\"
{I}Parse the text of :paramref:`element` as a literal of
{I}:py:class:`.types.{enum_name}`, and read the corresponding
{I}end element from :paramref:`iterator`.

{I}:param element: start element
{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return: parsed value
{I}\"\"\"
{I}text = _read_text_from_element(
{II}element,
{II}iterator
{I})

{I}literal = aas_stringification.{enum_from_str}(text)
{I}if literal is None:
{II}raise DeserializationException(
{III}f"Not a valid string representation of "
{III}f"a literal of {enum_name}: {{text}}"
{II})

{I}return literal"""

    return Stripped(code)
173

174

175
def _generate_read_cls_from_iterparse(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass],
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the public ``{cls}_from_iterparse`` reading function.

    The generated function takes the first ``(event, element)`` pair from the
    iterator, checks that it is a start event and delegates to the private
    ``_read_{cls}_as_element`` function.

    :param cls: class to be read
    :param aas_module: qualified name of the target module used in the example
    :return: generated code of the function
    """
    public_identifier = Identifier(f"{cls.name}_from_iterparse")
    function_name = python_naming.function_name(public_identifier)

    cls_name = python_naming.class_name(cls.name)

    wrapped_identifier = Identifier(f"_read_{cls.name}_as_element")
    wrapped_function_name = python_naming.function_name(wrapped_identifier)

    code = f"""\
def {function_name}(
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.{cls_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{cls_name}` from
{I}the :paramref:`iterator`.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import xml.etree.ElementTree as ET

{I}    import {aas_module}.xmlization as aas_xmlization

{I}    path = pathlib.Path(...)
{I}    with path.open("rt") as fid:
{I}        iterator = ET.iterparse(
{I}            source=fid,
{I}            events=['start', 'end']
{I}        )
{I}        instance = aas_xmlization.{function_name}(
{I}            iterator
{I}        )

{I}    # Do something with the ``instance``

{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.{cls_name}` read from
{II}:paramref:`iterator`
{I}\"\"\"
{I}next_event_element = next(iterator, None)
{I}if next_event_element is None:
{II}raise DeserializationException(
{III}# fmt: off
{III}"Expected the start element for {cls_name}, "
{III}"but got the end-of-input"
{III}# fmt: on
{II})

{I}next_event, next_element = next_event_element
{I}if next_event != 'start':
{II}raise DeserializationException(
{III}f"Expected the start element for {cls_name}, "
{III}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{II})

{I}try:
{II}return {wrapped_function_name}(
{III}next_element,
{III}iterator
{II})
{I}except DeserializationException as exception:
{II}exception.path._prepend(ElementSegment(next_element))
{II}raise exception"""

    return Stripped(code)
254

255

256
def _generate_read_cls_from_stream(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass],
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the public ``{cls}_from_stream`` reading function.

    The generated function runs ``iterparse`` of the given module on the text
    stream and forwards the iterator, wrapped with
    ``_with_elements_cleared_after_yield``, to ``{cls}_from_iterparse``.

    :param cls: class to be read
    :param aas_module: qualified name of the target module used in the example
    :return: generated code of the function
    """
    stream_identifier = Identifier(f"{cls.name}_from_stream")
    function_name = python_naming.function_name(stream_identifier)

    iterparse_identifier = Identifier(f"{cls.name}_from_iterparse")
    from_iterparse_name = python_naming.function_name(iterparse_identifier)

    cls_name = python_naming.class_name(cls.name)

    code = f"""\
def {function_name}(
{I}stream: TextIO,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.{cls_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{cls_name}` from
{I}the :paramref:`stream`.

{I}Example usage:

{I}.. code-block::

{I}    import {aas_module}.xmlization as aas_xmlization

{I}    with open_some_stream_over_network(...) as stream:
{I}        instance = aas_xmlization.{function_name}(
{I}            stream
{I}        )

{I}    # Do something with the ``instance``

{I}:param stream:
{II}representing an instance of
{II}:py:class:`.types.{cls_name}` in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.{cls_name}` read from
{II}:paramref:`stream`
{I}\"\"\"
{I}iterator = has_iterparse.iterparse(
{II}stream,
{II}['start', 'end']
{I})
{I}return {from_iterparse_name}(
{II}_with_elements_cleared_after_yield(iterator)
{I})"""

    return Stripped(code)
314

315

316
def _generate_read_cls_from_file(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass],
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the public ``{cls}_from_file`` reading function.

    The generated function opens the file at the given path as UTF-8 text,
    runs ``iterparse`` of the given module on it and forwards the iterator,
    wrapped with ``_with_elements_cleared_after_yield``, to
    ``{cls}_from_iterparse``.

    :param cls: class to be read
    :param aas_module: qualified name of the target module used in the example
    :return: generated code of the function
    """
    file_identifier = Identifier(f"{cls.name}_from_file")
    function_name = python_naming.function_name(file_identifier)

    iterparse_identifier = Identifier(f"{cls.name}_from_iterparse")
    from_iterparse_name = python_naming.function_name(iterparse_identifier)

    cls_name = python_naming.class_name(cls.name)

    code = f"""\
def {function_name}(
{I}path: PathLike,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.{cls_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{cls_name}` from
{I}the :paramref:`path`.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import {aas_module}.xmlization as aas_xmlization

{I}    path = pathlib.Path(...)
{I}    instance = aas_xmlization.{function_name}(
{I}        path
{I}    )

{I}    # Do something with the ``instance``

{I}:param path:
{II}to the file representing an instance of
{II}:py:class:`.types.{cls_name}` in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.{cls_name}` read from
{II}:paramref:`path`
{I}\"\"\"
{I}with open(os.fspath(path), "rt", encoding='utf-8') as fid:
{II}iterator = has_iterparse.iterparse(
{III}fid,
{III}['start', 'end']
{II})
{II}return {from_iterparse_name}(
{III}_with_elements_cleared_after_yield(iterator)
{II})"""

    return Stripped(code)
376

377

378
def _generate_read_cls_from_str(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass],
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the public function for the reading of a ``cls`` from a string.

    The generated function wraps the text in :py:class:`io.StringIO`, runs
    ``iterparse`` of the given module on it and forwards the iterator, wrapped
    with ``_with_elements_cleared_after_yield``, to ``{cls}_from_iterparse``.

    :param cls: class to be read
    :param aas_module: qualified name of the target module used in the example
    :return: generated code of the function
    """
    function_name = python_naming.function_name(Identifier(f"{cls.name}_from_str"))

    from_iterparse_name = python_naming.function_name(
        Identifier(f"{cls.name}_from_iterparse")
    )

    cls_name = python_naming.class_name(cls.name)

    # NOTE: The usage example works on a string, so it must not import
    # ``pathlib``, which it never uses.
    return Stripped(
        f"""\
def {function_name}(
{I}text: str,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.{cls_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{cls_name}` from
{I}the :paramref:`text`.

{I}Example usage:

{I}.. code-block::

{I}    import {aas_module}.xmlization as aas_xmlization

{I}    text = "<...>...</...>"
{I}    instance = aas_xmlization.{function_name}(
{I}        text
{I}    )

{I}    # Do something with the ``instance``

{I}:param text:
{II}representing an instance of
{II}:py:class:`.types.{cls_name}` in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.{cls_name}` read from
{II}:paramref:`text`
{I}\"\"\"
{I}iterator = has_iterparse.iterparse(
{II}io.StringIO(text),
{II}['start', 'end']
{I})
{I}return {from_iterparse_name}(
{II}_with_elements_cleared_after_yield(iterator)
{I})"""
    )
437

438

439
# fmt: off
@require(
    lambda cls:
    not isinstance(cls, intermediate.AbstractClass)
    or len(cls.concrete_descendants) > 0,
    "All abstract classes must have concrete descendants; otherwise we can not dispatch"
)
# fmt: on
def _generate_read_cls_as_element(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass]
) -> Stripped:
    """
    Generate the private ``_read_{cls}_as_element`` function.

    If ``cls`` has no concrete descendants, the generated function checks
    the element tag against the XML name of the class and reads the instance as
    a sequence of properties. Otherwise, it looks the tag up in the dispatch
    map of the class and calls the reader found there.
    """
    cls_name = python_naming.class_name(cls.name)

    if len(cls.concrete_descendants) == 0:
        # There is nothing to dispatch on; only this very class is acceptable.
        xml_cls = naming.xml_class_name(cls.name)
        xml_cls_literal = python_common.string_literal(xml_cls)

        read_as_sequence_function_name = python_naming.function_name(
            Identifier(f"_read_{cls.name}_as_sequence")
        )

        body = Stripped(
            f"""\
tag_wo_ns = _parse_element_tag(element)

if tag_wo_ns != {xml_cls_literal}:
{I}raise DeserializationException(
{II}f"Expected the element with the tag '{xml_cls}', "
{II}f"but got tag: {{tag_wo_ns}}"
{I})

return {read_as_sequence_function_name}(
{I}element,
{I}iterator
)"""
        )
    else:
        dispatch_map = python_naming.private_constant_name(
            Identifier(f"dispatch_for_{cls.name}")
        )

        body = Stripped(
            f"""\
tag_wo_ns = _parse_element_tag(element)
read_as_sequence = {dispatch_map}.get(
{I}tag_wo_ns,
{I}None
)

if read_as_sequence is None:
{I}raise DeserializationException(
{II}f"Expected the element tag to be a valid model type "
{II}f"of a concrete instance of '{cls_name}', "
{II}f"but got tag {{tag_wo_ns!r}}"
{I})

return read_as_sequence(
{I}element,
{I}iterator
)"""
        )

    function_name = python_naming.function_name(
        Identifier(f"_read_{cls.name}_as_element")
    )

    indented_body = indent_but_first_line(body, I)

    return Stripped(
        f"""\
def {function_name}(
{I}element: Element,
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.{cls_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{cls_name}` from
{I}:paramref:`iterator`, including the end element.

{I}:param element: start element
{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return: parsed instance
{I}\"\"\"
{I}{indented_body}"""
    )
529

530

531
def _generate_read_from_iterparse(
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the general ``from_iterparse`` function reading any instance.

    The generated function takes the first ``(event, element)`` pair from the
    iterator, checks that it is a start event and delegates to
    ``_read_as_element``.

    :param aas_module: qualified name of the target module used in the example
    :return: generated code of the function
    """
    function_name = "from_iterparse"

    code = f"""\
def {function_name}(
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from the :paramref:`iterator`.

{I}The type of the instance is determined by the very first start element.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import xml.etree.ElementTree as ET

{I}    import {aas_module}.xmlization as aas_xmlization

{I}    path = pathlib.Path(...)
{I}    with path.open("rt") as fid:
{I}        iterator = ET.iterparse(
{I}            source=fid,
{I}            events=['start', 'end']
{I}        )
{I}        instance = aas_xmlization.{function_name}(
{I}            iterator
{I}        )

{I}    # Do something with the ``instance``

{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance of :py:class:`.types.Class` read from the :paramref:`iterator`
{I}\"\"\"
{I}next_event_element = next(iterator, None)
{I}if next_event_element is None:
{II}raise DeserializationException(
{III}# fmt: off
{III}"Expected the start element of an instance, "
{III}"but got the end-of-input"
{III}# fmt: on
{II})

{I}next_event, next_element = next_event_element
{I}if next_event != 'start':
{II}raise DeserializationException(
{III}f"Expected the start element of an instance, "
{III}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{II})

{I}try:
{II}return _read_as_element(
{III}next_element,
{III}iterator
{II})
{I}except DeserializationException as exception:
{II}exception.path._prepend(ElementSegment(next_element))
{II}raise exception"""

    return Stripped(code)
601

602

603
def _generate_read_from_stream(
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the general ``from_stream`` function reading any instance.

    The generated function runs ``iterparse`` of the given module on the text
    stream and forwards the iterator, wrapped with
    ``_with_elements_cleared_after_yield``, to ``from_iterparse``.

    :param aas_module: qualified name of the target module used in the example
    :return: generated code of the function
    """
    stream_identifier = Identifier("from_stream")
    function_name = python_naming.function_name(stream_identifier)

    code = f"""\
def {function_name}(
{I}stream: TextIO,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from the :paramref:`stream`.

{I}The type of the instance is determined by the very first start element.

{I}Example usage:

{I}.. code-block::

{I}    import {aas_module}.xmlization as aas_xmlization

{I}    with open_some_stream_over_network(...) as stream:
{I}        instance = aas_xmlization.{function_name}(
{I}            stream
{I}        )

{I}    # Do something with the ``instance``

{I}:param stream:
{II}representing an instance in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance read from :paramref:`stream`
{I}\"\"\"
{I}iterator = has_iterparse.iterparse(
{II}stream,
{II}['start', 'end']
{I})
{I}return from_iterparse(
{II}_with_elements_cleared_after_yield(iterator)
{I})"""

    return Stripped(code)
653

654

655
def _generate_read_from_file(aas_module: python_common.QualifiedModuleName) -> Stripped:
    """
    Generate the general ``from_file`` function reading any instance.

    The generated function opens the file at the given path as UTF-8 text,
    runs ``iterparse`` of the given module on it and forwards the iterator,
    wrapped with ``_with_elements_cleared_after_yield``, to ``from_iterparse``.

    :param aas_module: qualified name of the target module used in the example
    :return: generated code of the function
    """
    file_identifier = Identifier("from_file")
    function_name = python_naming.function_name(file_identifier)

    code = f"""\
def {function_name}(
{I}path: PathLike,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from the file at the :paramref:`path`.

{I}Example usage:

{I}.. code-block::

{I}    import pathlib
{I}    import {aas_module}.xmlization as aas_xmlization

{I}    path = pathlib.Path(...)
{I}    instance = aas_xmlization.{function_name}(
{I}        path
{I}    )

{I}    # Do something with the ``instance``

{I}:param path:
{II}to the file representing an instance in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance read from the file at :paramref:`path`
{I}\"\"\"
{I}with open(os.fspath(path), "rt", encoding='utf-8') as fid:
{II}iterator = has_iterparse.iterparse(
{III}fid,
{III}['start', 'end']
{II})
{II}return from_iterparse(
{III}_with_elements_cleared_after_yield(iterator)
{II})"""

    return Stripped(code)
703

704

705
def _generate_read_from_str(aas_module: python_common.QualifiedModuleName) -> Stripped:
    """
    Generate the general read function to parse an instance from a string.

    The generated function wraps the text in :py:class:`io.StringIO`, runs
    ``iterparse`` of the given module on it and forwards the iterator, wrapped
    with ``_with_elements_cleared_after_yield``, to ``from_iterparse``.

    :param aas_module: qualified name of the target module used in the example
    :return: generated code of the function
    """
    function_name = python_naming.function_name(Identifier("from_str"))

    # NOTE: The usage example works on a string, so it must not import
    # ``pathlib``, which it never uses.
    return Stripped(
        f"""\
def {function_name}(
{I}text: str,
{I}has_iterparse: HasIterparse = xml.etree.ElementTree
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from the :paramref:`text`.

{I}Example usage:

{I}.. code-block::

{I}    import {aas_module}.xmlization as aas_xmlization

{I}    text = "<...>...</...>"
{I}    instance = aas_xmlization.{function_name}(
{I}        text
{I}    )

{I}    # Do something with the ``instance``

{I}:param text:
{II}representing an instance in XML
{I}:param has_iterparse:
{II}Module containing ``iterparse`` function.

{II}Default is to use :py:mod:`xml.etree.ElementTree` from the standard
{II}library. If you have to deal with malicious input, consider using
{II}a library such as `defusedxml.ElementTree`_.
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return:
{II}Instance read from :paramref:`text`
{I}\"\"\"
{I}iterator = has_iterparse.iterparse(
{II}io.StringIO(text),
{II}['start', 'end']
{I})
{I}return from_iterparse(
{II}_with_elements_cleared_after_yield(iterator)
{I})"""
    )
752

753

754
def _generate_general_read_as_element(
    symbol_table: intermediate.SymbolTable,
) -> Stripped:
    """
    Generate the private ``_read_as_element`` function.

    The generated function looks the element tag up in the general dispatch
    map and calls the reader found there, or raises
    a ``DeserializationException`` if the tag is not in the map.
    """
    dispatch_identifier = Identifier("general_dispatch")
    dispatch_map = python_naming.private_constant_name(dispatch_identifier)

    body = Stripped(
        f"""\
tag_wo_ns = _parse_element_tag(element)
read_as_sequence = {dispatch_map}.get(
{I}tag_wo_ns,
{I}None
)

if read_as_sequence is None:
{I}raise DeserializationException(
{II}f"Expected the element tag to be a valid model type "
{II}f"of a concrete instance, "
{II}f"but got tag {{tag_wo_ns!r}}"
{I})

return read_as_sequence(
{I}element,
{I}iterator
)"""
    )

    indented_body = indent_but_first_line(body, I)

    code = f"""\
def _read_as_element(
{I}element: Element,
{I}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.Class:
{I}\"\"\"
{I}Read an instance from :paramref:`iterator`, including the end element.

{I}:param element: start element
{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return: parsed instance
{I}\"\"\"
{I}{indented_body}"""

    return Stripped(code)
800

801

802
# Map each primitive type to the name of the function which the generated code
# calls to read a value of that type from the text of an element.
_READ_FUNCTION_BY_PRIMITIVE_TYPE = {
    intermediate.PrimitiveType.BOOL: "_read_bool_from_element_text",
    intermediate.PrimitiveType.INT: "_read_int_from_element_text",
    intermediate.PrimitiveType.FLOAT: "_read_float_from_element_text",
    intermediate.PrimitiveType.STR: "_read_str_from_element_text",
    intermediate.PrimitiveType.BYTEARRAY: "_read_bytes_from_element_text",
}
# Make sure that no primitive type has been left out of the mapping.
assert set(intermediate.PrimitiveType).issubset(_READ_FUNCTION_BY_PRIMITIVE_TYPE)
813

814

815
def _generate_reader_and_setter(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the ``ReaderAndSetterFor{cls}``.

    The generated class has one attribute per property of ``cls``, all
    initialized to ``None``, and one ``read_and_set_{property}`` method per
    property which reads the property from the XML events and sets it.

    Lists are supported only if their items are classes or primitive types.

    :param cls: class whose properties are read and set
    :return: generated code of the reader-and-setter class
    :raise: :py:class:`AssertionError` if a list property has unsupported items
    """
    methods = []  # type: List[Stripped]

    cls_name = python_naming.class_name(cls.name)

    init_lines = []  # type: List[str]
    for prop in cls.properties:
        prop_name = python_naming.property_name(prop.name)
        prop_type = python_common.generate_type(
            prop.type_annotation, types_module=Identifier("aas_types")
        )

        # NOTE (mristin, 2022-07-22):
        # We make all the properties optional since we switch over the properties
        # during the de-serialization.
        if not isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation):
            prop_type = Stripped(f"Optional[{prop_type}]")

        init_lines.append(f"self.{prop_name}: {prop_type} = None")

    init_body = "\n".join(init_lines)

    methods.append(
        Stripped(
            f"""\
def __init__(self) -> None:
{I}\"\"\"Initialize with all the properties unset.\"\"\"
{I}{indent_but_first_line(init_body, I)}"""
        )
    )

    for prop in cls.properties:
        type_anno = intermediate.beneath_optional(prop.type_annotation)

        prop_name = python_naming.property_name(prop.name)

        method_body: Stripped

        if isinstance(type_anno, intermediate.PrimitiveTypeAnnotation) or (
            isinstance(type_anno, intermediate.OurTypeAnnotation)
            and isinstance(type_anno.our_type, intermediate.ConstrainedPrimitive)
        ):
            primitive_type = intermediate.try_primitive_type(type_anno)
            assert primitive_type is not None

            read_function = _READ_FUNCTION_BY_PRIMITIVE_TYPE[primitive_type]

            method_body = Stripped(
                f"""\
self.{prop_name} = {read_function}(
{I}element,
{I}iterator
)"""
            )

        elif isinstance(type_anno, intermediate.OurTypeAnnotation):
            our_type = type_anno.our_type
            if isinstance(our_type, intermediate.Enumeration):
                read_function = python_naming.private_function_name(
                    Identifier(f"read_{our_type.name}_from_element_text")
                )

                method_body = Stripped(
                    f"""\
self.{prop_name} = {read_function}(
{I}element,
{I}iterator
)"""
                )

            elif isinstance(our_type, intermediate.ConstrainedPrimitive):
                raise AssertionError(
                    f"Expected {intermediate.ConstrainedPrimitive.__name__} "
                    f"to have been handled before"
                )

            elif isinstance(
                our_type, (intermediate.AbstractClass, intermediate.ConcreteClass)
            ):
                prop_cls_name = python_naming.class_name(our_type.name)

                if len(our_type.concrete_descendants) > 0:
                    # The property is wrapped in an additional discriminator
                    # element, so we have to dispatch on it.
                    read_prop_cls_as_element = python_naming.function_name(
                        Identifier(f"_read_{our_type.name}_as_element")
                    )

                    method_body = Stripped(
                        f"""\
next_event_element = next(iterator, None)
if next_event_element is None:
{I}raise DeserializationException(
{II}"Expected a discriminator start element corresponding "
{II}"to {prop_cls_name}, but got end-of-input"
{I})

next_event, next_element = next_event_element
if next_event != 'start':
{I}raise DeserializationException(
{II}f"Expected a discriminator start element corresponding "
{II}f"to {prop_cls_name}, "
{II}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{I})

try:
{I}result = {read_prop_cls_as_element}(
{II}next_element,
{II}iterator
{I})
except DeserializationException as exception:
{I}exception.path._prepend(ElementSegment(next_element))
{I}raise

_read_end_element(element, iterator)

self.{prop_name} = result"""
                    )
                else:
                    read_prop_cls_as_sequence = python_naming.function_name(
                        Identifier(f"_read_{our_type.name}_as_sequence")
                    )

                    method_body = Stripped(
                        f"""\
self.{prop_name} = {read_prop_cls_as_sequence}(
{I}element,
{I}iterator
)"""
                    )

            else:
                # Fail loudly instead of leaving ``method_body`` unbound.
                assert_never(our_type)
                raise AssertionError("Unexpected execution path")

        elif isinstance(type_anno, intermediate.ListTypeAnnotation):
            if isinstance(
                type_anno.items, intermediate.OurTypeAnnotation
            ) and isinstance(
                type_anno.items.our_type,
                (intermediate.AbstractClass, intermediate.ConcreteClass),
            ):
                read_item_as_element = python_naming.function_name(
                    Identifier(f"_read_{type_anno.items.our_type.name}_as_element")
                )
            elif isinstance(type_anno.items, intermediate.PrimitiveTypeAnnotation):
                read_item_as_element = Identifier(
                    _READ_FUNCTION_BY_PRIMITIVE_TYPE[type_anno.items.a_type]
                )
            else:
                raise AssertionError(
                    "(mristin, 2022-10-09) We handle only lists of classes and "
                    "primitive types in the XML de-serialization at the moment. "
                    "The meta-model does not contain "
                    "any other lists, so we wanted to keep the code as simple as "
                    "possible, and avoid unrolling. Please contact the developers "
                    "if you need this feature."
                )

            # NOTE: We generate the item type in the same way for classes and
            # primitive types. The string representation of a primitive type
            # literal is not necessarily a valid Python type, so we can not use
            # it in the annotation of the generated code.
            items_type = python_common.generate_type(
                type_anno.items, types_module=Identifier("aas_types")
            )

            method_body = Stripped(
                f"""\
if element.text is not None and len(element.text.strip()) != 0:
{I}raise DeserializationException(
{II}f"Expected only item elements and whitespace text, "
{II}f"but got text: {{element.text!r}}"
{I})

result: List[
{I}{items_type}
] = []

item_i = 0

while True:
{I}next_event_element = next(iterator, None)
{I}if next_event_element is None:
{II}raise DeserializationException(
{III}"Expected one or more items from a list or the end element, "
{III}"but got end-of-input"
{II})

{I}next_event, next_element = next_event_element
{I}if next_event == 'end' and next_element.tag == element.tag:
{II}# We reached the end of the list.
{II}break

{I}if next_event != 'start':
{II}raise DeserializationException(
{III}"Expected a start element corresponding to an item, "
{III}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{II})

{I}try:
{II}item = {read_item_as_element}(
{III}next_element,
{III}iterator
{II})
{I}except DeserializationException as exception:
{II}exception.path._prepend(IndexSegment(next_element, item_i))
{II}raise

{I}result.append(item)
{I}item_i += 1

self.{prop_name} = result"""
            )

        else:
            assert_never(type_anno)
            raise AssertionError("Unexpected execution path")

        method_name = python_naming.method_name(Identifier(f"read_and_set_{prop.name}"))
        methods.append(
            Stripped(
                f"""\
def {method_name}(
{I}self,
{I}element: Element,
{I}iterator: Iterator[Tuple[str, Element]]
) -> None:
{I}\"\"\"
{I}Read :paramref:`element` as the property
{I}:py:attr:`.types.{cls_name}.{prop_name}` and set it.
{I}\"\"\"
{I}{indent_but_first_line(method_body, I)}"""
            )
        )

    reader_and_setter_name = python_naming.private_class_name(
        Identifier(f"Reader_and_setter_for_{cls.name}")
    )

    writer = io.StringIO()
    writer.write(
        f"""\
class {reader_and_setter_name}:
{I}\"\"\"
{I}Provide a buffer for reading and setting the properties for the class
{I}:py:class:`{cls_name}`.

{I}The properties correspond to the constructor arguments of
{I}:py:class:`{cls_name}`. We use this buffer to facilitate dispatching when
{I}parsing the properties in a streaming fashion.
{I}\"\"\""""
    )

    for method in methods:
        writer.write("\n\n")
        writer.write(textwrap.indent(method, I))

    return Stripped(writer.getvalue())
5✔
1065

1066

1067
def _generate_read_as_sequence(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the function to read the instance as sequence of XML-encoded properties.

    This function performs no dispatch! The dispatch is expected to have been
    performed already based on the discriminator element.

    The properties are expected to correspond to the constructor arguments of
    the ``cls``.

    The generated function rejects non-whitespace text in the enclosing element,
    consumes the child elements up to and including the matching end element,
    checks that all the required properties have been set and finally constructs
    the instance.

    :param cls: concrete class for which the reading function is generated
    :return: source code of the generated module-level function
    """
    # fmt: off
    assert (
            sorted(
                (arg.name, str(arg.type_annotation))
                for arg in cls.constructor.arguments
            ) == sorted(
                (prop.name, str(prop.type_annotation))
                for prop in cls.properties
            )
    ), (
        "(mristin, 2022-10-11) We assume that the properties and constructor arguments "
        "are identical at this point. If this is not the case, we have to re-write the "
        "logic substantially! Please contact the developers if you see this."
    )
    # fmt: on

    blocks = [
        Stripped(
            f"""\
if element.text is not None and len(element.text.strip()) != 0:
{I}raise DeserializationException(
{II}f"Expected only XML elements representing the properties and whitespace text, "
{II}f"but got text: {{element.text!r}}"
{I})"""
        ),
        Stripped("_raise_if_has_tail_or_attrib(element)"),
    ]  # type: List[Stripped]

    # region Body

    cls_name = python_naming.class_name(cls.name)

    if len(cls.constructor.arguments) == 0:
        # NOTE:
        # There are no properties to read, so the very next event must be the end
        # element matching the start ``element``. The unpacking and the check must
        # live at the top level of the generated function (*not* nested beneath
        # the end-of-input check), and we have to raise if the tag does *not*
        # match.
        blocks.append(
            Stripped(
                f"""\
next_event_element = next(iterator, None)
if next_event_element is None:
{I}raise DeserializationException(
{II}f"Expected the end element corresponding to {{element.tag}}, "
{II}f"but got the end-of-input"
{I})

next_event, next_element = next_event_element
if next_event != 'end' or next_element.tag != element.tag:
{I}raise DeserializationException(
{II}f"Expected the end element corresponding to {{element.tag}}, "
{II}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{I})"""
            )
        )

        blocks.append(Stripped(f"return aas_types.{cls_name}()"))
    else:
        reader_and_setter_name = python_naming.private_class_name(
            Identifier(f"Reader_and_setter_for_{cls.name}")
        )

        read_and_set_dispatch_name = python_naming.private_constant_name(
            Identifier(f"read_and_set_dispatch_for_{cls.name}")
        )

        # The loop in the generated code dispatches on the tag of every child
        # element to the corresponding read-and-set method until the end element
        # of the enclosing ``element`` is reached.
        blocks.append(
            Stripped(
                f"""\
reader_and_setter = (
{I}{reader_and_setter_name}()
)

while True:
{I}next_event_element = next(iterator, None)
{I}if next_event_element is None:
{II}raise DeserializationException(
{III}"Expected one or more XML-encoded properties or the end element, "
{III}"but got the end-of-input"
{II})

{I}next_event, next_element = next_event_element
{I}if next_event == 'end' and next_element.tag == element.tag:
{II}# We reached the end element enclosing the sequence.
{II}break

{I}if next_event != 'start':
{II}raise DeserializationException(
{III}"Expected a start element corresponding to a property, "
{III}f"but got event {{next_event!r}} and element {{next_element.tag!r}}"
{II})

{I}try:
{II}tag_wo_ns = _parse_element_tag(next_element)
{I}except DeserializationException as exception:
{II}exception.path._prepend(ElementSegment(next_element))
{II}raise

{I}read_and_set_method = {read_and_set_dispatch_name}.get(
{II}tag_wo_ns,
{II}None
{I})
{I}if read_and_set_method is None:
{II}an_exception = DeserializationException(
{III}f"Expected an element representing a property, "
{III}f"but got an element with unexpected tag: {{tag_wo_ns!r}}"
{II})
{II}an_exception.path._prepend(ElementSegment(next_element))
{II}raise an_exception

{I}try:
{II}read_and_set_method(
{III}reader_and_setter,
{III}next_element,
{III}iterator
{II})
{I}except DeserializationException as exception:
{II}exception.path._prepend(ElementSegment(next_element))
{II}raise"""
            )
        )

        # Only the non-optional properties need a check for presence.
        for prop in cls.properties:
            if isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation):
                continue

            prop_name = python_naming.property_name(prop.name)

            cause_literal = python_common.string_literal(
                f"The required property {naming.xml_property(prop.name)!r} is missing"
            )
            blocks.append(
                Stripped(
                    f"""\
if reader_and_setter.{prop_name} is None:
{I}raise DeserializationException(
{II}{cause_literal}
{I})"""
                )
            )

        # The constructor call passes the buffered values positionally, in
        # the order of the constructor arguments.
        init_writer = io.StringIO()
        init_writer.write(f"return aas_types.{cls_name}(\n")

        for i, arg in enumerate(cls.constructor.arguments):
            prop = cls.properties_by_name[arg.name]

            prop_name = python_naming.property_name(prop.name)

            init_writer.write(f"{I}reader_and_setter.{prop_name}")

            if i < len(cls.constructor.arguments) - 1:
                init_writer.write(",\n")
            else:
                init_writer.write("\n")

        init_writer.write(")")

        blocks.append(Stripped(init_writer.getvalue()))

    # endregion

    function_name = python_naming.private_function_name(
        Identifier(f"read_{cls.name}_as_sequence")
    )

    writer = io.StringIO()
    writer.write(
        f"""\
def {function_name}(
{II}element: Element,
{II}iterator: Iterator[Tuple[str, Element]]
) -> aas_types.{cls_name}:
{I}\"\"\"
{I}Read an instance of :py:class:`.types.{cls_name}`
{I}as a sequence of XML-encoded properties.

{I}The end element corresponding to the :paramref:`element` will be
{I}read as well.

{I}:param element: start element, parent of the sequence
{I}:param iterator:
{II}Input stream of ``(event, element)`` coming from
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
{II}``events=["start", "end"]``
{I}:raise: :py:class:`DeserializationException` if unexpected input
{I}:return: parsed instance
{I}\"\"\"
"""
    )

    for i, block in enumerate(blocks):
        if i > 0:
            writer.write("\n\n")
        writer.write(textwrap.indent(block, I))

    return Stripped(writer.getvalue())
5✔
1270

1271

1272
@require(
    lambda cls: len(cls.concrete_descendants) > 0,
    "Expected the class to have concrete descendants; "
    "otherwise it makes no sense to dispatch",
)
def _generate_dispatch_map_for_class(
    cls: Union[intermediate.AbstractClass, intermediate.ConcreteClass]
) -> Stripped:
    """
    Generate the mapping model type 🠒 read-as-sequence function.

    The mapping covers the concrete descendants of ``cls`` and, if ``cls`` is
    concrete itself, also ``cls``.

    :param cls: class with at least one concrete descendant
    :return: source code of the generated module-level constant
    """
    constant_name = python_naming.private_constant_name(
        Identifier(f"dispatch_for_{cls.name}")
    )
    cls_name = python_naming.class_name(cls.name)

    if isinstance(cls, intermediate.AbstractClass):
        scope = f"concrete descendants of {cls_name}"
    else:
        scope = f"{cls_name} and its concrete descendants"

    parts = [
        f"""\
#: Dispatch XML class names to read-as-sequence functions
#: corresponding to {scope}
""",
        f"""\
{constant_name}: Mapping[
{I}str,
{I}Callable[
{II}[
{III}Element,
{III}Iterator[Tuple[str, Element]]
{II}],
{II}aas_types.{cls_name}
{I}]
] = {{
""",
    ]

    targets = list(cls.concrete_descendants)

    # NOTE (mristin, 2022-10-11):
    # ``concrete_descendants`` *exclude* the class itself, so a concrete class
    # has to be listed explicitly; it comes first in the mapping.
    if isinstance(cls, intermediate.ConcreteClass):
        targets.insert(0, cls)

    for target in targets:
        reader_name = python_naming.private_function_name(
            Identifier(f"read_{target.name}_as_sequence")
        )
        key_literal = python_common.string_literal(
            naming.xml_class_name(target.name)
        )
        parts.append(f"{I}{key_literal}: {reader_name},\n")

    parts.append("}")

    return Stripped("".join(parts))
5✔
1347

1348

1349
def _generate_general_dispatch_map(symbol_table: intermediate.SymbolTable) -> Stripped:
    """
    Generate the general mapping model type 🠒 read-as-sequence function.

    :param symbol_table: whose concrete classes are listed in the mapping
    :return: source code of the generated module-level constant
    """
    constant_name = python_naming.private_constant_name(Identifier("general_dispatch"))

    header = f"""\
#: Dispatch XML class names to read-as-sequence functions
#: corresponding to the concrete classes
{constant_name}: Mapping[
{I}str,
{I}Callable[
{II}[
{III}Element,
{III}Iterator[Tuple[str, Element]]
{II}],
{II}aas_types.Class
{I}]
] = {{
"""

    # One entry per concrete class, in the order of the symbol table
    entries = []  # type: List[str]
    for concrete_cls in symbol_table.concrete_classes:
        reader_name = python_naming.private_function_name(
            Identifier(f"read_{concrete_cls.name}_as_sequence")
        )
        key_literal = python_common.string_literal(
            naming.xml_class_name(concrete_cls.name)
        )
        entries.append(f"{I}{key_literal}: {reader_name},\n")

    return Stripped(header + "".join(entries) + "}")
5✔
1395

1396

1397
def _generate_reader_and_setter_map(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the mapping property name 🠒 read function.

    :param cls: concrete class whose properties are dispatched
    :return: source code of the generated module-level constant
    """
    constructor_signature = sorted(
        (arg.name, str(arg.type_annotation)) for arg in cls.constructor.arguments
    )
    property_signature = sorted(
        (prop.name, str(prop.type_annotation)) for prop in cls.properties
    )
    assert constructor_signature == property_signature, (
        "(mristin, 2022-10-11) We assume that the properties and constructor arguments "
        "are identical at this point. If this is not the case, we have to re-write the "
        "logic substantially! Please contact the developers if you see this."
    )

    buffer_cls_name = python_naming.private_class_name(
        Identifier(f"Reader_and_setter_for_{cls.name}")
    )

    constant_name = python_naming.private_constant_name(
        Identifier(f"read_and_set_dispatch_for_{cls.name}")
    )

    parts = [
        f"""\
#: Dispatch XML property name to read & set method in
#: :py:class:`{buffer_cls_name}`
{constant_name}: Mapping[
{I}str,
{I}Callable[
{II}[
{III}{buffer_cls_name},
{III}Element,
{III}Iterator[Tuple[str, Element]]
{II}],
{II}None
{I}]
] = {{
"""
    ]  # type: List[str]

    # Each entry maps the XML name of a property to the unbound method of
    # the buffer class which reads and sets that property.
    for prop in cls.properties:
        xml_identifier = naming.xml_property(prop.name)
        method_name = python_naming.method_name(Identifier(f"read_and_set_{prop.name}"))
        method_reference = Stripped(f"{buffer_cls_name}.{method_name}")

        parts.append(
            f"""\
{I}{python_common.string_literal(xml_identifier)}:
{II}{indent_but_first_line(method_reference, II)},
"""
        )

    parts.append("}")
    return Stripped("".join(parts))
5✔
1461

1462

1463
# Map every primitive type to the name of the method which the generated code
# invokes on ``self`` to write a property of that primitive type.
_WRITE_METHOD_BY_PRIMITIVE_TYPE = {
    intermediate.PrimitiveType.BOOL: "_write_bool_property",
    intermediate.PrimitiveType.INT: "_write_int_property",
    intermediate.PrimitiveType.FLOAT: "_write_float_property",
    intermediate.PrimitiveType.STR: "_write_str_property",
    intermediate.PrimitiveType.BYTEARRAY: "_write_bytes_property",
}
# Make sure at import time that no primitive type has been left out of the mapping.
assert all(
    literal in _WRITE_METHOD_BY_PRIMITIVE_TYPE for literal in intermediate.PrimitiveType
)
1473

1474

1475
def _count_required_properties(cls: intermediate.Class) -> int:
    """
    Count the properties of ``cls`` which are not annotated as optional.

    :param cls: class whose properties are inspected
    :return: number of the non-optional properties
    """
    required = [
        prop
        for prop in cls.properties
        if not isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation)
    ]
    return len(required)
1482

1483

1484
# fmt: off
@require(
    lambda prop:
    (
        type_anno := intermediate.beneath_optional(prop.type_annotation),
        isinstance(type_anno, intermediate.OurTypeAnnotation)
        and isinstance(type_anno.our_type, intermediate.ConcreteClass)
        and len(type_anno.our_type.concrete_descendants) == 0
    )[1],
    "We expect the property to be of a concrete class with no descendants so that "
    "its value can be represented as a sequence of XML elements, each corresponding "
    "to a property of the value."
)
# fmt: on
def _generate_snippet_for_writing_concrete_cls_prop(
    prop: intermediate.Property,
) -> Stripped:
    """
    Generate the code snippet to write a class property as a sequence.

    Three cases are distinguished based on the class of the property value:

    * If the class has at least one required property, the value is always
      written between a start and an end element.
    * If the class has no properties at all, an empty element is written
      unconditionally.
    * Otherwise, all the properties are optional, and the generated code writes
      an empty element if none of them is set at runtime.

    :param prop: property whose value is an instance of a concrete class
    :return: generated snippet, to be embedded in a method of the serializer
    """
    type_anno = intermediate.beneath_optional(prop.type_annotation)
    assert isinstance(type_anno, intermediate.OurTypeAnnotation)

    our_type = type_anno.our_type
    assert isinstance(our_type, intermediate.ConcreteClass)

    xml_prop_literal = python_common.string_literal(naming.xml_property(prop.name))

    write_cls_as_sequence = python_naming.private_method_name(
        Identifier(f"write_{our_type.name}_as_sequence")
    )

    prop_name = python_naming.property_name(prop.name)

    if _count_required_properties(our_type) > 0:
        return Stripped(
            f"""\
self._write_start_element({xml_prop_literal})
self.{write_cls_as_sequence}(
{I}that.{prop_name}
)
self._write_end_element({xml_prop_literal})"""
        )

    if len(our_type.properties) == 0:
        # NOTE:
        # Without any properties there is nothing to test for at runtime; an
        # ``if`` with an empty condition would be a syntax error in the generated
        # code. The value can only be represented as an empty element.
        return Stripped(
            f"""\
self._write_empty_element(
{I}{xml_prop_literal}
)"""
        )

    # NOTE (mristin, 2022-10-14):
    # Prefix with "the" so that we avoid naming conflicts.
    variable = python_naming.variable_name(Identifier(f"the_{prop.name}"))

    writer = io.StringIO()
    writer.write(f"{variable} = that.{prop_name}\n")

    # One "is None" check per property of the value
    conjunction = [
        f"{variable}.{python_naming.property_name(value_prop.name)} is None"
        for value_prop in our_type.properties
    ]

    writer.write(
        """\
# We optimize for the case where all the optional properties are not set,
# so that we can simply output an empty element.
if (
"""
    )
    for i, expr in enumerate(conjunction):
        if i > 0:
            writer.write(f"{II}and {indent_but_first_line(expr, II)}\n")
        else:
            writer.write(f"{II}{indent_but_first_line(expr, II)}\n")

    writer.write(
        f"""\
):
{I}self._write_empty_element(
{II}{xml_prop_literal}
{I})
else:
{I}self._write_start_element({xml_prop_literal})
{I}self.{write_cls_as_sequence}(
{II}{variable}
{I})
{I}self._write_end_element({xml_prop_literal})"""
    )

    return Stripped(writer.getvalue())
5✔
1566

1567

1568
def _generate_write_cls_as_sequence(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the method to serialize the ``cls`` as a sequence of XML elements.

    The elements correspond to the properties of the ``cls``.

    The generated method lives in the ``_Serializer`` class.

    :param cls: concrete class to be serialized
    :return: source code of the generated method
    """
    constructor_signature = sorted(
        (arg.name, str(arg.type_annotation)) for arg in cls.constructor.arguments
    )
    property_signature = sorted(
        (prop.name, str(prop.type_annotation)) for prop in cls.properties
    )
    assert constructor_signature == property_signature, (
        "(mristin, 2022-10-14) We assume that the properties and constructor arguments "
        "are identical at this point. If this is not the case, we have to re-write the "
        "logic substantially! Please contact the developers if you see this."
    )

    # NOTE (mristin, 2022-10-14):
    # We need to introduce a new loop variable for each loop since Python tracks
    # the variables in the function scope instead of the block scope.
    loop_variables = python_common.GeneratorForLoopVariables()

    statements = []  # type: List[Stripped]

    if len(cls.properties) == 0:
        statements.append(
            Stripped(
                """\
# There are no properties specified for this class, so nothing can be written.
return"""
            )
        )

    for prop in cls.properties:
        prop_name = python_naming.property_name(prop.name)
        xml_prop_literal = python_common.string_literal(
            naming.xml_property(prop.name)
        )

        type_anno = intermediate.beneath_optional(prop.type_annotation)

        primitive_type = intermediate.try_primitive_type(type_anno)

        snippet: Stripped

        if primitive_type is not None:
            # Primitive values, constrained or not, go through a dedicated writer.
            primitive_writer = _WRITE_METHOD_BY_PRIMITIVE_TYPE[primitive_type]

            snippet = Stripped(
                f"""\
self.{primitive_writer}(
{I}{xml_prop_literal},
{I}that.{prop_name}
)"""
            )
        else:
            assert not isinstance(type_anno, intermediate.PrimitiveTypeAnnotation)

            if isinstance(type_anno, intermediate.OurTypeAnnotation):
                our_type = type_anno.our_type

                if isinstance(our_type, intermediate.Enumeration):
                    # Enumeration literals are written by their ``value``.
                    snippet = Stripped(
                        f"""\
self._write_str_property(
{I}{xml_prop_literal},
{I}that.{prop_name}.value
)"""
                    )

                elif isinstance(our_type, intermediate.ConstrainedPrimitive):
                    raise AssertionError("Expected to be handled before")

                elif isinstance(
                    our_type,
                    (intermediate.AbstractClass, intermediate.ConcreteClass),
                ):
                    if len(our_type.concrete_descendants) == 0:
                        assert isinstance(our_type, intermediate.ConcreteClass), (
                            f"Unexpected abstract class with no concrete "
                            f"descendants: {our_type.name!r}"
                        )

                        # NOTE (mristin, 2022-10-14):
                        # The snippet is generated in a separate function as the
                        # code became barely readable *this* indented.
                        snippet = _generate_snippet_for_writing_concrete_cls_prop(
                            prop=prop
                        )
                    else:
                        # The value is wrapped in the property element and
                        # written by visiting it.
                        snippet = Stripped(
                            f"""\
self._write_start_element({xml_prop_literal})
self.visit(that.{prop_name})
self._write_end_element({xml_prop_literal})"""
                        )
                else:
                    assert_never(our_type)

            elif isinstance(type_anno, intermediate.ListTypeAnnotation):
                items_are_instances = isinstance(
                    type_anno.items, intermediate.OurTypeAnnotation
                ) and isinstance(
                    type_anno.items.our_type,
                    (intermediate.AbstractClass, intermediate.ConcreteClass),
                )
                assert items_are_instances or isinstance(
                    type_anno.items, intermediate.PrimitiveTypeAnnotation
                ), "See intermediate._translate._verify_only_simple_type_patterns"

                # NOTE(review): primitive items pass the assertion above, yet the
                # generated loop calls ``self.visit`` on every item -- confirm
                # that this is intended for lists of primitives.
                item_variable = next(loop_variables)

                snippet = Stripped(
                    f"""\
if len(that.{prop_name}) == 0:
{I}self._write_empty_element({xml_prop_literal})
else:
{I}self._write_start_element({xml_prop_literal})
{I}for {item_variable} in that.{prop_name}:
{II}self.visit({item_variable})
{I}self._write_end_element({xml_prop_literal})"""
                )
            else:
                assert_never(type_anno)

        if isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation):
            # Optional properties are written only if they are set.
            snippet = Stripped(
                f"""\
if that.{prop_name} is not None:
{I}{indent_but_first_line(snippet, I)}"""
            )

        statements.append(snippet)

    cls_name = python_naming.class_name(cls.name)
    function_name = python_naming.private_method_name(
        Identifier(f"write_{cls.name}_as_sequence")
    )

    signature_and_docstring = f"""\
def {function_name}(
{I}self,
{I}that: aas_types.{cls_name}
) -> None:
{I}\"\"\"
{I}Serialize :paramref:`that` to :py:attr:`~stream` as a sequence of
{I}XML elements.

{I}Each element in the sequence corresponds to a property. If no properties
{I}are set, nothing is written to the :py:attr:`~stream`.

{I}:param that: instance to be serialized
{I}\"\"\"
"""

    body = "\n\n".join(textwrap.indent(statement, I) for statement in statements)

    return Stripped(signature_and_docstring + body)
5✔
1741

1742

1743
def _generate_visit_cls(cls: intermediate.ConcreteClass) -> Stripped:
    """
    Generate the method to serialize the ``cls`` as an XML element.

    The generated method lives in the ``_Serializer`` class.

    :param cls: concrete class to be serialized
    :return: source code of the generated method
    """
    constructor_signature = sorted(
        (arg.name, str(arg.type_annotation)) for arg in cls.constructor.arguments
    )
    property_signature = sorted(
        (prop.name, str(prop.type_annotation)) for prop in cls.properties
    )
    assert constructor_signature == property_signature, (
        "(mristin, 2022-10-11) We assume that the properties and constructor arguments "
        "are identical at this point. If this is not the case, we have to re-write the "
        "logic substantially! Please contact the developers if you see this."
    )

    xml_cls_literal = python_common.string_literal(naming.xml_class_name(cls.name))

    body: Stripped

    if len(cls.properties) == 0:
        # A class without properties can only be an empty element.
        body = Stripped(
            f"""\
self._write_empty_element(
{I}{xml_cls_literal}
)"""
        )
    else:
        write_cls_as_sequence = python_naming.private_method_name(
            Identifier(f"write_{cls.name}_as_sequence")
        )

        if _count_required_properties(cls) > 0:
            # At least one property is always written, so the element is never
            # empty.
            body = Stripped(
                f"""\
self._write_start_element({xml_cls_literal})
self.{write_cls_as_sequence}(
{I}that
)
self._write_end_element({xml_cls_literal})"""
            )
        else:
            # NOTE (mristin, 2022-10-14):
            # We optimize for the case where all the optional properties are not
            # set, so that we can simply output an empty element.
            parts = [
                """\
# We optimize for the case where all the optional properties are not set,
# so that we can simply output an empty element.
if (
"""
            ]  # type: List[str]

            for i, prop in enumerate(cls.properties):
                check = f"that.{python_naming.property_name(prop.name)} is None"
                conjunct = "and " if i > 0 else ""
                parts.append(f"{II}{conjunct}{indent_but_first_line(check, II)}\n")

            parts.append(
                f"""\
):
{I}self._write_empty_element(
{II}{xml_cls_literal}
{I})
else:
{I}self._write_start_element({xml_cls_literal})
{I}self.{write_cls_as_sequence}(
{II}that
{I})
{I}self._write_end_element({xml_cls_literal})"""
            )

            body = Stripped("".join(parts))

    cls_name = python_naming.class_name(cls.name)
    visit_name = python_naming.method_name(Identifier(f"visit_{cls.name}"))

    signature_and_docstring = f"""\
def {visit_name}(
{I}self,
{I}that: aas_types.{cls_name}
) -> None:
{I}\"\"\"
{I}Serialize :paramref:`that` to :py:attr:`~stream` as an XML element.

{I}The enclosing XML element designates the class of the instance, where its
{I}children correspond to the properties of the instance.

{I}:param that: instance to be serialized
{I}\"\"\"
"""

    return Stripped(signature_and_docstring + textwrap.indent(body, I))
5✔
1862

1863

1864
# fmt: off
1865
@require(
5✔
1866
    lambda symbol_table:
1867
    '"' not in symbol_table.meta_model.xml_namespace,
1868
    "No single quotes expected in the XML namespace so that we can directly "
1869
    "write the namespace as-is"
1870
)
1871
# fmt: on
1872
def _generate_serializer(symbol_table: intermediate.SymbolTable) -> Stripped:
    """
    Generate the serializer as a visitor which writes to a stream on visits.

    The generated ``_Serializer`` class consists of:

    * the attribute declarations (the output stream and the two method pointers
      used for writing start and empty elements),
    * the low-level writing helpers for elements and escaped text,
    * one ``_write_*_property`` helper per primitive type,
    * the constructor, and
    * for every concrete class in :paramref:`symbol_table`, the blocks produced
      by ``_generate_write_cls_as_sequence`` and ``_generate_visit_cls``.

    :param symbol_table: whose concrete classes determine the generated methods
    :return: source code of the ``_Serializer`` class
    """
    # NOTE:
    # Each block below is a member of the generated class written *without*
    # the class-level indention. The indention is added at the end of this function
    # when the blocks are joined.
    body_blocks = [
        Stripped(
            """\
#: Stream to be written to when we visit the instances
stream: Final[TextIO]"""
        ),
        Stripped(
            f"""\
#: Method pointer to be invoked for writing the start element with or without
#: specifying a namespace (depending on the state of the serializer)
_write_start_element: Callable[
{I}[str],
{I}None
]"""
        ),
        Stripped(
            f"""\
#: Method pointer to be invoked for writing an empty element with or without
#: specifying a namespace (depending on the state of the serializer)
_write_empty_element: Callable[
{I}[str],
{I}None
]"""
        ),
        Stripped(
            """\
# NOTE (mristin, 2022-10-14):
# The serialization procedure is quite rigid. We leverage the specifics of
# the serialization procedure to optimize the code a bit.
#
# Namely, we model the writing of the XML elements as a state machine.
# The namespace is only specified for the very first element. All the subsequent
# elements will *not* have the namespace specified. We implement that behavior by
# using pointers to methods, as Python treats the methods as first-class citizens.
#
# The ``_write_start_element`` will point to
# ``_write_first_start_element_with_namespace`` on the *first* invocation.
# Afterwards, it will be redirected to ``_write_start_element_without_namespace``.
#
# Analogously for ``_write_empty_element``.
#
# Please see the implementation for the details, but this should give you at least
# a rough overview."""
        ),
        Stripped(
            f"""\
def _write_first_start_element_with_namespace(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the start element with the tag name :paramref:`name` and specify
{I}its namespace.

{I}The :py:attr:`~_write_start_element` is set to
{I}:py:meth:`~_write_start_element_without_namespace` after the first invocation
{I}of this method.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'<{{name}} xmlns="{{NAMESPACE}}">')

{I}# NOTE (mristin, 2022-10-14):
{I}# Any subsequence call to `_write_start_element` or `_write_empty_element`
{I}# should not specify the namespace of the element as we specified now already
{I}# specified it.
{I}self._write_start_element = self._write_start_element_without_namespace
{I}self._write_empty_element = self._write_empty_element_without_namespace"""
        ),
        Stripped(
            f"""\
def _write_start_element_without_namespace(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the start element with the tag name :paramref:`name`.

{I}The first element, written *before* this one, is expected to have been
{I}already written with the namespace specified.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'<{{name}}>')"""
        ),
        Stripped(
            f"""\
def _escape_and_write_text(
{II}self,
{II}text: str
) -> None:
{I}\"\"\"
{I}Escape :paramref:`text` for XML and write it.

{I}:param text: to be escaped and written
{I}\"\"\"
{I}# NOTE (mristin, 2022-10-14):
{I}# We ran ``timeit`` on manual code which escaped XML special characters with
{I}# a dictionary, and on another snippet which called three ``.replace()``.
{I}# The code with ``.replace()`` was an order of magnitude faster on our computers.
{I}self.stream.write(
{II}text.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
{I})"""
        ),
        Stripped(
            f"""\
def _write_end_element(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the end element with the tag name :paramref:`name`.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'</{{name}}>')"""
        ),
        Stripped(
            f"""\
def _write_first_empty_element_with_namespace(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the first (and only) empty element with the tag name :paramref:`name`.

{I}No elements are expected to be written to the stream afterwards. The element
{I}includes the namespace specification.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'<{{name}} xmlns="{{NAMESPACE}}"/>')
{I}self._write_empty_element = self._rase_if_write_element_called_again
{I}self._write_start_element = self._rase_if_write_element_called_again"""
        ),
        Stripped(
            f"""\
def _rase_if_write_element_called_again(
{II}self,
{II}name: str
) -> None:
{I}raise AssertionError(
{II}f"We expected to call ``_write_first_empty_element_with_namespace`` "
{II}f"only once. This is an unexpected second call for writing "
{II}f"an (empty or non-empty) element with the tag name: {{name!r}}"
{I})"""
        ),
        Stripped(
            f"""\
def _write_empty_element_without_namespace(
{II}self,
{II}name: str
) -> None:
{I}\"\"\"
{I}Write the empty element with the tag name :paramref:`name`.

{I}The call to this method is expected to occur *after* the enclosing element with
{I}a specified namespace has been written.

{I}:param name: of the element tag. Expected to contain no XML special characters.
{I}\"\"\"
{I}self.stream.write(f'<{{name}}/>')"""
        ),
        Stripped(
            f"""\
def _write_bool_property(
{II}self,
{II}name: str,
{II}value: bool
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of a boolean property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)
{I}self.stream.write('true' if value else 'false')
{I}self._write_end_element(name)"""
        ),
        Stripped(
            f"""\
def _write_int_property(
{II}self,
{II}name: str,
{II}value: int
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of an integer property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)
{I}self.stream.write(str(value))
{I}self._write_end_element(name)"""
        ),
        # NOTE:
        # The float writer has to close the element just like all the other
        # ``_write_*_property`` helpers do; otherwise the generated serializer
        # emits a start tag without the matching end tag, *i.e.*, malformed XML.
        Stripped(
            f"""\
def _write_float_property(
{II}self,
{II}name: str,
{II}value: float
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of a floating-point property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)

{I}if value == math.inf:
{II}self.stream.write('INF')
{I}elif value == -math.inf:
{II}self.stream.write('-INF')
{I}elif math.isnan(value):
{II}self.stream.write('NaN')
{I}elif value == 0:
{II}if math.copysign(1.0, value) < 0.0:
{III}self.stream.write('-0.0')
{II}else:
{III}self.stream.write('0.0')
{I}else:
{II}self.stream.write(str(value))

{I}self._write_end_element(name)"""
        ),
        Stripped(
            f"""\
def _write_str_property(
{II}self,
{II}name: str,
{II}value: str
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of a string property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)
{I}self._escape_and_write_text(value)
{I}self._write_end_element(name)"""
        ),
        Stripped(
            f"""\
def _write_bytes_property(
{II}self,
{II}name: str,
{II}value: bytes
) -> None:
{I}\"\"\"
{I}Write the :paramref:`value` of a binary-content property enclosed in
{I}the :paramref:`name` element.

{I}:param name: of the corresponding element tag
{I}:param value: of the property
{I}\"\"\"
{I}self._write_start_element(name)

{I}# NOTE (mristin, 2022-10-14):
{I}# We need to decode the result of the base64-encoding to ASCII since we are
{I}# writing to an XML *text* stream. ``base64.b64encode(.)`` gives us bytes,
{I}# not a string.
{I}encoded = base64.b64encode(value).decode('ascii')

{I}# NOTE (mristin, 2022-10-14):
{I}# Base64 alphabet excludes ``<``, ``>`` and ``&``, so we can directly
{I}# write the ``encoded`` content to the stream as XML text.
{I}#
{I}# See: https://datatracker.ietf.org/doc/html/rfc4648#section-4
{I}self.stream.write(encoded)
{I}self._write_end_element(name)"""
        ),
        Stripped(
            f"""\
def __init__(
{I}self,
{I}stream: TextIO
) -> None:
{I}\"\"\"
{I}Initialize the visitor to write to :paramref:`stream`.

{I}The first element will include the :py:attr:`~.NAMESPACE`. Every other
{I}element will not have the namespace specified.

{I}:param stream: where to write to
{I}\"\"\"
{I}self.stream = stream
{I}self._write_start_element = (
{II}self._write_first_start_element_with_namespace
{I})
{I}self._write_empty_element = (
{II}self._write_first_empty_element_with_namespace
{I})"""
        ),
    ]

    # Every concrete class contributes a pair of methods to the serializer.
    for cls in symbol_table.concrete_classes:
        body_blocks.append(_generate_write_cls_as_sequence(cls=cls))
        body_blocks.append(_generate_visit_cls(cls=cls))

    writer = io.StringIO()
    writer.write(
        Stripped(
            f"""\
class _Serializer(aas_types.AbstractVisitor):
{I}\"\"\"Encode instances as XML and write them to :py:attr:`~stream`.\"\"\""""
        )
    )

    # The blocks are separated by an empty line and indented by one level so
    # that they become members of the class.
    for body_block in body_blocks:
        writer.write("\n\n")
        writer.write(textwrap.indent(body_block, I))

    return Stripped(writer.getvalue())
5✔
2193

2194

2195
def _generate_write_to_stream(
    symbol_table: intermediate.SymbolTable,
    aas_module: python_common.QualifiedModuleName,
) -> Stripped:
    """
    Generate the function to write an instance as XML to a stream.

    The docstring of the generated ``write`` function includes an example usage
    based on the first concrete class of :paramref:`symbol_table`, if there is
    any concrete class at all.

    :param symbol_table: whose first concrete class is used in the example
    :param aas_module: fully-qualified name of the base module, used in the imports
        of the example
    :return: source code of the generated ``write`` function
    """
    docstring_blocks = [
        Stripped(
            """\
Write the XML representation of :paramref:`instance` to :paramref:`stream`."""
        )
    ]

    first_cls = (
        symbol_table.concrete_classes[0]
        if len(symbol_table.concrete_classes) > 0
        else None
    )

    if first_cls is not None:
        first_cls_name = python_naming.class_name(first_cls.name)

        # NOTE:
        # The example imports the types module only under the alias ``aas_types``,
        # so the class has to be referenced through that alias. An unqualified
        # class name would not be defined in the example.
        docstring_blocks.append(
            Stripped(
                f"""\
Example usage:

.. code-block::

    import pathlib

    import {aas_module}.types as aas_types
    import {aas_module}.xmlization as aas_xmlization

    instance = aas_types.{first_cls_name}(
       ... # some constructor arguments
    )

    pth = pathlib.Path(...)
    with pth.open("wt") as fid:
        aas_xmlization.write(instance, fid)"""
            )
        )

    docstring_blocks.append(
        Stripped(
            """\
:param instance: to be serialized
:param stream: to write to"""
        )
    )

    # Escape the triple quotes so that the text can not prematurely terminate
    # the docstring of the generated function.
    escaped_text = "\n\n".join(docstring_blocks).replace('"""', '\\"\\"\\"')
    docstring = Stripped(
        f"""\
\"\"\"
{escaped_text}
\"\"\""""
    )

    return Stripped(
        f"""\
def write(instance: aas_types.Class, stream: TextIO) -> None:
{I}{indent_but_first_line(docstring, I)}
{I}serializer = _Serializer(stream)
{I}serializer.visit(instance)"""
    )
2261

2262

2263
# fmt: off
2264
@ensure(lambda result: (result[0] is not None) ^ (result[1] is not None))
5✔
2265
@ensure(
5✔
2266
    lambda result:
2267
    not (result[0] is not None) or result[0].endswith('\n'),
2268
    "Trailing newline mandatory for valid end-of-files"
2269
)
2270
# fmt: on
2271
def generate(
5✔
2272
    symbol_table: intermediate.SymbolTable,
2273
    aas_module: python_common.QualifiedModuleName,
2274
    spec_impls: specific_implementations.SpecificImplementations,
2275
) -> Tuple[Optional[str], Optional[List[Error]]]:
2276
    """
2277
    Generate the Python code for the general XML de/serialization.
2278

2279
    The ``aas_module`` indicates the fully-qualified name of the base module.
2280
    """
2281
    xml_namespace_literal = python_common.string_literal(
5✔
2282
        symbol_table.meta_model.xml_namespace
2283
    )
2284

2285
    blocks = [
1✔
2286
        _generate_module_docstring(symbol_table=symbol_table, aas_module=aas_module),
2287
        python_common.WARNING,
2288
        # pylint: disable=line-too-long
2289
        Stripped(
2290
            f"""\
2291
import base64
2292
import io
2293
import math
2294
import os
2295
import sys
2296
from typing import (
2297
{I}Any,
2298
{I}Callable,
2299
{I}Iterator,
2300
{I}List,
2301
{I}Mapping,
2302
{I}Optional,
2303
{I}Sequence,
2304
{I}TextIO,
2305
{I}Tuple,
2306
{I}Union,
2307
{I}TYPE_CHECKING
2308
)
2309
import xml.etree.ElementTree
2310

2311
if sys.version_info >= (3, 8):
2312
{I}from typing import (
2313
{II}Final,
2314
{II}Protocol
2315
{I})
2316
else:
2317
{I}from typing_extensions import (
2318
{II}Final,
2319
{II}Protocol
2320
{I})
2321

2322
import {aas_module}.stringification as aas_stringification
2323
import {aas_module}.types as aas_types
2324

2325
# See: https://stackoverflow.com/questions/55076778/why-isnt-this-function-type-annotated-correctly-error-missing-type-parameters
2326
if TYPE_CHECKING:
2327
    PathLike = os.PathLike[Any]
2328
else:
2329
    PathLike = os.PathLike"""
2330
        ),
2331
        Stripped(
2332
            f"""\
2333
#: XML namespace in which all the elements are expected to reside
2334
NAMESPACE = {xml_namespace_literal}"""
2335
        ),
2336
        Stripped("# region De-serialization"),
2337
        Stripped(
2338
            """\
2339
#: XML namespace as a prefix specially tailored for
2340
#: :py:mod:`xml.etree.ElementTree`
2341
_NAMESPACE_IN_CURLY_BRACKETS = f'{{{NAMESPACE}}}'"""
2342
        ),
2343
        Stripped(
2344
            f"""\
2345
class Element(Protocol):
2346
{I}\"\"\"Behave like :py:meth:`xml.etree.ElementTree.Element`.\"\"\"
2347

2348
{I}@property
2349
{I}def attrib(self) -> Optional[Mapping[str, str]]:
2350
{II}\"\"\"Attributes of the element\"\"\"
2351
{II}raise NotImplementedError()
2352

2353
{I}@property
2354
{I}def text(self) -> Optional[str]:
2355
{II}\"\"\"Text content of the element\"\"\"
2356
{II}raise NotImplementedError()
2357

2358
{I}@property
2359
{I}def tail(self) -> Optional[str]:
2360
{II}\"\"\"Tail text of the element\"\"\"
2361
{II}raise NotImplementedError()
2362

2363
{I}@property
2364
{I}def tag(self) -> str:
2365
{II}\"\"\"Tag of the element; with a namespace provided as a ``{{...}}`` prefix\"\"\"
2366
{II}raise NotImplementedError()
2367

2368
{I}def clear(self) -> None:
2369
{II}\"\"\"Behave like :py:meth:`xml.etree.ElementTree.Element.clear`.\"\"\"
2370
{II}raise NotImplementedError()"""
2371
        ),
2372
        # pylint: disable=line-too-long
2373
        Stripped(
2374
            f"""\
2375
class HasIterparse(Protocol):
2376
{I}\"\"\"Parse an XML document incrementally.\"\"\"
2377

2378
{I}# NOTE (mristin, 2022-10-26):
2379
{I}# ``self`` is not used in this context, but is necessary for Mypy,
2380
{I}# see: https://github.com/python/mypy/issues/5018 and
2381
{I}# https://github.com/python/mypy/commit/3efbc5c5e910296a60ed5b9e0e7eb11dd912c3ed#diff-e165eb7aed9dca0a5ebd93985c8cd263a6462d36ac185f9461348dc5a1396d76R9937
2382

2383
{I}def iterparse(
2384
{III}self,
2385
{III}source: TextIO,
2386
{III}events: Optional[Sequence[str]] = None
2387
{I}) -> Iterator[Tuple[str, Element]]:
2388
{II}\"\"\"Behave like :py:func:`xml.etree.ElementTree.iterparse`.\"\"\""""
2389
        ),
2390
        Stripped(
2391
            f"""\
2392
class ElementSegment:
2393
{I}\"\"\"Represent an element on a path to the erroneous value.\"\"\"
2394
{I}#: Erroneous element
2395
{I}element: Final[Element]
2396

2397
{I}def __init__(
2398
{III}self,
2399
{III}element: Element
2400
{I}) -> None:
2401
{II}\"\"\"Initialize with the given values.\"\"\"
2402
{II}self.element = element
2403

2404
{I}def __str__(self) -> str:
2405
{II}\"\"\"
2406
{II}Render the segment as a tag without the namespace.
2407

2408
{II}We deliberately omit the namespace in the tag names. If you want to actually
2409
{II}query with the resulting XPath, you have to insert the namespaces manually.
2410
{II}We did not know how to include the namespace in a meaningful way, as XPath
2411
{II}assumes namespace prefixes to be defined *outside* of the document. At least
2412
{II}the path thus rendered is informative, and you should be able to descend it
2413
{II}manually.
2414
{II}\"\"\"
2415
{II}_, has_namespace, tag_wo_ns = self.element.tag.rpartition('}}')
2416
{II}if not has_namespace:
2417
{III}return self.element.tag
2418
{II}else:
2419
{III}return tag_wo_ns"""
2420
        ),
2421
        Stripped(
2422
            f"""\
2423
class IndexSegment:
2424
{I}\"\"\"Represent an element in a sequence on a path to the erroneous value.\"\"\"
2425
{I}#: Erroneous element
2426
{I}element: Final[Element]
2427

2428
{I}#: Index of the element in the sequence
2429
{I}index: Final[int]
2430

2431
{I}def __init__(
2432
{III}self,
2433
{III}element: Element,
2434
{III}index: int
2435
{I}) -> None:
2436
{II}\"\"\"Initialize with the given values.\"\"\"
2437
{II}self.element = element
2438
{II}self.index = index
2439

2440
{I}def __str__(self) -> str:
2441
{II}\"\"\"Render the segment as an element wildcard with the index.\"\"\"
2442
{II}return f'*[{{self.index}}]'"""
2443
        ),
2444
        Stripped(
2445
            """\
2446
Segment = Union[ElementSegment, IndexSegment]"""
2447
        ),
2448
        Stripped(
2449
            f"""\
2450
class Path:
2451
{I}\"\"\"Represent the relative path to the erroneous element.\"\"\"
2452

2453
{I}def __init__(self) -> None:
2454
{II}\"\"\"Initialize as an empty path.\"\"\"
2455
{II}self._segments = []  # type: List[Segment]
2456

2457
{I}@property
2458
{I}def segments(self) -> Sequence[Segment]:
2459
{II}\"\"\"Get the segments of the path.\"\"\"
2460
{II}return self._segments
2461

2462
{I}def _prepend(self, segment: Segment) -> None:
2463
{II}\"\"\"Insert the :paramref:`segment` in front of other segments.\"\"\"
2464
{II}self._segments.insert(0, segment)
2465

2466
{I}def __str__(self) -> str:
2467
{II}\"\"\"Render the path as a relative XPath.
2468

2469
{II}We omit the leading ``/`` so that you can easily prefix it as you need.
2470
{II}\"\"\"
2471
{II}return "/".join(str(segment) for segment in self._segments)"""
2472
        ),
2473
        Stripped(
2474
            f"""\
2475
class DeserializationException(Exception):
2476
{I}\"\"\"Signal that the XML de-serialization could not be performed.\"\"\"
2477

2478
{I}#: Human-readable explanation of the exception's cause
2479
{I}cause: Final[str]
2480

2481
{I}#: Relative path to the erroneous value
2482
{I}path: Final[Path]
2483

2484
{I}def __init__(
2485
{III}self,
2486
{III}cause: str
2487
{I}) -> None:
2488
{II}\"\"\"Initialize with the given :paramref:`cause` and an empty path.\"\"\"
2489
{II}self.cause = cause
2490
{II}self.path = Path()"""
2491
        ),
2492
        Stripped(
2493
            f"""\
2494
def _with_elements_cleared_after_yield(
2495
{II}iterator: Iterator[Tuple[str, Element]]
2496
) -> Iterator[Tuple[str, Element]]:
2497
{I}\"\"\"
2498
{I}Map the :paramref:`iterator` such that the element is ``clear()``'ed
2499
{I}*after* every ``yield``.
2500

2501
{I}:param iterator: to be mapped
2502
{I}:yield: event and element from :paramref:`iterator`
2503
{I}\"\"\"
2504
{I}for event, element in iterator:
2505
{II}yield event, element
2506
{II}element.clear()"""
2507
        ),
2508
    ]  # type: List[Stripped]
2509

2510
    errors = []  # type: List[Error]
5✔
2511

2512
    # NOTE (mristin, 2022-10-08):
2513
    # We generate first the public methods so that the reader can jump straight
2514
    # to the most important part of the code.
2515
    for cls in symbol_table.classes:
5✔
2516
        blocks.append(_generate_read_cls_from_iterparse(cls=cls, aas_module=aas_module))
5✔
2517

2518
        blocks.append(_generate_read_cls_from_stream(cls=cls, aas_module=aas_module))
5✔
2519

2520
        blocks.append(_generate_read_cls_from_file(cls=cls, aas_module=aas_module))
5✔
2521

2522
        blocks.append(_generate_read_cls_from_str(cls=cls, aas_module=aas_module))
5✔
2523

2524
    blocks.extend(
5✔
2525
        [
2526
            _generate_read_from_iterparse(aas_module=aas_module),
2527
            _generate_read_from_stream(aas_module=aas_module),
2528
            _generate_read_from_file(aas_module=aas_module),
2529
            _generate_read_from_str(aas_module=aas_module),
2530
        ]
2531
    )
2532

2533
    blocks.extend(
5✔
2534
        [
2535
            Stripped(
2536
                """\
2537
# NOTE (mristin, 2022-10-08):
2538
# Directly using the iterator turned out to result in very complex function
2539
# designs. The design became much simpler as soon as we considered one look-ahead
2540
# element. We came up finally with the following pattern which all the protected
2541
# reading functions below roughly follow:
2542
#
2543
# ..code-block::
2544
#
2545
#    _read_*(
2546
#       look-ahead element,
2547
#       iterator
2548
#    ) -> result
2549
#
2550
# The reading functions all read from the ``iterator`` coming from
2551
# :py:func:`xml.etree.ElementTree.iterparse` with the argument
2552
# ``events=["start", "end"]``. The exception :py:class:`.DeserializationException`
2553
# is raised in case of unexpected input.
2554
#
2555
# The reading functions are responsible to read the end element corresponding to the
2556
# start look-ahead element.
2557
#
2558
# When it comes to error reporting, we use exceptions. The exceptions are raised in
2559
# the *callee*, as usual. However, the context of the exception, such as the error path,
2560
# is added in the *caller*, as only the caller knows the context of
2561
# the lookahead-element. In particular, prepending the path segment corresponding to
2562
# the lookahead-element is the responsibility of the *caller*, and not of
2563
# the *callee*."""
2564
            ),
2565
            Stripped(
2566
                f"""\
2567
def _parse_element_tag(element: Element) -> str:
2568
{I}\"\"\"
2569
{I}Extract the tag name without the namespace prefix from :paramref:`element`.
2570

2571
{I}:param element: whose tag without namespace we want to extract
2572
{I}:return: tag name without the namespace prefix
2573
{I}:raise: :py:class:`DeserializationException` if unexpected :paramref:`element`
2574
{I}\"\"\"
2575
{I}if not element.tag.startswith(_NAMESPACE_IN_CURLY_BRACKETS):
2576
{II}namespace, got_namespace, tag_wo_ns = (
2577
{III}element.tag.rpartition('}}')
2578
{II})
2579
{II}if got_namespace:
2580
{III}if namespace.startswith('{{'):
2581
{IIII}namespace = namespace[1:]
2582

2583
{III}raise DeserializationException(
2584
{IIII}f"Expected the element in the namespace {{NAMESPACE!r}}, "
2585
{IIII}f"but got the element {{tag_wo_ns!r}} in the namespace {{namespace!r}}"
2586
{III})
2587
{II}else:
2588
{III}raise DeserializationException(
2589
{IIII}f"Expected the element in the namespace {{NAMESPACE!r}}, "
2590
{IIII}f"but got the element {{tag_wo_ns!r}} without the namespace prefix"
2591
{III})
2592

2593
{I}return element.tag[len(_NAMESPACE_IN_CURLY_BRACKETS):]"""
2594
            ),
2595
            Stripped(
2596
                f"""\
2597
def _raise_if_has_tail_or_attrib(
2598
{II}element: Element
2599
) -> None:
2600
{I}\"\"\"
2601
{I}Check that :paramref:`element` has no trailing text and no attributes.
2602

2603
{I}:param element: to be verified
2604
{I}:raise:
2605
{II}:py:class:`.DeserializationException` if trailing text or attributes;
2606
{II}conforming to the convention about handling error paths,
2607
{II}the exception path is left empty.
2608
{I}\"\"\"
2609
{I}if element.tail is not None and len(element.tail.strip()) != 0:
2610
{II}raise DeserializationException(
2611
{III}f"Expected no trailing text, but got: {{element.tail!r}}"
2612
{II})
2613

2614
{I}if element.attrib is not None and len(element.attrib) > 0:
2615
{II}raise DeserializationException(
2616
{III}f"Expected no attributes, but got: {{element.attrib}}"
2617
{II})"""
2618
            ),
2619
            Stripped(
2620
                f"""\
2621
def _read_end_element(
2622
{II}element: Element,
2623
{II}iterator: Iterator[Tuple[str, Element]]
2624
) -> Element:
2625
{I}\"\"\"
2626
{I}Read the end element corresponding to the start :paramref:`element`
2627
{I}from :paramref:`iterator`.
2628

2629
{I}:param element: corresponding start element
2630
{I}:param iterator:
2631
{II}Input stream of ``(event, element)`` coming from
2632
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2633
{II}``events=["start", "end"]``
2634
{I}:raise: :py:class:`DeserializationException` if unexpected input
2635
{I}\"\"\"
2636
{I}next_event_element = next(iterator, None)
2637
{I}if next_event_element is None:
2638
{II}raise DeserializationException(
2639
{III}f"Expected the end element for {{element.tag}}, "
2640
{III}f"but got the end-of-input"
2641
{II})
2642

2643
{I}next_event, next_element = next_event_element
2644
{I}if next_event != "end" or next_element.tag != element.tag:
2645
{II}raise DeserializationException(
2646
{III}f"Expected the end element for {{element.tag!r}}, "
2647
{III}f"but got the event {{next_event!r}} and element {{next_element.tag!r}}"
2648
{II})
2649

2650
{I}_raise_if_has_tail_or_attrib(next_element)
2651

2652
{I}return next_element"""
2653
            ),
2654
            Stripped(
2655
                f"""\
2656
def _read_text_from_element(
2657
{I}element: Element,
2658
{I}iterator: Iterator[Tuple[str, Element]]
2659
) -> str:
2660
{I}\"\"\"
2661
{I}Extract the text from the :paramref:`element`, and read
2662
{I}the end element from :paramref:`iterator`.
2663

2664
{I}The :paramref:`element` is expected to contain text. Otherwise,
2665
{I}it is considered as unexpected input.
2666

2667
{I}:param element: start element enclosing the text
2668
{I}:param iterator:
2669
{II}Input stream of ``(event, element)`` coming from
2670
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2671
{II}``events=["start", "end"]``
2672
{I}:raise: :py:class:`DeserializationException` if unexpected input
2673
{I}\"\"\"
2674
{I}_raise_if_has_tail_or_attrib(element)
2675

2676
{I}text = element.text
2677

2678
{I}end_element = _read_end_element(
2679
{II}element,
2680
{II}iterator,
2681
{I})
2682

2683
{I}if text is None:
2684
{II}if end_element.text is None:
2685
{III}raise DeserializationException(
2686
{IIII}"Expected an element with text, but got an element with no text."
2687
{III})
2688

2689
{II}text = end_element.text
2690

2691
{I}return text"""
2692
            ),
2693
            Stripped(
2694
                f"""\
2695
_XS_BOOLEAN_LITERAL_SET = {{
2696
{I}"1",
2697
{I}"true",
2698
{I}"0",
2699
{I}"false",
2700
}}"""
2701
            ),
2702
            Stripped(
2703
                f"""\
2704
def _read_bool_from_element_text(
2705
{I}element: Element,
2706
{I}iterator: Iterator[Tuple[str, Element]]
2707
) -> bool:
2708
{I}\"\"\"
2709
{I}Parse the text of :paramref:`element` as a boolean, and
2710
{I}read the corresponding end element from :paramref:`iterator`.
2711

2712
{I}:param element: start element
2713
{I}:param iterator:
2714
{II}Input stream of ``(event, element)`` coming from
2715
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2716
{II}``events=["start", "end"]``
2717
{I}:raise: :py:class:`DeserializationException` if unexpected input
2718
{I}:return: parsed value
2719
{I}\"\"\"
2720
{I}text = _read_text_from_element(
2721
{II}element,
2722
{II}iterator
2723
{I})
2724

2725
{I}if text not in _XS_BOOLEAN_LITERAL_SET:
2726
{II}raise DeserializationException(
2727
{III}f"Expected a boolean, "
2728
{III}f"but got an element with text: {{text!r}}"
2729
{II})
2730

2731
{I}return text in ('1', 'true')"""
2732
            ),
2733
            Stripped(
2734
                f"""\
2735
def _read_int_from_element_text(
2736
{I}element: Element,
2737
{I}iterator: Iterator[Tuple[str, Element]]
2738
) -> int:
2739
{I}\"\"\"
2740
{I}Parse the text of :paramref:`element` as an integer, and
2741
{I}read the corresponding end element from :paramref:`iterator`.
2742

2743
{I}:param element: start element
2744
{I}:param iterator:
2745
{II}Input stream of ``(event, element)`` coming from
2746
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2747
{II}``events=["start", "end"]``
2748
{I}:raise: :py:class:`DeserializationException` if unexpected input
2749
{I}:return: parsed value
2750
{I}\"\"\"
2751
{I}text = _read_text_from_element(
2752
{II}element,
2753
{II}iterator
2754
{I})
2755

2756
{I}try:
2757
{II}value = int(text)
2758
{I}except ValueError:
2759
{II}# pylint: disable=raise-missing-from
2760
{II}raise DeserializationException(
2761
{III}f"Expected an integer, "
2762
{III}f"but got an element with text: {{text!r}}"
2763
{II})
2764

2765
{I}return value"""
2766
            ),
2767
            Stripped(
2768
                f"""\
2769
_TEXT_TO_XS_DOUBLE_LITERALS = {{
2770
{I}"NaN": math.nan,
2771
{I}"INF": math.inf,
2772
{I}"-INF": -math.inf,
2773
}}"""
2774
            ),
2775
            Stripped(
2776
                f"""\
2777
def _read_float_from_element_text(
2778
{I}element: Element,
2779
{I}iterator: Iterator[Tuple[str, Element]]
2780
) -> float:
2781
{I}\"\"\"
2782
{I}Parse the text of :paramref:`element` as a floating-point number, and
2783
{I}read the corresponding end element from :paramref:`iterator`.
2784

2785
{I}:param element: start element
2786
{I}:param iterator:
2787
{II}Input stream of ``(event, element)`` coming from
2788
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2789
{II}``events=["start", "end"]``
2790
{I}:raise: :py:class:`DeserializationException` if unexpected input
2791
{I}:return: parsed value
2792
{I}\"\"\"
2793
{I}text = _read_text_from_element(
2794
{II}element,
2795
{II}iterator
2796
{I})
2797

2798
{I}value = _TEXT_TO_XS_DOUBLE_LITERALS.get(text, None)
2799
{I}if value is None:
2800
{II}try:
2801
{III}value = float(text)
2802
{II}except ValueError:
2803
{III}# pylint: disable=raise-missing-from
2804
{III}raise DeserializationException(
2805
{IIII}f"Expected a floating-point number, "
2806
{IIII}f"but got an element with text: {{text!r}}"
2807
{III})
2808

2809
{I}return value"""
2810
            ),
2811
            Stripped(
2812
                f"""\
2813
def _read_str_from_element_text(
2814
{I}element: Element,
2815
{I}iterator: Iterator[Tuple[str, Element]]
2816
) -> str:
2817
{I}\"\"\"
2818
{I}Parse the text of :paramref:`element` as a string, and
2819
{I}read the corresponding end element from :paramref:`iterator`.
2820

2821
{I}If there is no text, empty string is returned.
2822

2823
{I}:param element: start element
2824
{I}:param iterator:
2825
{II}Input stream of ``(event, element)`` coming from
2826
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2827
{II}``events=["start", "end"]``
2828
{I}:raise: :py:class:`DeserializationException` if unexpected input
2829
{I}:return: parsed value
2830
{I}\"\"\"
2831
{I}# NOTE (mristin, 2022-10-26):
2832
{I}# We do not use ``_read_text_from_element`` as that function expects
2833
{I}# the ``element`` to contain *some* text. In contrast, this function
2834
{I}# can also deal with empty text, in which case it returns an empty string.
2835

2836
{I}text = element.text
2837

2838
{I}end_element = _read_end_element(
2839
{II}element,
2840
{II}iterator
2841
{I})
2842

2843
{I}if text is None:
2844
{II}text = end_element.text
2845

2846
{I}_raise_if_has_tail_or_attrib(element)
2847
{I}result = (
2848
{II}text
2849
{II}if text is not None
2850
{II}else ""
2851
{I})
2852

2853
{I}return result"""
2854
            ),
2855
            Stripped(
2856
                f"""\
2857
def _read_bytes_from_element_text(
2858
{I}element: Element,
2859
{I}iterator: Iterator[Tuple[str, Element]]
2860
) -> bytes:
2861
{I}\"\"\"
2862
{I}Parse the text of :paramref:`element` as base64-encoded bytes, and
2863
{I}read the corresponding end element from :paramref:`iterator`.
2864

2865
{I}:param element: look-ahead element
2866
{I}:param iterator:
2867
{II}Input stream of ``(event, element)`` coming from
2868
{II}:py:func:`xml.etree.ElementTree.iterparse` with the argument
2869
{II}``events=["start", "end"]``
2870
{I}:raise: :py:class:`DeserializationException` if unexpected input
2871
{I}:return: parsed value
2872
{I}\"\"\"
2873
{I}text = _read_text_from_element(
2874
{II}element,
2875
{II}iterator
2876
{I})
2877

2878
{I}try:
2879
{II}value = base64.b64decode(text)
2880
{I}except Exception:
2881
{II}# pylint: disable=raise-missing-from
2882
{II}raise DeserializationException(
2883
{III}f"Expected a text as base64-encoded bytes, "
2884
{III}f"but got an element with text: {{text!r}}"
2885
{II})
2886

2887
{I}return value"""
2888
            ),
2889
        ]
2890
    )
2891

2892
    for our_type in symbol_table.our_types:
5✔
2893
        if isinstance(our_type, intermediate.Enumeration):
5✔
2894
            blocks.append(_generate_read_enum_from_element_text(enumeration=our_type))
5✔
2895
        elif isinstance(our_type, intermediate.ConstrainedPrimitive):
5✔
2896
            continue
5✔
2897
        elif isinstance(our_type, intermediate.AbstractClass):
5✔
2898
            blocks.append(_generate_read_cls_as_element(cls=our_type))
5✔
2899

2900
        elif isinstance(our_type, intermediate.ConcreteClass):
5✔
2901
            if our_type.is_implementation_specific:
5✔
2902
                implementation_key = specific_implementations.ImplementationKey(
×
2903
                    f"Xmlization/read_{our_type.name}.py"
2904
                )
2905

2906
                implementation = spec_impls.get(implementation_key, None)
×
2907
                if implementation is None:
×
2908
                    errors.append(
×
2909
                        Error(
2910
                            our_type.parsed.node,
2911
                            f"The xmlization snippet is missing "
2912
                            f"for the implementation-specific "
2913
                            f"class {our_type.name}: {implementation_key}",
2914
                        )
2915
                    )
2916
                    continue
×
2917
            else:
2918
                blocks.extend(
5✔
2919
                    [
2920
                        _generate_reader_and_setter(cls=our_type),
2921
                        _generate_read_as_sequence(cls=our_type),
2922
                    ]
2923
                )
2924

2925
                blocks.append(_generate_read_cls_as_element(cls=our_type))
5✔
2926

2927
        else:
2928
            assert_never(our_type)
×
2929

2930
    blocks.append(_generate_general_read_as_element(symbol_table=symbol_table))
5✔
2931

2932
    for cls in symbol_table.classes:
5✔
2933
        if isinstance(cls, intermediate.AbstractClass):
5✔
2934
            blocks.append(_generate_dispatch_map_for_class(cls=cls))
5✔
2935
        elif isinstance(cls, intermediate.ConcreteClass):
5✔
2936
            if len(cls.concrete_descendants) > 0:
5✔
2937
                blocks.append(_generate_dispatch_map_for_class(cls=cls))
5✔
2938

2939
            if not cls.is_implementation_specific:
5✔
2940
                blocks.append(_generate_reader_and_setter_map(cls=cls))
5✔
2941

2942
        else:
2943
            assert_never(cls)
×
2944

2945
    blocks.append(_generate_general_dispatch_map(symbol_table=symbol_table))
5✔
2946

2947
    blocks.append(Stripped("# endregion"))
5✔
2948

2949
    blocks.append(Stripped("# region Serialization"))
5✔
2950

2951
    blocks.append(_generate_serializer(symbol_table=symbol_table))
5✔
2952

2953
    blocks.append(
5✔
2954
        _generate_write_to_stream(symbol_table=symbol_table, aas_module=aas_module)
2955
    )
2956

2957
    blocks.append(
5✔
2958
        Stripped(
2959
            f"""\
2960
def to_str(that: aas_types.Class) -> str:
2961
{I}\"\"\"
2962
{I}Serialize :paramref:`that` to an XML-encoded text.
2963

2964
{I}:param that: instance to be serialized
2965
{I}:return: :paramref:`that` serialized to XML serialized to text
2966
{I}\"\"\"
2967
{I}writer = io.StringIO()
2968
{I}write(that, writer)
2969
{I}return writer.getvalue()"""
2970
        )
2971
    )
2972

2973
    blocks.append(Stripped("# endregion"))
5✔
2974

2975
    writer = io.StringIO()
5✔
2976
    for i, block in enumerate(blocks):
5✔
2977
        if i > 0:
5✔
2978
            writer.write("\n\n\n")
5✔
2979

2980
        writer.write(block)
5✔
2981

2982
    writer.write("\n")
5✔
2983

2984
    return writer.getvalue(), None
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc