• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

avanov / typeit / 6664915557

27 Oct 2023 08:39AM UTC coverage: 91.858% (+0.02%) from 91.834%
6664915557

push

github

avanov
Release 3.11.1.2

7 of 7 new or added lines in 2 files covered. (100.0%)

1241 of 1351 relevant lines covered (91.86%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.42
/typeit/schema/types.py
1
import enum as std_enum
2✔
2
import typing as t
2✔
3
import pathlib
2✔
4

5
import typing_inspect as insp
2✔
6
import colander as col
2✔
7
from pyrsistent import pmap, pvector
2✔
8
from pyrsistent.typing import PMap
2✔
9

10
from .errors import Invalid
2✔
11
from .. import sums
2✔
12
from .. import interface as iface
2✔
13
from . import primitives
2✔
14
from . import meta
2✔
15
from . import nodes
2✔
16

17

18
# Module-level alias for ``nodes.Null``; the schema types below compare their
# inputs and intermediate results against it to detect "no value".
Null = nodes.Null
2✔
19

20

21
class TypedMapping(meta.Mapping):
    """ Schema type for mappings whose keys and values each have their own node.

    Every key of the processed mapping is passed through ``key_node`` and
    every value through ``value_node``.
    """
    def __init__(self, *, key_node: nodes.SchemaNode, value_node: nodes.SchemaNode):
        # https://docs.pylonsproject.org/projects/colander/en/latest/api.html#colander.Mapping
        super().__init__(unknown='preserve')
        self.key_node = key_node
        self.value_node = value_node

    def deserialize(self, node, cstruct):
        """ Deserialize ``cstruct`` into a dict of deserialized keys and values.

        :param node: schema node this type is attached to (used for error reporting)
        :param cstruct: raw mapping to deserialize
        :return: ``Null``/``None`` when the parent deserializer yields one of those,
                 otherwise a new ``dict``
        :raises Invalid: when a key or a value cannot be deserialized; the
                 underlying error is attached as a child of the raised one
        """
        r = super().deserialize(node, cstruct)
        if r in (Null, None):
            return r
        rv = {}
        for k, v in r.items():
            try:
                key = self.key_node.deserialize(k)
            except Invalid as e:
                error = Invalid(node, "{<k>: <v>} error parsing <k>", cstruct)
                error.add(e)
                raise error from e
            try:
                val = self.value_node.deserialize(v)
            except Invalid as e:
                error = Invalid(node, f"{{{k}: <v>}} error parsing <v>", v)
                error.add(e)
                raise error from e
            rv[key] = val
        # Return the mapping built above; every item has already been
        # deserialized exactly once inside the error-wrapping loop.
        return rv

    def serialize(self, node, appstruct):
        """ Serialize ``appstruct`` by serializing each key and value with its node.

        :return: ``Null``/``None`` when the parent serializer yields one of those,
                 otherwise a new ``dict``
        """
        r = super().serialize(node, appstruct)
        if r in (Null, None):
            return r

        return {self.key_node.serialize(k): self.value_node.serialize(v) for k, v in r.items()}
57

58

59
class Path(primitives.Str):
    """ String-backed schema type that converts to and from a ``pathlib`` path class.

    ``typ`` is the concrete path class instantiated on deserialization.
    """
    def __init__(self, typ: t.Type[pathlib.PurePath], *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.typ = typ

    def serialize(self, node, appstruct: t.Union[col._null, pathlib.PurePath]):
        """ Return ``Null`` untouched, otherwise serialize ``str(appstruct)`` as a string. """
        if appstruct is not Null:
            return super().serialize(node, str(appstruct))
        return appstruct

    def deserialize(self, node, cstruct) -> pathlib.PurePath:
        """ Deserialize a string and wrap it into ``self.typ``.

        :raises Invalid: when ``self.typ`` rejects the value with ``TypeError``
        """
        text = super().deserialize(node, cstruct)
        if text is Null:
            return text
        try:
            path = self.typ(text)
        except TypeError:
            raise Invalid(node, f'Invalid variant of {self.typ.__name__}', cstruct)
        return path
78

79

80
class Structure(meta.Mapping):
    """ SchemaNode for NamedTuples and derived types.

    Deserialized mappings are passed to ``typ`` as keyword arguments; on
    serialization the attributes listed in ``attrs`` are read from the object.
    """
    def __init__(self,
                 typ: t.Type[iface.IType],
                 attrs: t.Sequence[str] = pvector([]),
                 deserialize_overrides: PMap[str, str] = pmap({}),
                 unknown: str = 'ignore',
                 ) -> None:
        """
        :param typ: type constructed from the deserialized fields
        :param attrs: names of the attributes read from an instance on serialization
        :param deserialize_overrides: source_field_name => struct_field_name mapping
        :param unknown: forwarded to the parent mapping type
        """
        super().__init__(unknown)
        self.typ = typ
        self.attrs = attrs
        self.deserialize_overrides = deserialize_overrides
        # Inverse lookup: struct_field_name => source_field_name
        inverted = {
            struct_name: source_name
            for source_name, struct_name in self.deserialize_overrides.items()
        }
        self.serialize_overrides = pmap(inverted)

    def __repr__(self) -> str:
        return f'Structure({self.typ})'

    def deserialize(self, node, cstruct):
        """ Deserialize ``cstruct`` and build an instance of ``self.typ`` from it.

        :raises Invalid: when ``self.typ`` raises ``TypeError`` for the given fields
        """
        parsed = super().deserialize(node, cstruct)
        if parsed is Null:
            return parsed
        rename = self.deserialize_overrides.get
        kwargs = {}
        for source_name, value in parsed.items():
            # fall back to the source name when no override is registered
            kwargs[rename(source_name, source_name)] = value
        try:
            instance = self.typ(**kwargs)
        except TypeError:
            raise Invalid(
                node,
                "Are you trying to use generically defined type Name(Generic[A]) via Name[<MyType>]? "
                "Python doesn't support it, you have to use subclassing with a concrete type, like "
                "class MySubtype(Name[ConcreteType])"
            )
        return instance

    def serialize(self, node, appstruct: iface.IType) -> t.Mapping[str, t.Any]:
        """ Collect ``self.attrs`` from ``appstruct`` and serialize them as a mapping. """
        if appstruct is Null:
            return super().serialize(node, appstruct)
        rename = self.serialize_overrides.get
        fields = {}
        for attr_name in self.attrs:
            target_name = rename(attr_name, attr_name)
            fields[target_name] = getattr(appstruct, attr_name)
        return super().serialize(node, fields)
132

133

134
# Expose ``meta.Tuple`` under this module's namespace.
Tuple = meta.Tuple
2✔
135

136

137
class Sum(meta.SchemaType):
    """ Schema type for tagged sum types.

    Each entry of ``variant_nodes`` pairs a variant type with the schema used
    for its payload. The tag identifying the variant is compared against
    ``var_type.__variant_meta__.value``.
    """
    def __init__(
        self,
        typ: sums.SumType,
        variant_nodes: t.Sequence[
            t.Tuple[
                t.Type, t.Union[nodes.SchemaNode, col.SequenceSchema, col.TupleSchema]
            ],
        ],
        as_dict_key: t.Optional[str] = None,
    ) -> None:
        """
        :param typ: the sum type this schema represents
        :param variant_nodes: (variant type, payload schema) pairs
        :param as_dict_key: when set, the tag is stored under this key inside the
            payload mapping; otherwise data is laid out as a ``(tag, payload)`` pair
        """
        super().__init__()
        self.typ = typ
        self.variant_nodes = variant_nodes
        self.as_dict_key = as_dict_key
        self.variant_schema_types: t.Set[meta.SchemaType] = {
            x.typ for _, x in variant_nodes
        }

    def deserialize(self, node, cstruct):
        """ Extract the tag and payload from ``cstruct`` and build the matching variant.

        :return: ``cstruct`` itself when it is ``Null`` or ``None``, otherwise an
            instance of the first variant type whose tag matches
        :raises Invalid: when the data layout is wrong, the payload of the
            matched variant is invalid, or no variant matches the tag
        """
        if cstruct in (Null, None):
            # explicitly passed None is not col.null
            # therefore we must handle both
            return cstruct

        if self.as_dict_key:
            try:
                tag = cstruct[self.as_dict_key]
            except (KeyError, ValueError, TypeError) as e:
                # TypeError covers non-mapping inputs (lists, scalars) that
                # cannot be subscripted with the tag key at all.
                raise Invalid(
                    node,
                    f'Incorrect data layout for this type: '
                    f'tag is not present as key "{self.as_dict_key}"',
                    cstruct
                ) from e
            try:
                payload = {k: v for k, v in cstruct.items() if k != self.as_dict_key}
            except (AttributeError, TypeError) as e:
                raise Invalid(
                    node,
                    'Incorrect data layout for this type: payload is not a mapping',
                    cstruct
                ) from e
        else:
            try:
                tag, payload = cstruct
            except (ValueError, TypeError) as e:
                # ValueError: wrong number of items; TypeError: not iterable.
                raise Invalid(
                    node,
                    'Incorrect data layout for this type.',
                    cstruct
                ) from e
        # next, iterate over available variants and return the first
        # matched structure.
        for var_type, var_schema in self.variant_nodes:
            if var_type.__variant_meta__.value != tag:
                continue
            try:
                variant_struct = var_schema.deserialize(payload)
            except Invalid as e:
                raise Invalid(
                    node,
                    f'Incorrect payload format for '
                    f'{var_type.__variant_meta__.variant_of.__name__}.{var_type.__variant_meta__.variant_name}',
                    cstruct
                ) from e
            return var_type(**variant_struct._asdict())

        raise Invalid(
            node,
            'None of the expected variants matches provided data',
            cstruct
        )

    def serialize(self, node, appstruct: t.Any):
        """ Serialize ``appstruct`` using the schema of the first variant it is an instance of.

        :return: ``None`` for ``Null``/``None`` input; a mapping with the tag
            stored under ``as_dict_key`` when that key is configured; otherwise
            a ``(tag, payload)`` tuple
        :raises Invalid: when ``appstruct`` is not an instance of any variant
        """
        if appstruct in (Null, None):
            return None

        for var_type, var_schema in self.variant_nodes:
            if isinstance(appstruct, var_type):
                if self.as_dict_key:
                    rv = var_schema.serialize(appstruct)
                    rv[self.as_dict_key] = var_type.__variant_meta__.value
                    return rv
                else:
                    return (var_type.__variant_meta__.value, var_schema.serialize(appstruct))

        raise Invalid(
            node,
            'None of the expected variants matches provided structure',
            appstruct
        )
229

230

231
# Alias of the standard library ``enum.Enum``; used to annotate the ``typ``
# argument of the ``Enum`` schema type below.
EnumLike = std_enum.Enum
2✔
232

233

234
class Enum(primitives.Str):
    """ String-backed schema type that maps values to members of an enum class.

    ``typ`` is the enum class; members are looked up by value on
    deserialization and reduced to ``.value`` on serialization.
    """
    def __init__(self, typ: t.Type[EnumLike], *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.typ = typ

    def serialize(self, node, appstruct):
        """ Return ``Null`` untouched, otherwise serialize the member's ``value``. """
        if appstruct is not Null:
            return super().serialize(node, appstruct.value)
        return appstruct

    def deserialize(self, node, cstruct) -> std_enum.Enum:
        """ Deserialize a string and look up the matching member of ``self.typ``.

        :raises Invalid: when ``self.typ`` rejects the value with ``ValueError``
        """
        raw = super().deserialize(node, cstruct)
        if raw is Null:
            return raw
        try:
            member = self.typ(raw)
        except ValueError:
            raise Invalid(node, f'Invalid variant of {self.typ.__name__}: {cstruct}', cstruct)
        return member
253

254

255
# Returns a one-element tuple holding ``insp.get_origin(x)`` for the given type;
# ``Union.serialize`` appends it to the variant type when matching generic types.
generic_type_bases: t.Callable[[t.Type], t.Tuple[t.Type, ...]] = lambda x: (insp.get_origin(x),)
2✔
256

257

258
class Literal(meta.SchemaType):
    """ Schema type accepting only values contained in a fixed set of variants.

    Accepted values are passed through unchanged in both directions.
    """
    def __init__(self, variants: t.FrozenSet):
        super().__init__()
        self.variants = variants

    def _is_variant(self, value: t.Any) -> bool:
        """ Tell whether ``value`` is one of the literal variants.

        Unhashable values (lists, dicts, ...) make the membership test raise
        ``TypeError``; they can never be a member, so report ``False`` and let
        the caller raise a regular ``Invalid`` instead of leaking the error.
        """
        try:
            return value in self.variants
        except TypeError:
            return False

    def deserialize(self, node, cstruct):
        """ Return ``cstruct`` when it is ``Null`` or one of the variants.

        :raises Invalid: when ``cstruct`` is not among the variants
        """
        if cstruct is Null:
            # explicitly passed None is not col.null
            # therefore we must handle it separately
            return cstruct
        if self._is_variant(cstruct):
            return cstruct
        raise Invalid(
            node,
            'None of the Literal variants matches provided data',
            cstruct
        )

    def serialize(self, node, appstruct: t.Any):
        """ Return ``None`` for ``Null``, or ``appstruct`` when it is one of the variants.

        :raises Invalid: when ``appstruct`` is not among the variants
        """
        if appstruct is Null:
            return None
        if self._is_variant(appstruct):
            return appstruct
        raise Invalid(
            node,
            'None of the Literal variants matches provided data',
            appstruct
        )
286

287

288
class Union(meta.SchemaType):
    """ This node handles typing.Union[T1, T2, ...] cases.
    Please note that typing.Optional[T] is normalized by parser as typing.Union[None, T],
    and this Union schema type will not have None among its variants. Instead,
    `Union.missing` will be set to None, indicating the value for missing
    data.
    """
    def __init__(
        self,
        variant_nodes: t.Sequence[
            t.Tuple[
                t.Type, t.Union[nodes.SchemaNode, col.SequenceSchema, col.TupleSchema]
            ],
        ],
        primitive_types: t.Union[
            t.Mapping[t.Type, primitives.PrimitiveSchemaTypeT],
            t.Mapping[t.Type, primitives.NonStrictPrimitiveSchemaTypeT],
        ]
    ) -> None:
        """
        :param variant_nodes: (python type, schema) pairs, one per union variant
        :param primitive_types: python primitive type => schema type lookup, used
            to try the exact primitive variant before any other
        """
        super().__init__()
        self.primitive_types = primitive_types
        self.variant_nodes = variant_nodes
        self.variant_schema_types: t.Set[meta.SchemaType] = {
            x.typ for _, x in variant_nodes
        }
        # The literals collections are needed for a corner case where the union is constructed out of
        # literal variants, each of which should be able to serialise into primitive values directly without further check.
        # That is, if ``appstruct`` is a primitive type, and if the union has literal values and those values match the
        # ``appstruct``, then appstruct is already in a proper serialised form.
        self.variant_schema_literals: t.FrozenSet[t.Any] = frozenset().union(
            *[x.variants for x in self.variant_schema_types if isinstance(x, Literal)]
        )

    def __repr__(self) -> str:
        # A single variant schema type is rendered as Optional(...)
        if len(self.variant_schema_types) == 1:
            return f'Optional({self.variant_schema_types})'
        return f'Union({self.variant_schema_types})'

    def deserialize(self, node, cstruct):
        """ Deserialize ``cstruct`` with the first variant that accepts it.

        :return: ``cstruct`` itself when it is ``Null`` or ``None``, otherwise the
            result of the first successful variant
        :raises Invalid: when no variant accepts the data; the errors of all
            tried variants are attached as children
        """
        if cstruct in (Null, None):
            # explicitly passed None is not col.null
            # therefore we must handle both
            return cstruct

        collected_errors: t.List[Invalid] = []
        # Firstly, let's see if `cstruct` is one of the primitive types
        # supported by Python, and if this primitive type is specified
        # among union variants. If it is, then we need to try
        # a constructor of that primitive type first.
        # We do it to support cases like `Union[str, int, float]`,
        # where a value of 1.0 should be handled as float despite the
        # fact that both str() and int() constructors can happily
        # handle that value and return one of the expected variants
        # (but incorrectly!)
        prim_schema_type = self.primitive_types.get(type(cstruct))
        if prim_schema_type in self.variant_schema_types:
            try:
                return prim_schema_type.deserialize(node, cstruct)
            except Invalid as e:
                collected_errors.append(e)

        # next, iterate over available variants and return the first
        # matched structure.
        remaining_variants = (
            x for _, x in self.variant_nodes
            if x.typ is not prim_schema_type
        )
        for variant in remaining_variants:
            try:
                return variant.deserialize(cstruct)
            except Invalid as e:
                collected_errors.append(e)
                continue

        errors = "\n\t * ".join(str(x.node) for x in collected_errors)
        error = Invalid(
            node,
            f'No suitable variant among tried:\n\t * {errors}\n',
            cstruct
        )
        for e in collected_errors:
            error.add(e)
        raise error

    def serialize(self, node, appstruct: t.Any):
        """ Serialize ``appstruct`` with the variant that matches its type.

        :return: ``None`` for ``Null``/``None`` input, otherwise the result of the
            first variant that matches and serializes successfully
        :raises Invalid: when no variant matches; the message lists the errors
            of every variant that matched but failed
        """
        if appstruct in (Null, None):
            return None

        struct_type = type(appstruct)

        prim_schema_type = self.primitive_types.get(struct_type)

        # special case for unions consisting of literals among other things
        if prim_schema_type and appstruct in self.variant_schema_literals:
            return appstruct

        collected_errors: t.List[Invalid] = []
        if prim_schema_type in self.variant_schema_types:
            try:
                return prim_schema_type.serialize(node, appstruct)
            except Invalid as e:
                # Keep the failure so that it shows up in the final error
                # instead of being dropped silently.
                collected_errors.append(e)

        # Loop names deliberately avoid ``t``, which is the module's alias
        # for ``typing``.
        remaining_variants = (
            (variant_type, variant_schema)
            for (variant_type, variant_schema) in self.variant_nodes
            if variant_schema.typ is not prim_schema_type
        )

        for var_type, var_schema in remaining_variants:
            # Mappings (which are not structs) have their own serializer
            if isinstance(var_schema.typ, TypedMapping) and not isinstance(var_schema, Structure):
                try:
                    return var_schema.serialize(appstruct)
                except Invalid as e:
                    collected_errors.append(e)
                    continue

            # Sequences and tuples require special treatment here:
            # since there is no direct reference to the target python data type
            # through variant.typ.typ that we could use to compare this variant
            # with appstruct's type, we just check if the appstruct is a list-like
            # object. And if it is, we apply SequenceSchema's serializer on it,
            # otherwise we skip this variant (we need to do that to make sure that
            # a Union variant matches appstruct's type as close as possible)
            if isinstance(var_schema, col.SequenceSchema):
                if not isinstance(appstruct, list):
                    continue
                try:
                    return var_schema.serialize(appstruct)
                except Invalid as e:
                    collected_errors.append(e)
                    continue

            elif isinstance(var_schema, col.TupleSchema):
                if not isinstance(appstruct, tuple):
                    continue
                try:
                    return var_schema.serialize(appstruct)
                except Invalid as e:
                    collected_errors.append(e)
                    continue

            else:
                # nodes.SchemaNode
                # We need to check if the type of the appstruct
                # is the same as the type that appears in the Union
                # definition and is associated with this SchemaNode.
                # get_origin() normalizes meta-types like `typing.Dict`
                # to dict class etc.
                #  Please note that the order of checks matters here, since
                # subscripted generics like typing.Dict cannot be used with
                # issubclass
                if insp.is_generic_type(var_type):
                    matching_types = (var_type,) + generic_type_bases(var_type)
                else:
                    matching_types = (insp.get_origin(var_type), var_type)
                if struct_type in matching_types or isinstance(var_type, t.ForwardRef) or issubclass(struct_type, var_type):
                    try:
                        return var_schema.serialize(appstruct)
                    except Invalid as e:
                        collected_errors.append(e)

        raise Invalid(
            node,
            f'None of the expected variants matches provided structure. Matched and tried: {collected_errors}',
            appstruct
        )
455

456

457
class ForwardReferenceType(meta.SchemaType):
    """ A special type that is promised to understand how to serialise and deserialise a given
    reference of a type that will be resolved at a later stage of parsing.

    The actual schema is looked up in ``ref_registry`` by the forward reference
    each time data is processed.
    """
    def __init__(self, forward_ref: t.ForwardRef, ref_registry):
        super().__init__()
        self.ref = forward_ref
        self.ref_registry = ref_registry

    def __repr__(self) -> str:
        return f'ForwardReferenceType(typ={self.ref})'

    def deserialize(self, node, cstruct):
        """ Delegate deserialization to the schema registered for ``self.ref``. """
        resolved = self.ref_registry[self.ref]
        return resolved.deserialize(cstruct)

    def serialize(self, node, appstruct):
        """ Delegate serialization to the schema registered for ``self.ref``. """
        resolved = self.ref_registry[self.ref]
        return resolved.serialize(appstruct)
476

477

478
# Lookup table from a tuple of base classes to the schema type class defined
# in this module that handles them.
# NOTE(review): presumably the parser matches a type against the key tuple
# with a subclass check — confirm against the caller.
SUBCLASS_BASED_TO_SCHEMA_TYPE: t.Mapping[
    t.Tuple[t.Type, ...], t.Type[meta.SchemaType],
] = {
    (std_enum.Enum,): Enum,
    # Pathlib's PurePath and its derivatives
    (pathlib.PurePath,): Path,
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc