• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Info updated!

p2p-ld / numpydantic / 24373313656

14 Apr 2026 12:00AM UTC coverage: 97.821% (-0.5%) from 98.345%
24373313656

push

github

web-flow
Merge pull request #67 from p2p-ld/ndarray-schema-proxies

allow proxies to be used with ndarray schema as input

15 of 17 new or added lines in 4 files covered. (88.24%)

23 existing lines in 11 files now uncovered.

1571 of 1606 relevant lines covered (97.82%)

9.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.55
/src/numpydantic/interface/dask.py
1
"""
2
Interface for Dask arrays
3
"""
4

5
from collections.abc import Iterable
10✔
6
from typing import Any, Literal
10✔
7

8
import numpy as np
10✔
9
from pydantic import BaseModel, SerializationInfo
10✔
10

11
from numpydantic.interface.interface import Interface, JsonDict
10✔
12
from numpydantic.types import DtypeType, NDArrayType
10✔
13

14
try:
10✔
15
    from dask.array import from_array
10✔
16
    from dask.array.core import Array as DaskArray
10✔
17
except ImportError:  # pragma: no cover
18
    DaskArray = None
19

20

21
def _as_tuple(a_list: Any) -> tuple:
10✔
22
    """Make a list of list into a tuple of tuples"""
23
    return tuple(
10✔
24
        [_as_tuple(item) if isinstance(item, list) else item for item in a_list]
25
    )
26

27

28
class DaskJsonDict(JsonDict):
    """
    JSON-serializable form of a dask array that can be round-tripped
    back into an array
    """

    type: Literal["dask"]
    name: str
    chunks: Iterable[tuple[int, ...]]
    dtype: str
    shape: tuple[int, ...] | None = None
    value: list

    def to_array_input(self) -> DaskArray:
        """
        Rebuild the dask array described by this model.

        ``value`` is loaded as a numpy array of ``dtype``. If a ``shape`` was
        stored and differs from the loaded shape, the array is passed through
        ``reshape_input``. The result is wrapped with ``from_array`` using the
        stored ``name`` and ``chunks``.
        """
        loaded = np.array(self.value, dtype=self.dtype)
        needs_reshape = self.shape is not None and loaded.shape != self.shape
        if needs_reshape:
            loaded = self.reshape_input(loaded, self.shape)
        # nested lists in ``chunks`` are converted to nested tuples first
        chunk_spec = _as_tuple(self.chunks)
        return from_array(loaded, name=self.name, chunks=chunk_spec)
10✔
51

52

53
class DaskInterface(Interface):
    """
    Interface for Dask :class:`~dask.array.core.Array`
    """

    name = "dask"
    input_types = (DaskArray, dict)
    return_type = DaskArray
    json_model = DaskJsonDict

    @classmethod
    def check(cls, array: Any) -> bool:
        """
        Check whether ``array`` can be handled by this interface.

        Returns ``True`` for dask arrays and for dicts that
        :class:`.DaskJsonDict` reports as valid, ``False`` for anything else.
        Always returns ``False`` when dask could not be imported.
        """
        if DaskArray is None:  # pragma: no cover - no tests for interface deps atm
            return False
        elif isinstance(array, DaskArray):
            return True
        elif isinstance(array, dict):
            return DaskJsonDict.is_valid(array)
        else:
            return False

    def before_validation(self, array: DaskArray) -> NDArrayType:
        """
        Try and coerce dicts that should be model objects into the model objects

        Only applies when ``self.dtype`` is a :class:`pydantic.BaseModel`
        subclass and the first element of the array is a dict. In every other
        case the array is returned unchanged.
        """
        try:
            if issubclass(self.dtype, BaseModel):
                flat_array = array.reshape(-1)
                if len(flat_array) == 0:
                    return array

                # only the first element is computed to decide whether
                # the array holds dicts that need converting
                if isinstance(flat_array[0].compute(), dict):

                    def _chunked_to_model(array: np.ndarray) -> np.ndarray:
                        def _vectorized_to_model(
                            item: dict | BaseModel,
                        ) -> BaseModel:
                            if not isinstance(item, self.dtype):
                                return self.dtype(**item)
                            else:  # pragma: no cover
                                return item

                        return np.vectorize(_vectorized_to_model)(array)

                    # conversion is applied per block via map_blocks
                    array = array.map_blocks(_chunked_to_model, dtype=self.dtype)
        except TypeError:
            # fine, dtype isn't a type
            pass
        return array

    def get_object_dtype(self, array: NDArrayType) -> DtypeType:
        """
        Dask arrays require a compute() call to retrieve a single value

        Returns the python type of the first element of the flattened array,
        or :class:`typing.Any` when the array is empty.
        """
        flat_array = array.reshape(-1)
        if len(flat_array) == 0:
            return Any
        else:
            return type(flat_array[0].compute())

    @classmethod
    def enabled(cls) -> bool:
        """check if we successfully imported dask"""
        return DaskArray is not None

    @classmethod
    def to_json(
        cls, array: DaskArray, info: SerializationInfo | None = None
    ) -> list | DaskJsonDict:
        """
        Convert an array to a JSON serializable array by first converting to a numpy
        array and then to a list.

        When ``info`` is given and ``info.round_trip`` is true, the list is wrapped
        in a :class:`.DaskJsonDict` together with the array's name, chunks, dtype
        and shape. When ``info`` is ``None`` (the default), the plain list is
        returned.

        .. note::

            This is likely a very memory intensive operation if you are using dask for
            large arrays. This can't be avoided, since the creation of the json string
            happens in-memory with Pydantic, so you are likely looking for a different
            method of serialization here using the python object itself rather than
            its JSON representation.
        """
        np_array = np.array(array)
        as_json = np_array.tolist()
        if not isinstance(as_json, list):
            # ``tolist`` did not return a list (a bare scalar); wrap it
            as_json = [as_json]
        # ``info`` defaults to None, so guard before reading ``round_trip``
        if info is not None and info.round_trip:
            as_json = DaskJsonDict(
                type=cls.name,
                value=as_json,
                name=array.name,
                chunks=array.chunks,
                dtype=str(np_array.dtype),
                shape=array.shape,
            )
        return as_json
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc