• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

aas-core-works / aas-core-codegen / 25155643253

30 Apr 2026 08:32AM UTC coverage: 83.973% (-0.03%) from 83.998%
25155643253

push

github

web-flow
Infer schema constraints for values (#618)

So far, we inferred the constraints on the class properties. As we want
to support lists of primitives, lists of constrained primitives *etc.*,
we need to infer the constraints at each atomic value of a type
annotation, not only at the property level.

We adapt the inference in this change to achieve that. It caused only
minor changes in the generated code -- since we do not limit ourselves
to properties only, and need a more general logic, some of the generated
artefacts had to slightly change.

378 of 430 new or added lines in 6 files covered. (87.91%)

13 existing lines in 3 files now uncovered.

29310 of 34904 relevant lines covered (83.97%)

3.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.49
/aas_core_codegen/jsonschema/main.py
1
"""Generate JSON schema corresponding to the meta-model."""
2
import collections
4✔
3
import itertools
4✔
4
import json
4✔
5
from typing import (
4✔
6
    TextIO,
7
    Any,
8
    MutableMapping,
9
    Optional,
10
    Tuple,
11
    List,
12
    Sequence,
13
    Mapping,
14
    Callable,
15
    Iterator,
16
    Final,
17
    cast,
18
)
19

20
from icontract import ensure, require
4✔
21

22
import aas_core_codegen.jsonschema
4✔
23
from aas_core_codegen import (
4✔
24
    naming,
25
    specific_implementations,
26
    intermediate,
27
    run,
28
    infer_for_schema,
29
)
30
from aas_core_codegen.parse import retree as parse_retree
4✔
31
from aas_core_codegen.common import Stripped, Error, assert_never, Identifier
4✔
32

33
assert aas_core_codegen.jsonschema.__doc__ == __doc__
4✔
34

35

36
def _define_for_enumeration(
    enumeration: intermediate.Enumeration,
) -> MutableMapping[str, Any]:
    """
    Map the JSON model type of the ``enumeration`` to its schema definition.

    The definition is a string type restricted to the sorted values of
    the enumeration literals.

    The list of definitions is to be *extended* with the resulting mapping.
    """
    literal_values = [literal.value for literal in enumeration.literals]
    literal_values.sort()

    schema: MutableMapping[str, Any] = collections.OrderedDict(
        [("type", "string"), ("enum", literal_values)]
    )

    result: MutableMapping[str, Any] = collections.OrderedDict()
    result[naming.json_model_type(enumeration.name)] = schema
    return result
4✔
51

52

53
# Map every primitive type of the meta-model to the value of the JSON schema
# keyword ``type``. Byte arrays are represented as strings.
_PRIMITIVE_MAP = {
    intermediate.PrimitiveType.BOOL: "boolean",
    intermediate.PrimitiveType.INT: "integer",
    intermediate.PrimitiveType.FLOAT: "number",
    intermediate.PrimitiveType.STR: "string",
    intermediate.PrimitiveType.BYTEARRAY: "string",
}
# The mapping has to cover every literal of the primitive type enumeration.
assert not any(
    literal not in _PRIMITIVE_MAP for literal in intermediate.PrimitiveType
)
4✔
61

62

63
class _AllOf:
    """
    Hold a non-empty sequence of JSON schema declarations.

    The precondition of the constructor rejects an empty sequence.
    """

    # The subschemas, in the order in which they were given to the constructor
    subschemas: Final[Sequence[Mapping[str, Any]]]

    @require(lambda subschemas: len(subschemas) > 0)
    def __init__(self, subschemas: Sequence[Mapping[str, Any]]) -> None:
        """Initialize with the given values."""
        self.subschemas = subschemas
4✔
71

72

73
def _translate_constraints(
    type_annotation: intermediate.TypeAnnotationExceptOptional,
    constraints: Optional[infer_for_schema.Constraints],
    fix_pattern: Callable[[str], str],
) -> Optional[_AllOf]:
    """
    Render the ``constraints`` on a value of ``type_annotation`` as JSON schema.

    The length constraint becomes ``minLength`` / ``maxLength`` if the primitive
    type of the value is a string or a byte array, and ``minItems`` /
    ``maxItems`` if the value is a list. The pattern constraints are considered
    only if the primitive type is a string; every pattern is passed through
    ``fix_pattern`` before it is written to the schema.

    The first subschema of the result carries the length bounds and the first
    pattern. Every further pattern is put in a subschema of its own.

    Return None if there are no constraints, or if none of them could be
    translated.
    """
    if constraints is None:
        return None

    primitive_type = intermediate.try_primitive_type(type_annotation)

    base: MutableMapping[str, Any] = collections.OrderedDict()
    extras: List[MutableMapping[str, Any]] = []

    if primitive_type is not None:
        has_length = primitive_type in (
            intermediate.PrimitiveType.STR,
            intermediate.PrimitiveType.BYTEARRAY,
        )
        if has_length:
            len_constraint = constraints.len_constraint
            if len_constraint is not None:
                if len_constraint.min_value is not None:
                    base["minLength"] = len_constraint.min_value

                if len_constraint.max_value is not None:
                    base["maxLength"] = len_constraint.max_value

        if primitive_type == intermediate.PrimitiveType.STR:
            patterns = constraints.patterns
            if patterns is not None and len(patterns) > 0:
                fixed_patterns = [fix_pattern(item.pattern) for item in patterns]

                # NOTE:
                # The first pattern lives in the base subschema, all the others
                # are stacked as separate subschemas.
                base["pattern"] = fixed_patterns[0]
                for fixed_pattern in fixed_patterns[1:]:
                    extras.append(
                        collections.OrderedDict([("pattern", fixed_pattern)])
                    )

    if isinstance(type_annotation, intermediate.ListTypeAnnotation):
        len_constraint = constraints.len_constraint
        if len_constraint is not None:
            if len_constraint.min_value is not None:
                base["minItems"] = len_constraint.min_value

            if len_constraint.max_value is not None:
                base["maxItems"] = len_constraint.max_value

    assert (
        len(base) > 0 or len(extras) == 0
    ), "If base subschema is empty, no additional subschemas are expected."

    if len(base) == 0:
        return None

    return _AllOf([base, *extras])
4✔
145

146

147
def _all_of_as_jsonable_mapping(all_of: _AllOf) -> MutableMapping[str, Any]:
4✔
148
    """
149
    Render the instance of AllOf JSON schema construct to a JSON-able mapping.
150

151
    If it consists of just one subschema, that schema is simply returned.
152
    """
153
    if len(all_of.subschemas) == 1:
4✔
154
        return cast(MutableMapping[str, Any], all_of.subschemas[0])
4✔
155

156
    # NOTE (mristin):
157
    # We want to make it easier for code generators to see the base schema *before*
158
    # ``allOf``, so that they can ignore ``allOf`` in case that they can not process
159
    # it, but still get enough relevant information from the first base subschema.
160
    mapping: MutableMapping[str, Any] = collections.OrderedDict(all_of.subschemas[0])
4✔
161

162
    mapping["allOf"] = all_of.subschemas[1:]
4✔
163

164
    return mapping
4✔
165

166

167
@ensure(lambda result: (result[0] is not None) ^ (result[1] is not None))
def _define_type(
    type_annotation: intermediate.TypeAnnotationExceptOptional,
    constraints_by_value: infer_for_schema.ConstraintsByValue,
    fix_pattern: Callable[[str], str],
) -> Tuple[Optional[MutableMapping[str, Any]], Optional[Error]]:
    """
    Generate the JSON schema definition of the type given as ``type_annotation``.

    The ``constraints_by_value`` hold all the constraints of the class nesting
    a property of this type. They are looked up by the type annotation, both
    for the annotation itself and, recursively, for the items of a list.

    Enumerations and classes are only referenced with ``$ref``; no constraints
    are rendered for them. A class with concrete descendants is referenced
    through its ``_choice`` definition.

    The ``fix_pattern`` determines how the pattern should be translated for
    the regex engine. For example, some JSON schema verification engines expect
    only characters below Basic Multilingual Plane (BMP), and use surrogate
    pairs to represent characters above BMP.

    Return the definition, or an error if the definition could not be generated.
    """
    definition: MutableMapping[str, Any] = collections.OrderedDict()

    constraints = constraints_by_value.get(type_annotation, None)

    primitive_type = intermediate.try_primitive_type(type_annotation)

    if primitive_type is not None:
        definition["type"] = _PRIMITIVE_MAP[primitive_type]

        if primitive_type is intermediate.PrimitiveType.BYTEARRAY:
            definition["contentEncoding"] = "base64"

    elif isinstance(type_annotation, intermediate.PrimitiveTypeAnnotation):
        raise AssertionError(
            f"Expected to handle this path before with try_primitive_type: "
            f"{type_annotation=}, {primitive_type}"
        )

    elif isinstance(type_annotation, intermediate.OurTypeAnnotation):
        our_type = type_annotation.our_type
        model_type = naming.json_model_type(our_type.name)

        reference: str
        if isinstance(our_type, intermediate.Enumeration):
            reference = f"#/definitions/{model_type}"

        elif isinstance(our_type, intermediate.ConstrainedPrimitive):
            raise AssertionError(
                "Expected to handle this path before with try_primitive_type"
            )

        elif isinstance(our_type, intermediate.Class):
            if len(our_type.concrete_descendants) > 0:
                reference = f"#/definitions/{model_type}_choice"
            else:
                reference = f"#/definitions/{model_type}"

        else:
            assert_never(our_type)

        # NOTE:
        # The references are returned right away, without any constraints.
        return collections.OrderedDict([("$ref", reference)]), None

    elif isinstance(type_annotation, intermediate.ListTypeAnnotation):
        assert not isinstance(
            type_annotation.items, intermediate.OptionalTypeAnnotation
        ), (
            "NOTE (mristin): Lists of optional values were not expected "
            "at the time when we implemented this. Please contact the developers "
            "if you need this functionality."
        )

        items_definition, items_error = _define_type(
            type_annotation=type_annotation.items,
            constraints_by_value=constraints_by_value,
            fix_pattern=fix_pattern,
        )
        if items_error is not None:
            return None, items_error

        assert items_definition is not None

        definition["type"] = "array"
        definition["items"] = items_definition

    else:
        assert_never(type_annotation)

    if constraints is None:
        return definition, None

    all_of = _translate_constraints(
        type_annotation=type_annotation,
        constraints=constraints,
        fix_pattern=fix_pattern,
    )
    if all_of is None:
        return definition, None

    # NOTE (mristin):
    # We put the type definitions first for readability.
    definition.update(all_of.subschemas[0])

    merged = _AllOf([definition, *all_of.subschemas[1:]])

    return _all_of_as_jsonable_mapping(merged), None
288

289

290
# NOTE (mristin):
291
# This function is made public so that we can use it in other schema generators such
292
# as the SHACL generator.
293
def fix_pattern_for_utf16(pattern: str) -> str:
    """
    Parse the ``pattern`` and re-render it for UTF-16-only regex engines.

    Raise a :py:class:`ValueError` if the ``pattern`` could not be parsed.
    """
    regex, error = parse_retree.parse([pattern])
    if error is not None:
        raise ValueError(
            f"The pattern could not be parsed: {pattern!r}; error was: {error}"
        )

    assert regex is not None
    parse_retree.fix_for_utf16_regex_in_place(regex)

    parts = parse_retree.render(regex=regex)
    assert all(
        isinstance(part, str) for part in parts
    ), "Only string parts expected, no formatted values"

    # NOTE (mristin, 2023-03-15):
    # The cast is necessary only for mypy; all the parts have been checked above.
    return "".join(cast(str, part) for part in parts)
4✔
317

318

319
def _over_non_optional_type_annotations(
    type_annotation: intermediate.TypeAnnotationUnion,
) -> Iterator[intermediate.TypeAnnotationUnion]:
    """
    Iterate over ``type_annotation`` and, recursively, over its nested annotations.

    An optional type annotation is not yielded itself, but its value is
    recursed into. A list type annotation is yielded before its items.
    """
    if isinstance(type_annotation, intermediate.OptionalTypeAnnotation):
        yield from _over_non_optional_type_annotations(type_annotation.value)
        return

    yield type_annotation

    if isinstance(type_annotation, intermediate.ListTypeAnnotation):
        yield from _over_non_optional_type_annotations(type_annotation.items)

    elif not isinstance(
        type_annotation,
        (intermediate.PrimitiveTypeAnnotation, intermediate.OurTypeAnnotation),
    ):
        # NOTE:
        # Primitive and our type annotations have no nested annotations, so
        # there is nothing more to do for them.
        #
        # noinspection PyTypeChecker
        assert_never(type_annotation)
×
345

346

347
@ensure(lambda result: (result[0] is not None) ^ (result[1] is not None))
def _define_properties(
    cls: intermediate.ClassUnion,
    constraints_by_class: Mapping[
        intermediate.ClassUnion, infer_for_schema.ConstraintsByValue
    ],
    fix_pattern: Callable[[str], str],
) -> Tuple[Optional[MutableMapping[str, Any]], Optional[List[Error]]]:
    """
    Generate the definitions of the meta-model properties for the given ``cls``.

    A property specified for ``cls`` itself is defined in full. For an inherited
    property, only the constraints are rendered, and only as steps relative to
    the constraints of the parent classes. Properties which end up with no
    definition are left out.

    The property ``modelType`` is defined separately as we need to distinguish
    cases where it is set (concrete class) and not (abstract class, or stub of
    a concrete class with descendants).

    The ``fix_pattern`` determines how the pattern should be translated for
    the regex engine. For example, some JSON schema verification engines expect
    only characters below Basic Multilingual Plane (BMP), and use surrogate
    pairs to represent characters above BMP.

    Return the properties, or the errors if any of the definitions failed.
    """
    errors: List[Error] = []
    properties: MutableMapping[str, Any] = collections.OrderedDict()

    constraints_by_value = constraints_by_class[cls]

    for prop in cls.properties:
        prop_name = naming.json_property(prop.name)
        type_anno = intermediate.beneath_optional(prop.type_annotation)

        definition: MutableMapping[str, Any]

        if prop.specified_for is cls:
            own_definition, error = _define_type(
                type_annotation=type_anno,
                constraints_by_value=constraints_by_value,
                fix_pattern=fix_pattern,
            )
            if error is not None:
                errors.append(error)
                continue

            assert own_definition is not None
            definition = own_definition
        else:
            # NOTE (mristin):
            # The properties are inherited through ``allOf``. We can not change
            # the type of property in the children as this would result in
            # a conflict and unsatisfiable schema. Therefore, we define the type
            # for a property only at the parent class, where the property is
            # defined for the first time.
            #
            # However, children classes can tighten constraints, so we have to
            # reflect that here. While we could theoretically check for all
            # the constraints deep into the nested type annotations, we currently
            # limit ourselves to check only if the constraints differ at
            # the property level due to the lack of time.
            constraints = constraints_by_value.get(type_anno, None)
            if constraints is None:
                continue

            for parent in cls.inheritances:
                parent_constraints = constraints_by_class[parent].get(type_anno, None)
                if parent_constraints is None:
                    continue

                # NOTE (mristin):
                # We leverage here the fact that we do not make additional copies
                # when merging the constraints in case where one of
                # the constraints is none (see
                # ``infer_for_schema._inline._merge_*`` functions).
                assert (
                    constraints is not None
                ), "We can only tighten the constraints, but not relax them."

                # fmt: off
                constraints = (
                    infer_for_schema
                    .tightening_steps_from_other_to_that_constraints(
                        that=constraints,
                        other=parent_constraints,
                    )
                )
                # fmt: on

            all_of = _translate_constraints(
                type_annotation=type_anno,
                constraints=constraints,
                fix_pattern=fix_pattern,
            )
            if all_of is None:
                continue

            definition = _all_of_as_jsonable_mapping(all_of)

        # NOTE (mristin):
        # We do not want to pollute the schema with empty definitions.
        if len(definition) > 0:
            properties[prop_name] = definition

    if len(errors) > 0:
        return None, errors

    return properties, None
4✔
449

450

451
def _list_required_properties(cls: intermediate.ClassUnion) -> List[Identifier]:
    """
    List the JSON names of the required properties specified for ``cls``.

    A property is required if its type annotation is not optional.

    Only the meta-model properties are listed, so this list might not be
    exhaustive. For example, the JSON property ``modelType`` is not listed here.
    """
    # NOTE (mristin, 2023-02-06):
    # We stack the inheritance as ``allOf``. This will impose the stacking
    # of the required fields as well, so whenever you add a field to
    # a child ``required`` constraint, it will *extend* the list of
    # the required fields, *not* replace it. Hence we skip the inherited
    # properties.
    return [
        naming.json_property(prop.name)
        for prop in cls.properties
        if prop.specified_for is cls
        and not isinstance(prop.type_annotation, intermediate.OptionalTypeAnnotation)
    ]
4✔
473

474

475
def _define_all_of_for_inheritance(
    cls: intermediate.ClassUnion,
) -> List[MutableMapping[str, Any]]:
    """
    Generate the references to the parents of ``cls`` to be put into ``allOf``.

    An abstract parent is referenced directly, while a concrete parent is
    referenced through its ``_abstract`` definition.
    """
    references: List[MutableMapping[str, Any]] = []

    for parent in cls.inheritances:
        if isinstance(parent, intermediate.AbstractClass):
            suffix = ""
        elif isinstance(parent, intermediate.ConcreteClass):
            # NOTE (mristin, 2023-03-13):
            # We distinguish between two definitions corresponding to the same
            # concrete class:
            #
            # 1) One definition defines only the class in abstract, so that
            #    it can be inherited. The abstract definition lacks
            #    the ``modelType`` constant, as that would conflict in
            #    inheritance.
            # 2) The other definition corresponds to the concrete definition of
            #    the class that an instance has to fulfill. This definition
            #    includes the constant ``modelType``.
            #
            # We had to separate these two definitions to avoid conflicts in
            # ``modelType`` constant between a parent concrete class and a child
            # concrete class.
            suffix = "_abstract"
        else:
            continue

        model_type = naming.json_model_type(parent.name)
        references.append({"$ref": f"#/definitions/{model_type}{suffix}"})

    return references
4✔
509

510

511
@require(lambda cls: len(cls.concrete_descendants) > 0)
@ensure(lambda result: (result[0] is not None) ^ (result[1] is not None))
def _generate_inheritable_definition(
    cls: intermediate.ClassUnion,
    constraints_by_class: Mapping[
        intermediate.ClassUnion, infer_for_schema.ConstraintsByValue
    ],
    fix_pattern: Callable[[str], str],
) -> Tuple[Optional[MutableMapping[str, Any]], Optional[List[Error]]]:
    """
    Generate a definition of ``cls`` for inheritance through ``allOf``.

    The definition is named after the model type of the class if the class is
    abstract, and suffixed with ``_abstract`` if it is concrete.

    The ``fix_pattern`` determines how the pattern should be translated for
    the respective JSON schema engine.

    The definitions are to be *extended* with the resulting mapping.
    """
    all_of = _define_all_of_for_inheritance(cls)

    properties, properties_error = _define_properties(
        cls=cls,
        constraints_by_class=constraints_by_class,
        fix_pattern=fix_pattern,
    )

    errors: List[Error] = [] if properties_error is None else list(properties_error)
    if len(errors) > 0:
        return None, errors

    assert properties is not None

    required = _list_required_properties(cls)

    introduces_model_type = cls.serialization.with_model_type and not any(
        inheritance.serialization.with_model_type for inheritance in cls.inheritances
    )
    if introduces_model_type:
        # NOTE (mristin, 2023-03-13):
        # This is going to be an abstract definition for inheritance, so we can
        # not pin the ``modelType`` to a fixed, constant value.
        assert "modelType" not in properties
        properties["modelType"] = {"$ref": "#/definitions/ModelType"}

        required.append(Identifier("modelType"))

    definition: MutableMapping[str, Any] = collections.OrderedDict()
    if len(cls.inheritances) == 0:
        definition["type"] = "object"

    if len(properties) > 0:
        definition["properties"] = properties

        if len(required) > 0:
            definition["required"] = required

    # Skip the definition if there is nothing to add to the parents.
    if len(definition) > 0:
        all_of.append(definition)

    definition_name: str
    if isinstance(cls, intermediate.AbstractClass):
        definition_name = naming.json_model_type(cls.name)
    elif isinstance(cls, intermediate.ConcreteClass):
        definition_name = f"{naming.json_model_type(cls.name)}_abstract"
    else:
        assert_never(cls)

    body: MutableMapping[str, Any]
    if len(all_of) == 0:
        body = {"type": "object"}
    elif len(all_of) == 1:
        # A single schema needs no ``allOf`` wrapping.
        body = all_of[0]
    else:
        body = {"allOf": all_of}

    return collections.OrderedDict([(definition_name, body)]), None
4✔
589

590

591
@require(lambda cls: len(cls.concrete_descendants) > 0)
def _generate_choice_definition(
    cls: intermediate.ClassUnion,
) -> MutableMapping[str, Any]:
    """
    Generate the definition of dispatching through ``oneOf``.

    The choices are the class itself, if it is concrete, followed by all its
    concrete descendants. The definition is named after the model type of
    the class, suffixed with ``_choice``.

    The definitions are to be *extended* with the resulting mapping.
    """
    candidates: List[intermediate.ConcreteClass] = []
    if isinstance(cls, intermediate.ConcreteClass):
        candidates.append(cls)

    candidates.extend(cls.concrete_descendants)

    one_of: List[Mapping[str, Any]] = [
        {"$ref": f"#/definitions/{naming.json_model_type(candidate.name)}"}
        for candidate in candidates
    ]

    return {f"{naming.json_model_type(cls.name)}_choice": {"oneOf": one_of}}
4✔
610

611

612
@ensure(lambda result: (result[0] is not None) ^ (result[1] is not None))
def _generate_concrete_definition(
    cls: intermediate.ClassUnion,
    constraints_by_class: Mapping[
        intermediate.ClassUnion, infer_for_schema.ConstraintsByValue
    ],
    fix_pattern: Callable[[str], str],
) -> Tuple[Optional[MutableMapping[str, Any]], Optional[List[Error]]]:
    """
    Generate the definition of a concrete class to be matched by an instance.

    If the class has concrete descendants, the definition only references
    the ``_abstract`` definition of the class and pins ``modelType`` to
    the model type of the class. Otherwise, the properties are defined in
    place, together with the references to the parent classes.

    The ``fix_pattern`` determines how the pattern should be translated for
    the respective JSON schema engine.
    """
    # NOTE (mristin):
    # We distinguish between two definitions corresponding to the same concrete
    # class:
    #
    # 1) One definition defines only the class in abstract, so that
    #    it can be inherited. The abstract definition lacks the ``modelType``
    #    constant, as that would conflict in inheritance.
    # 2) The other definition corresponds to the concrete definition of
    #    the class that an instance has to fulfill. This definition includes
    #    the constant ``modelType``.
    #
    # We had to separate these two definitions to avoid conflicts in
    # ``modelType`` constant between a parent concrete class and a child
    # concrete class.

    model_type = naming.json_model_type(cls.name)

    if len(cls.concrete_descendants) > 0:
        assert cls.serialization.with_model_type, (
            f"Expected model type to be included in a class with concrete descendants "
            f"{cls.name!r}"
        )

        stub: List[MutableMapping[str, Any]] = [
            {"$ref": f"#/definitions/{model_type}_abstract"},
            {"properties": {"modelType": {"const": model_type}}},
        ]

        return {model_type: {"allOf": stub}}, None

    all_of = _define_all_of_for_inheritance(cls)

    properties, properties_error = _define_properties(
        cls=cls,
        constraints_by_class=constraints_by_class,
        fix_pattern=fix_pattern,
    )

    errors: List[Error] = [] if properties_error is None else list(properties_error)
    if len(errors) > 0:
        return None, errors

    assert properties is not None

    required = _list_required_properties(cls)

    if cls.serialization.with_model_type:
        properties["modelType"] = {"const": model_type}

    definition: MutableMapping[str, Any] = collections.OrderedDict()
    if len(cls.inheritances) == 0:
        definition["type"] = "object"

    if len(properties) > 0:
        definition["properties"] = properties

        if len(required) > 0:
            definition["required"] = required

    # NOTE(review):
    # The definition is appended even if it is empty, unlike in
    # ``_generate_inheritable_definition``. Consequently, ``all_of`` is never
    # empty at this point and the first branch below can not be taken.
    # Please confirm whether the empty definition is intended in the output.
    all_of.append(definition)

    body: MutableMapping[str, Any]
    if len(all_of) == 0:
        body = {"type": "object"}
    elif len(all_of) == 1:
        # A single schema needs no ``allOf`` wrapping.
        body = all_of[0]
    else:
        body = {"allOf": all_of}

    return collections.OrderedDict([(model_type, body)]), None
4✔
702

703

704
class Definitions:
    """
    Store definitions of the schema as we go.

    The definitions are kept in the order of insertion. A key can be defined
    only once; an attempt to define it again is reported as an error.
    """

    def __init__(self) -> None:
        """Initialize as empty."""
        self._definitions: MutableMapping[str, Any] = collections.OrderedDict()

    def get(self) -> Mapping[str, Any]:
        """Get the content."""
        return self._definitions

    def update_for(
        self, our_type: intermediate.OurType, extension: Mapping[str, Any]
    ) -> Optional[Error]:
        """
        Update the definitions with ``extension`` related to ``our_type``.

        Return an error on the first key which has been already defined.
        The keys of the ``extension`` preceding that key remain stored.
        """
        for key, definition in extension.items():
            if key not in self._definitions:
                self._definitions[key] = definition
                continue

            return Error(
                our_type.parsed.node,
                f"One of the JSON definitions, {key}, "
                f"for our type {our_type.name} has been "
                f"already provided in the definitions; "
                f"did you already perhaps define it in another "
                f"implementation-specific snippet?",
            )

        return None

    def update(self, extension: Mapping[str, Any]) -> Optional[Error]:
        """
        Update the definitions with ``extension`` unrelated to any of our types.

        Return an error on the first key which has been already defined.
        The keys of the ``extension`` preceding that key remain stored.
        """
        for key, definition in extension.items():
            if key not in self._definitions:
                self._definitions[key] = definition
                continue

            return Error(
                None,
                f"One of the JSON definitions, {key}, "
                f"has been already provided in the definitions; "
                f"did you already perhaps define it in another "
                f"implementation-specific snippet?",
            )

        return None
4✔
733

734
    def update(self, extension: Mapping[str, Any]) -> Optional[Error]:
4✔
735
        """Update the definitions with ``extension`` unrelated to any of our types."""
736
        for key, definition in extension.items():
4✔
737
            if key in self._definitions:
4✔
738
                return Error(
×
739
                    None,
740
                    f"One of the JSON definitions, {key}, "
741
                    f"has been already provided in the definitions; "
742
                    f"did you already perhaps define it in another "
743
                    f"implementation-specific snippet?",
744
                )
745

746
            self._definitions[key] = definition
4✔
747

748
        return None
4✔
749

750

751
@ensure(lambda result: (result[0] is not None) ^ (result[1] is not None))
def generate(
    symbol_table: intermediate.SymbolTable,
    spec_impls: specific_implementations.SpecificImplementations,
    fix_pattern: Callable[[str], str],
) -> Tuple[Optional[Stripped], Optional[List[Error]]]:
    """
    Generate the JSON schema based on the symbol table.

    The ``fix_pattern`` determines how the pattern should be translated for
    the respective JSON schema engine.

    This function is intended to be used not only by aas-core-codegen, but also for
    downstream clients. For example, the downstream clients should use this function
    to customize how patterns should be rendered / fixed for the respective regex
    engine.

    The base of the schema is read from the implementation-specific snippet
    ``schema_base.json``. It must be a JSON object which defines neither ``$id``
    nor ``definitions``, as both are set by this function.

    Return either the schema serialized as JSON text, or the list of errors,
    but never both.
    """
    schema_base_key = specific_implementations.ImplementationKey("schema_base.json")

    schema_base_json = spec_impls.get(schema_base_key, None)
    if schema_base_json is None:
        return None, [
            Error(
                None,
                f"The implementation snippet for the base schema "
                f"is missing: {schema_base_key}",
            )
        ]

    try:
        # noinspection PyTypeChecker
        schema_base = json.loads(
            schema_base_json, object_pairs_hook=collections.OrderedDict
        )
    except json.JSONDecodeError as err:
        return None, [
            Error(
                None, f"Failed to parse the base schema from {schema_base_key}: {err}"
            )
        ]

    # NOTE:
    # A valid JSON document need not be an object. We have to reject arrays,
    # strings *etc.* here, as we set the properties of the schema below.
    if not isinstance(schema_base, dict):
        return None, [
            Error(
                None,
                f"Expected the base schema from {schema_base_key} "
                f"to be a JSON object, but got: {type(schema_base)}",
            )
        ]

    schema: MutableMapping[str, Any] = schema_base

    if "$id" in schema:
        return None, [
            Error(
                None,
                f"Unexpected property '$id' in the base JSON schema "
                f"from: {schema_base_key}",
            )
        ]

    # NOTE (mristin, 2022-08-25):
    # We use the same namespace in all the schemas for the consistency.
    schema["$id"] = symbol_table.meta_model.xml_namespace

    if "definitions" in schema:
        return None, [
            Error(
                None,
                f"The property ``definitions`` unexpected in the base JSON schema "
                f"from: {schema_base_key}",
            )
        ]

    errors = []  # type: List[Error]

    definitions = Definitions()

    constraints_by_class, some_errors = infer_for_schema.infer_constraints_by_class(
        symbol_table=symbol_table
    )

    if some_errors is not None:
        errors.extend(some_errors)

    if errors:
        return None, errors

    assert constraints_by_class is not None

    ids_of_our_types_in_properties = (
        intermediate.collect_ids_of_our_types_in_properties(symbol_table=symbol_table)
    )

    for our_type in symbol_table.our_types:
        if (
            isinstance(
                our_type, (intermediate.AbstractClass, intermediate.ConcreteClass)
            )
            and our_type.is_implementation_specific
        ):
            # The definitions of an implementation-specific class are taken verbatim
            # from the snippet instead of being generated.
            implementation_key = specific_implementations.ImplementationKey(
                f"{our_type.name}.json"
            )

            code = spec_impls.get(implementation_key, None)
            if code is None:
                errors.append(
                    Error(
                        our_type.parsed.node,
                        f"The implementation is missing "
                        f"for the implementation-specific class: {implementation_key}",
                    )
                )
                continue

            try:
                # noinspection PyTypeChecker
                extension = json.loads(code, object_pairs_hook=collections.OrderedDict)
            except json.JSONDecodeError as err:
                errors.append(
                    Error(
                        our_type.parsed.node,
                        f"Failed to parse the JSON out of "
                        f"the specific implementation {implementation_key}: {err}",
                    )
                )
                continue

            if not isinstance(extension, dict):
                errors.append(
                    Error(
                        our_type.parsed.node,
                        f"Expected the implementation-specific snippet "
                        f"at {implementation_key} to be a JSON object, "
                        f"but got: {type(extension)}",
                    )
                )
                continue

            update_error = definitions.update_for(
                our_type=our_type, extension=extension
            )
            if update_error is not None:
                errors.append(update_error)

        elif isinstance(our_type, intermediate.Enumeration):
            update_error = definitions.update_for(
                our_type=our_type,
                extension=_define_for_enumeration(enumeration=our_type),
            )
            if update_error is not None:
                errors.append(update_error)

        elif isinstance(our_type, intermediate.ConstrainedPrimitive):
            # NOTE (mristin):
            # We in-line the constraints from the constrained primitives directly
            # in the properties. We do not want to introduce separate definitions
            # for them as that would make it more difficult for downstream code
            # generators to generate meaningful code (*e.g.*, code generators for
            # OpenAPI3).
            continue

        elif isinstance(
            our_type, (intermediate.AbstractClass, intermediate.ConcreteClass)
        ):
            if len(our_type.concrete_descendants) > 0:
                # region Inheritable
                inheritable, definition_errors = _generate_inheritable_definition(
                    cls=our_type,
                    constraints_by_class=constraints_by_class,
                    fix_pattern=fix_pattern,
                )

                if definition_errors is not None:
                    errors.extend(definition_errors)
                    continue

                assert inheritable is not None
                update_error = definitions.update_for(
                    our_type=our_type, extension=inheritable
                )
                if update_error is not None:
                    errors.append(update_error)
                # endregion

                # region Choice
                if isinstance(our_type, intermediate.ConcreteClass) or (
                    isinstance(our_type, intermediate.AbstractClass)
                    and id(our_type) in ids_of_our_types_in_properties
                ):
                    update_error = definitions.update_for(
                        our_type=our_type,
                        extension=_generate_choice_definition(cls=our_type),
                    )
                    if update_error is not None:
                        errors.append(update_error)
                # endregion

            if isinstance(our_type, intermediate.ConcreteClass):
                definition, definition_errors = _generate_concrete_definition(
                    cls=our_type,
                    constraints_by_class=constraints_by_class,
                    fix_pattern=fix_pattern,
                )
                if definition_errors is not None:
                    errors.extend(definition_errors)
                    continue

                assert definition is not None

                update_error = definitions.update_for(
                    our_type=our_type, extension=definition
                )
                if update_error is not None:
                    errors.append(update_error)
            else:
                # We do not generate any concrete definition for an abstract class.
                assert isinstance(our_type, intermediate.AbstractClass)
        else:
            assert_never(our_type)

    if errors:
        return None, errors

    model_types = sorted(
        naming.json_model_type(cls.name)
        for cls in symbol_table.concrete_classes
        if cls.serialization.with_model_type
    )  # type: List[Identifier]

    # NOTE:
    # The update fails if ``ModelType`` has been already defined, *e.g.*, by
    # an implementation-specific snippet. We must not ignore that, as the schema
    # would otherwise be silently written without the enumeration of model types.
    model_type_error = definitions.update(
        {
            "ModelType": collections.OrderedDict(
                [("type", "string"), ("enum", model_types)]
            )
        }
    )
    if model_type_error is not None:
        return None, [model_type_error]

    definitions_mapping = definitions.get()

    # We sort the definitions by name so that the output is deterministic.
    schema["definitions"] = collections.OrderedDict(
        (name, definitions_mapping[name]) for name in sorted(definitions_mapping)
    )

    return Stripped(json.dumps(schema, indent=2)), None
992

993

994
def execute(context: run.Context, stdout: TextIO, stderr: TextIO) -> int:
    """
    Generate the JSON schema and write it to ``schema.json`` in the output directory.

    The failures, both of the generation and of the writing, are reported
    to ``stderr``. On success, a short notice is written to ``stdout``.

    Return 0 on success, and 1 otherwise.
    """
    schema_text, generation_errors = generate(
        symbol_table=context.symbol_table,
        spec_impls=context.spec_impls,
        fix_pattern=fix_pattern_for_utf16,
    )

    if generation_errors is not None:
        failure_message = (
            f"Failed to generate the JSON Schema " f"based on {context.model_path}"
        )
        rendered_errors = [
            context.lineno_columner.error_message(generation_error)
            for generation_error in generation_errors
        ]
        run.write_error_report(
            message=failure_message, errors=rendered_errors, stderr=stderr
        )
        return 1

    assert schema_text is not None

    target_path = context.output_dir / "schema.json"
    try:
        target_path.write_text(schema_text, encoding="utf-8")
    except Exception as write_failure:
        # We report any failure to the user instead of crashing with a traceback.
        run.write_error_report(
            message=f"Failed to write the JSON schema to {target_path}",
            errors=[str(write_failure)],
            stderr=stderr,
        )
        return 1
    else:
        stdout.write(f"Code generated to: {context.output_dir}\n")
        return 0
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc