• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

p2p-ld / numpydantic / 10189384265

31 Jul 2024 11:57PM UTC coverage: 98.096% (+0.2%) from 97.919%
10189384265

push

github

web-flow
Merge pull request #3 from p2p-ld/vendor-nptyping

Vendor nptyping

17 of 17 new or added lines in 6 files covered. (100.0%)

1 existing line in 1 file now uncovered.

773 of 788 relevant lines covered (98.1%)

9.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.92
/src/numpydantic/schema.py
1
"""
2
Helper functions for use with :class:`~numpydantic.NDArray` - see the note in
3
:mod:`~numpydantic.ndarray` for why these are separated.
4
"""
5

6
import hashlib
10✔
7
import json
10✔
8
from typing import TYPE_CHECKING, Any, Callable, Optional, Union
10✔
9

10
import numpy as np
10✔
11
from pydantic import SerializationInfo
10✔
12
from pydantic_core import CoreSchema, core_schema
10✔
13
from pydantic_core.core_schema import ListSchema, ValidationInfo
10✔
14

15
from numpydantic import dtype as dt
10✔
16
from numpydantic.interface import Interface
10✔
17
from numpydantic.maps import np_to_python
10✔
18
from numpydantic.types import DtypeType, NDArrayType, ShapeType
10✔
19
from numpydantic.vendor.nptyping.structure import StructureMeta
10✔
20

21
if TYPE_CHECKING:  # pragma: no cover
22
    from numpydantic import Shape
23

24
_handler_type = Callable[[Any], core_schema.CoreSchema]
10✔
25
_UNSUPPORTED_TYPES = (complex,)
10✔
26

27

28
def _numeric_dtype(dtype: DtypeType, _handler: _handler_type) -> CoreSchema:
    """Make a numeric dtype that respects min/max values from extended numpy types"""
    # generic ``np.number`` carries no width information; treat it as plain float
    # (it is not a subclass of np.floating, so it falls through to the handler)
    if dtype in (np.number,):
        dtype = float

    if issubclass(dtype, np.floating):
        bounds = np.finfo(dtype)
        return core_schema.float_schema(le=float(bounds.max), ge=float(bounds.min))

    if issubclass(dtype, np.integer):
        bounds = np.iinfo(dtype)
        return core_schema.int_schema(le=int(bounds.max), ge=int(bounds.min))

    # builtin float/int (and anything else) get pydantic's default schema
    return _handler.generate_schema(dtype)
44

45

46
def _lol_dtype(dtype: DtypeType, _handler: _handler_type) -> CoreSchema:
    """
    Get the innermost dtype schema to use in the generated pydantic schema.

    Args:
        dtype (DtypeType): A builtin type, numpy scalar type, or a tuple of
            either (tuples become a union of their members' schemas)
        _handler: The pydantic schema generation handler

    Returns:
        :class:`pydantic_core.CoreSchema` for the scalar items of the array

    Raises:
        NotImplementedError: for structured dtypes
        ValueError: when ``dtype`` has no mapping in ``maps.np_to_python``
    """
    if isinstance(dtype, StructureMeta):  # pragma: no cover
        raise NotImplementedError("Structured dtypes are currently unsupported")

    if isinstance(dtype, tuple):
        # if it's a meta-type that refers to a generic float/int, just make that
        if dtype in (dt.Float, dt.Number):
            array_type = core_schema.float_schema()
        elif dtype == dt.Integer:
            array_type = core_schema.int_schema()
        elif dtype == dt.Complex:
            array_type = core_schema.any_schema()
        else:
            # make a union of dtypes recursively
            types_ = list(set(dtype))
            array_type = core_schema.union_schema(
                [_lol_dtype(t, _handler) for t in types_]
            )

    else:
        try:
            python_type = np_to_python[dtype]
        except KeyError as e:  # pragma: no cover
            # this should pretty much only happen in downstream/3rd-party interfaces
            # that use interface-specific types. those need to provide mappings back
            # to base python types (making this more streamlined is TODO)
            if dtype in np_to_python.values():
                # it's already a python type
                python_type = dtype
            else:
                raise ValueError(
                    "dtype given in model does not have a corresponding python base "
                    "type - add one to the `maps.np_to_python` dict"
                ) from e

        if python_type in _UNSUPPORTED_TYPES:
            array_type = core_schema.any_schema()
            # FIX: was a bare "TODO: warn and log here" -- surface the silent
            # fallback so users know this dtype is not validated in JSON schema
            import warnings

            warnings.warn(
                f"Python type {python_type} is not yet supported in JSON schema "
                "generation; falling back to an unvalidated `Any` schema",
                stacklevel=2,
            )
        elif python_type in (float, int):
            # numeric types get bounds derived from the numpy dtype's width
            array_type = _numeric_dtype(dtype, _handler)
        else:
            array_type = _handler.generate_schema(python_type)

    return array_type
91

92

93
def list_of_lists_schema(shape: "Shape", array_type: CoreSchema) -> ListSchema:
    """
    Make a pydantic JSON schema for an array as a list of lists.

    For each item in the shape, create a list schema. In the innermost schema
    insert the passed ``array_type`` schema.

    This function is typically called from :func:`.make_json_schema`

    Args:
        shape (:class:`~numpydantic.Shape`): Shape determines the depth and max/min
            elements for each layer of list schema
        array_type ( :class:`pydantic_core.CoreSchema` ): The pre-rendered pydantic
            core schema to use in the innermost list entry
    """
    # local import -- presumably avoids a circular import with numpydantic.shape;
    # TODO(review): confirm
    from numpydantic.shape import _is_range

    # the raw shape expression string, e.g. "2, * x, 3-5", split on dimensions
    shape_parts = [part.strip() for part in shape.__args__[0].split(",")]
    # labels, if present
    # a part like "* x" carries a label ("x") after a single space; unlabeled
    # parts map to None so shape_parts and split_parts stay index-aligned
    split_parts = [
        p.split(" ")[1] if len(p.split(" ")) == 2 else None for p in shape_parts
    ]

    # Construct a list of list schema
    # go in reverse order - construct list schemas such that
    # the final schema is the one that checks the first dimension
    shape_labels = reversed(split_parts)
    shape_args = reversed(shape.prepared_args)
    list_schema = None
    for arg, label in zip(shape_args, shape_labels):
        # which handler to use? for the first we use the actual type
        # handler, everywhere else we use the prior list handler
        inner_schema = array_type if list_schema is None else list_schema

        # make a label annotation, if we have one
        metadata = {"name": label} if label is not None else None

        # make the current level list schema, accounting for shape
        if arg == "*":
            # wildcard: any length at this dimension
            list_schema = core_schema.list_schema(inner_schema, metadata=metadata)
        elif arg == "...":
            # ellipsis: arbitrarily deep nesting from here down
            list_schema = _unbounded_shape(inner_schema, metadata=metadata)
        else:
            if _is_range(arg):
                # range like "2-5"; a "*" on either side means that bound
                # is left open (None -> no min/max constraint)
                arg_min, arg_max = arg.split("-")
                arg_min = None if arg_min == "*" else int(arg_min)
                arg_max = None if arg_max == "*" else int(arg_max)

            else:
                try:
                    # fixed-size dimension: min and max length are equal
                    arg = int(arg)
                    arg_min = arg
                    arg_max = arg
                except ValueError as e:  # pragma: no cover

                    raise ValueError(
                        "Array shapes must be integers, wildcards, ellipses, or "
                        "ranges. Shape variables (for declaring that one dimension "
                        "must be the same size as another) are not supported because "
                        "it is impossible to express dynamic minItems/maxItems in "
                        "JSON Schema. "
                        "See: https://github.com/orgs/json-schema-org/discussions/730"
                    ) from e
            list_schema = core_schema.list_schema(
                inner_schema, min_length=arg_min, max_length=arg_max, metadata=metadata
            )
    return list_schema
160

161

162
def _hash_schema(schema: CoreSchema) -> str:
10✔
163
    """
164
    Make a hex-encoded 8-byte blake2b hash from a pydantic core schema.
165
    Collisions are really not important or likely here, but we do want the same schema
166
    to produce the same hash.
167
    """
168
    schema_str = json.dumps(
10✔
169
        schema, sort_keys=True, indent=None, separators=(",", ":")
170
    ).encode("utf-8")
171
    hasher = hashlib.blake2b(digest_size=8)
10✔
172
    hasher.update(schema_str)
10✔
173
    return hasher.hexdigest()
10✔
174

175

176
def _unbounded_shape(
    inner_type: CoreSchema, metadata: Optional[dict] = None
) -> core_schema.DefinitionsSchema:
    """
    Make a recursive schema that refers to itself using a hashed version of the
    inner type, so arbitrarily nested lists of ``inner_type`` validate.
    """
    # name the self-reference after the inner schema's hash so identical inner
    # types share one definition
    ref_name = f"any-shape-array-{_hash_schema(inner_type)}"

    # a node is either another list of nodes, or a scalar of the inner type
    node = core_schema.union_schema(
        [
            core_schema.list_schema(core_schema.definition_reference_schema(ref_name)),
            inner_type,
        ],
        ref=ref_name,
    )

    # the top level is always a list of such nodes, carrying any label metadata
    return core_schema.definitions_schema(
        core_schema.list_schema(
            core_schema.definition_reference_schema(ref_name), metadata=metadata
        ),
        [node],
    )
204

205

206
def make_json_schema(
    shape: ShapeType, dtype: DtypeType, _handler: _handler_type
) -> ListSchema:
    """
    Make a list-of-lists JSON schema from a shape and a dtype.

    The dtype is first resolved to a pydantic ``CoreSchema`` with
    :func:`._lol_dtype`, then wrapped in list schemas: an unconstrained shape
    (``Any``) becomes a recursive any-depth list via :func:`._unbounded_shape`,
    otherwise :func:`.list_of_lists_schema` builds one list layer per dimension.

    Args:
        shape ( ShapeType ): Specification of a shape, as a tuple or
            an nptyping ``Shape``
        dtype ( DtypeType ): A builtin type or numpy dtype
        _handler: The pydantic schema generation handler (see pydantic docs)

    Returns:
        :class:`pydantic_core.core_schema.ListSchema`
    """
    dtype_schema = _lol_dtype(dtype, _handler)

    if shape is Any:
        # no shape constraint: accept arbitrarily nested lists of the dtype
        return _unbounded_shape(dtype_schema)

    return list_of_lists_schema(shape, dtype_schema)
234

235

236
def get_validate_interface(shape: ShapeType, dtype: DtypeType) -> Callable:
    """
    Build a validator closure over ``shape`` and ``dtype`` that validates a
    value using the matching :class:`.Interface` class via its
    :meth:`.Interface.validate` method.
    """

    def validate_interface(
        value: Any, info: Optional["ValidationInfo"] = None
    ) -> NDArrayType:
        # match the value to an interface class, instantiate it with our
        # shape/dtype constraints, and let it perform the validation
        return Interface.match(value)(shape, dtype).validate(value)

    return validate_interface
251

252

253
def _jsonize_array(value: Any, info: SerializationInfo) -> Union[list, dict]:
    """Use an interface class to render an array as JSON"""
    # match_output selects the interface for already-produced array values
    return Interface.match_output(value).to_json(value, info)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc