• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

datajoint / datajoint-python / #12897

pending completion
#12897

push

travis-ci

web-flow
<a href="https://github.com/datajoint/datajoint-python/commit/<a class=hub.com/datajoint/datajoint-python/commit/715ab40552f63cd79723ed2830c6691b2cb228b9">715ab4055<a href="https://github.com/datajoint/datajoint-python/commit/715ab40552f63cd79723ed2830c6691b2cb228b9">">Merge </a><a class="double-link" href="https://github.com/datajoint/datajoint-python/commit/<a class="double-link" href="https://github.com/datajoint/datajoint-python/commit/0a4f193031d8b1e14b09ec62d83c5def3b7421b0">0a4f19303</a>">0a4f19303</a><a href="https://github.com/datajoint/datajoint-python/commit/715ab40552f63cd79723ed2830c6691b2cb228b9"> into 3b6e84588">3b6e84588</a>

69 of 69 new or added lines in 9 files covered. (100.0%)

3125 of 3454 relevant lines covered (90.47%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.6
/datajoint/fetch.py
1
from functools import partial
1✔
2
from pathlib import Path
1✔
3
import logging
1✔
4
import pandas
1✔
5
import itertools
1✔
6
import re
1✔
7
import json
1✔
8
import numpy as np
1✔
9
import uuid
1✔
10
import numbers
1✔
11
from . import blob, hash
1✔
12
from .errors import DataJointError
1✔
13
from .settings import config
1✔
14
from .utils import safe_write
1✔
15

16
# Package-level logger, keyed by the root package name (text before the first dot).
logger = logging.getLogger(__name__.partition(".")[0])
17

18

19
class key:
    """
    Sentinel class used to request the primary key as an argument in
    expression.fetch(). The string "KEY" may be used interchangeably.
    """


def is_key(attr):
    """Return True when *attr* designates the primary key (the ``key`` class or "KEY")."""
    if attr is key:
        return True
    return attr == "KEY"
30

31

32
def to_dicts(recarray):
    """Yield each row of a numpy record array as a dict keyed by field name."""
    field_names = recarray.dtype.names
    for row in recarray:
        yield dict(zip(field_names, row.tolist()))
36

37

38
def _get(connection, attr, data, squeeze, download_path):
    """
    Unpack a single attribute value fetched from the database.

    This function is called for every attribute of every fetched row.

    :param connection: a dj.Connection object
    :param attr: attribute descriptor from the table's heading (provides flags such
        as .json, .is_external, .is_filepath, .is_attachment, .is_blob, .uuid, and
        an optional .adapter)
    :param data: literal value fetched from the table
    :param squeeze: if True squeeze blobs
    :param download_path: for fetches that download data, e.g. attachments
    :return: unpacked data
    """
    # NULL column value: propagate as None
    if data is None:
        return
    if attr.json:
        return json.loads(data)

    # external storage handle for this attribute's store, if any
    extern = (
        connection.schemas[attr.database].external[attr.store]
        if attr.is_external
        else None
    )

    # apply attribute adapter if present
    adapt = attr.adapter.get if attr.adapter else lambda x: x

    if attr.is_filepath:
        # external filepath: data holds the UUID of the tracked file
        return adapt(extern.download_filepath(uuid.UUID(bytes=data))[0])
    if attr.is_attachment:
        # Steps:
        # 1. get the attachment filename
        # 2. check if the file already exists at download_path, verify checksum
        # 3. if exists and checksum passes then return the local filepath
        # 4. Otherwise, download the remote file and return the new filepath
        _uuid = uuid.UUID(bytes=data) if attr.is_external else None
        # internal attachments store b"<name>\0<contents>"; split off the name
        attachment_name = (
            extern.get_attachment_name(_uuid)
            if attr.is_external
            else data.split(b"\0", 1)[0].decode()
        )
        local_filepath = Path(download_path) / attachment_name
        if local_filepath.is_file():
            attachment_checksum = (
                _uuid if attr.is_external else hash.uuid_from_buffer(data)
            )
            if attachment_checksum == hash.uuid_from_file(
                local_filepath, init_string=attachment_name + "\0"
            ):
                return adapt(
                    str(local_filepath)
                )  # checksum passed, no need to download again
            # generate the next available alias filename
            # (name_0000, name_0001, ... until a free slot or a checksum match)
            for n in itertools.count():
                f = local_filepath.parent / (
                    local_filepath.stem + "_%04x" % n + local_filepath.suffix
                )
                if not f.is_file():
                    local_filepath = f
                    break
                if attachment_checksum == hash.uuid_from_file(
                    f, init_string=attachment_name + "\0"
                ):
                    return adapt(str(f))  # checksum passed, no need to download again
        # Save attachment
        if attr.is_external:
            extern.download_attachment(_uuid, attachment_name, local_filepath)
        else:
            # write from buffer
            safe_write(local_filepath, data.split(b"\0", 1)[1])
        return adapt(str(local_filepath))  # download file from remote store

    # remaining cases: UUID column, blob (possibly external), or plain value
    return adapt(
        uuid.UUID(bytes=data)
        if attr.uuid
        else (
            blob.unpack(
                extern.get(uuid.UUID(bytes=data)) if attr.is_external else data,
                squeeze=squeeze,
            )
            if attr.is_blob
            else data
        )
    )
120

121

122
def _flatten_attribute_list(primary_key, attrs):
1✔
123
    """
124
    :param primary_key: list of attributes in primary key
125
    :param attrs: list of attribute names, which may include "KEY", "KEY DESC" or "KEY ASC"
126
    :return: generator of attributes where "KEY" is replaces with its component attributes
127
    """
128
    for a in attrs:
1✔
129
        if re.match(r"^\s*KEY(\s+[aA][Ss][Cc])?\s*$", a):
1✔
130
            yield from primary_key
1✔
131
        elif re.match(r"^\s*KEY\s+[Dd][Ee][Ss][Cc]\s*$", a):
1✔
132
            yield from (q + " DESC" for q in primary_key)
1✔
133
        else:
134
            yield a
1✔
135

136

137
class Fetch:
    """
    A fetch object that handles retrieving elements from the table expression.

    :param expression: the QueryExpression object to fetch from.
    """

    def __init__(self, expression):
        self._expression = expression

    def __call__(
        self,
        *attrs,
        offset=None,
        limit=None,
        order_by=None,
        format=None,
        as_dict=None,
        squeeze=False,
        download_path="."
    ):
        """
        Fetches the expression results from the database into an np.array or list of
        dictionaries and unpacks blob attributes.

        :param attrs: zero or more attributes to fetch. If not provided, the call will
            return all attributes of this table. If provided, returns tuples with an
            entry for each attribute.
        :param offset: the number of tuples to skip in the returned result
        :param limit: the maximum number of tuples to return
        :param order_by: a single attribute or the list of attributes to order the
            results. No ordering should be assumed if order_by=None. To reverse the
            order, add DESC to the attribute name or names: e.g. ("age DESC",
            "frequency"). To order by primary key, use "KEY" or "KEY DESC".
        :param format: Effective only when as_dict=None and attrs is empty.
            None: default from config['fetch_format'] or "array" if not configured;
            "array": return a numpy record array; "frame": return a pandas.DataFrame.
        :param as_dict: returns a list of dictionaries instead of a record array.
            Defaults to False for .fetch() and to True for .fetch('KEY').
        :param squeeze: if True, remove extra dimensions from arrays
        :param download_path: for fetches that download data, e.g. attachments
        :return: the contents of the table in the form of a structured numpy.array or a dict list
        :raises DataJointError: on invalid format, or format combined with attrs/as_dict
        """
        if order_by is not None:
            # if 'order_by' passed in a string, make into list
            if isinstance(order_by, str):
                order_by = [order_by]
            # expand "KEY" or "KEY DESC"
            order_by = list(
                _flatten_attribute_list(self._expression.primary_key, order_by)
            )

        attrs_as_dict = as_dict and attrs
        if attrs_as_dict:
            # absorb KEY into attrs and prepare to return attributes as dict (issue #595)
            if any(is_key(k) for k in attrs):
                attrs = list(self._expression.primary_key) + [
                    a for a in attrs if a not in self._expression.primary_key
                ]
        if as_dict is None:
            as_dict = bool(attrs)  # default to True for "KEY" and False otherwise
        # format should not be specified with attrs or is_dict=True
        if format is not None and (as_dict or attrs):
            raise DataJointError(
                "Cannot specify output format when as_dict=True or "
                "when attributes are selected to be fetched separately."
            )
        if format not in {None, "array", "frame"}:
            raise DataJointError(
                "Fetch output format must be in "
                '{{"array", "frame"}} but "{}" was given'.format(format)
            )

        if not (attrs or as_dict) and format is None:
            format = config["fetch_format"]  # default to array
            if format not in {"array", "frame"}:
                raise DataJointError(
                    'Invalid entry "{}" in datajoint.config["fetch_format"]: '
                    'use "array" or "frame"'.format(format)
                )

        if limit is None and offset is not None:
            logger.warning(
                "Offset set, but no limit. Setting limit to a large number. "
                "Consider setting a limit explicitly."
            )
            limit = 8000000000  # just a very large number to effect no limit

        # per-attribute unpacker bound to this connection and fetch options
        get = partial(
            _get,
            self._expression.connection,
            squeeze=squeeze,
            download_path=download_path,
        )
        if attrs:  # a list of attributes provided
            attributes = [a for a in attrs if not is_key(a)]
            ret = self._expression.proj(*attributes)
            # recursive fetch of the projection as a record array
            ret = ret.fetch(
                offset=offset,
                limit=limit,
                order_by=order_by,
                as_dict=False,
                squeeze=squeeze,
                download_path=download_path,
                format="array",
            )
            if attrs_as_dict:
                ret = [
                    {k: v for k, v in zip(ret.dtype.names, x) if k in attrs}
                    for x in ret
                ]
            else:
                # one column (or primary-key dict list for "KEY") per requested attr
                return_values = [
                    list(
                        (to_dicts if as_dict else lambda x: x)(
                            ret[self._expression.primary_key]
                        )
                    )
                    if is_key(attribute)
                    else ret[attribute]
                    for attribute in attrs
                ]
                # single attribute: return the column itself, not a 1-element list
                ret = return_values[0] if len(attrs) == 1 else return_values
        else:  # fetch all attributes as a numpy.record_array or pandas.DataFrame
            cur = self._expression.cursor(
                as_dict=as_dict, limit=limit, offset=offset, order_by=order_by
            )
            heading = self._expression.heading
            if as_dict:
                ret = [
                    dict((name, get(heading[name], d[name])) for name in heading.names)
                    for d in cur
                ]
            else:
                ret = list(cur.fetchall())
                record_type = (
                    heading.as_dtype
                    if not ret
                    else np.dtype(
                        [
                            (
                                name,
                                type(value),
                            )  # use the first element to determine blob type
                            if heading[name].is_blob
                            and isinstance(value, numbers.Number)
                            else (name, heading.as_dtype[name])
                            for value, name in zip(ret[0], heading.as_dtype.names)
                        ]
                    )
                )
                # removed a no-op try/except that merely re-raised the same exception
                ret = np.array(ret, dtype=record_type)
                for name in heading:
                    # unpack blobs and externals
                    ret[name] = list(map(partial(get, heading[name]), ret[name]))
                if format == "frame":
                    ret = pandas.DataFrame(ret).set_index(heading.primary_key)
        return ret
295

296

297
class Fetch1:
    """
    Fetch object for fetching the result of a query yielding one row.

    :param expression: a query expression to fetch from.
    """

    def __init__(self, expression):
        self._expression = expression

    def __call__(self, *attrs, squeeze=False, download_path="."):
        """
        Fetches the result of a query expression that yields one entry.

        If no attributes are specified, returns the result as a dict.
        If attributes are specified returns the corresponding results as a tuple.

        Examples:
        d = rel.fetch1()   # as a dictionary
        a, b = rel.fetch1('a', 'b')   # as a tuple

        :params *attrs: attributes to return when expanding into a tuple.
                 If attrs is empty, the return result is a dict
        :param squeeze:  When true, remove extra dimensions from arrays in attributes
        :param download_path: for fetches that download data, e.g. attachments
        :return: the one tuple in the table in the form of a dict
        :raises DataJointError: if the expression does not yield exactly one row
        """
        heading = self._expression.heading

        if not attrs:  # fetch all attributes, return as ordered dict
            cur = self._expression.cursor(as_dict=True)
            ret = cur.fetchone()
            # the second fetchone() probes for a surplus row: a non-None result
            # means the query returned more than one tuple
            if not ret or cur.fetchone():
                raise DataJointError(
                    "fetch1 requires exactly one tuple in the input set."
                )
            # unpack each attribute value (blobs, attachments, etc.) via _get
            ret = dict(
                (
                    name,
                    _get(
                        self._expression.connection,
                        heading[name],
                        ret[name],
                        squeeze=squeeze,
                        download_path=download_path,
                    ),
                )
                for name in heading.names
            )
        else:  # fetch some attributes, return as tuple
            # "KEY" tokens are excluded from the projection; they are expanded below
            attributes = [a for a in attrs if not is_key(a)]
            result = self._expression.proj(*attributes).fetch(
                squeeze=squeeze, download_path=download_path, format="array"
            )
            if len(result) != 1:
                raise DataJointError(
                    "fetch1 should only return one tuple. %d tuples found" % len(result)
                )
            # for "KEY" yield the primary-key dict; otherwise the scalar value
            return_values = tuple(
                next(to_dicts(result[self._expression.primary_key]))
                if is_key(attribute)
                else result[attribute][0]
                for attribute in attrs
            )
            # single attribute: return the value itself, not a 1-tuple
            ret = return_values[0] if len(attrs) == 1 else return_values
        return ret
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc