
datajoint / datajoint-python / build #12880 (push via travis-ci, web-flow), pending completion
Merge pull request #1067 from CBroz1/master: Add support for insert CSV

4 of 4 new or added lines in 1 file covered (100.0%)
3102 of 3424 relevant lines covered (90.6%)
0.91 hits per line

Source file: /datajoint/external.py (90.05% of lines covered)
Lines with no hits in this build are marked "# uncovered" in the listing below.
from pathlib import Path, PurePosixPath, PureWindowsPath
from collections.abc import Mapping
from tqdm import tqdm
import logging
from .settings import config
from .errors import DataJointError, MissingExternalFile
from .hash import uuid_from_buffer, uuid_from_file
from .table import Table, FreeTable
from .heading import Heading
from .declare import EXTERNAL_TABLE_ROOT
from . import s3
from .utils import safe_write, safe_copy

logger = logging.getLogger(__name__.split(".")[0])

CACHE_SUBFOLDING = (
    2,
    2,
)  # (2, 2) means "0123456789abcd" will be saved as "01/23/0123456789abcd"
SUPPORT_MIGRATED_BLOBS = True  # support blobs migrated from datajoint 0.11.*


def subfold(name, folds):
    """
    subfolding for external storage, e.g. subfold('aBCdefg', (2, 3)) --> ('ab', 'cde')
    """
    return (
        (name[: folds[0]].lower(),) + subfold(name[folds[0] :], folds[1:])
        if folds
        else ()
    )
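# --- Editor's sketch (illustration, not part of the original file) ---
# subfold splits a hash string into nested folder names so that stores with
# many objects avoid placing millions of files in a single directory:
#
#   >>> subfold("0123456789abcd", CACHE_SUBFOLDING)
#   ('01', '23')
#   >>> "/".join(subfold("aBCdefg", (2, 3)))
#   'ab/cde'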

class ExternalTable(Table):
    """
    The table tracking externally stored objects.
    Declare as ExternalTable(connection, store, database)
    """

    def __init__(self, connection, store, database):
        self.store = store
        self.spec = config.get_store_spec(store)
        self._s3 = None
        self.database = database
        self._connection = connection
        self._heading = Heading(
            table_info=dict(
                conn=connection,
                database=database,
                table_name=self.table_name,
                context=None,
            )
        )
        self._support = [self.full_table_name]
        if not self.is_declared:
            self.declare()
        self._s3 = None
        if self.spec["protocol"] == "file" and not Path(self.spec["location"]).is_dir():
            raise FileNotFoundError(  # uncovered
                "Inaccessible local directory %s" % self.spec["location"]
            ) from None
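# --- Editor's sketch (assumed configuration, not part of the original file) ---
# The spec comes from dj.config; a file-protocol store might be configured as
# below (the store name "raw" and the paths are hypothetical):
#
#   import datajoint as dj
#   dj.config["stores"] = {
#       "raw": {
#           "protocol": "file",
#           "location": "/data/external",  # must be an existing directory
#           "stage": "/data/stage",        # staging root for the filepath type
#       }
#   }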

    @property
    def definition(self):
        return """
        # external storage tracking
        hash  : uuid    #  hash of contents (blob), of filename + contents (attach), or relative filepath (filepath)
        ---
        size      :bigint unsigned     # size of object in bytes
        attachment_name=null : varchar(255)  # the filename of an attachment
        filepath=null : varchar(1000)  # relative filepath or attachment filename
        contents_hash=null : uuid      # used for the filepath datatype
        timestamp=CURRENT_TIMESTAMP  :timestamp   # automatic timestamp
        """

    @property
    def table_name(self):
        return f"{EXTERNAL_TABLE_ROOT}_{self.store}"

    @property
    def s3(self):
        if self._s3 is None:
            self._s3 = s3.Folder(**self.spec)
        return self._s3
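# --- Editor's note (not part of the original file) ---
# Each store gets its own tracking table inside the schema. Assuming
# EXTERNAL_TABLE_ROOT is "~external", a store named "raw" would be tracked
# in the hidden table `~external_raw` of the schema's database.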

    # - low-level operations - private

    def _make_external_filepath(self, relative_filepath):
        """resolve the complete external path based on the relative path"""
        # Strip root
        if self.spec["protocol"] == "s3":
            posix_path = PurePosixPath(PureWindowsPath(self.spec["location"]))
            location_path = (
                Path(*posix_path.parts[1:])
                if len(self.spec["location"]) > 0
                and any(case in posix_path.parts[0] for case in ("\\", ":"))
                else Path(posix_path)
            )
            return PurePosixPath(location_path, relative_filepath)
        # Preserve root
        elif self.spec["protocol"] == "file":
            return PurePosixPath(Path(self.spec["location"]), relative_filepath)
        else:
            assert False  # uncovered

    def _make_uuid_path(self, uuid, suffix=""):
        """create external path based on the uuid hash"""
        return self._make_external_filepath(
            PurePosixPath(
                self.database,
                "/".join(subfold(uuid.hex, self.spec["subfolding"])),
                uuid.hex,
            ).with_suffix(suffix)
        )
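# --- Editor's sketch (illustration, not part of the original file) ---
# For a file store at /data/external with subfolding (2, 2), a schema named
# "lab", and a hash starting "0123...", _make_uuid_path resolves to roughly:
#
#   /data/external/lab/01/23/0123456789abcdef0123456789abcdef
#
# spreading objects across nested two-character directories.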

    def _upload_file(self, local_path, external_path, metadata=None):
        if self.spec["protocol"] == "s3":
            self.s3.fput(local_path, external_path, metadata)
        elif self.spec["protocol"] == "file":
            safe_copy(local_path, external_path, overwrite=True)
        else:
            assert False  # uncovered

    def _download_file(self, external_path, download_path):
        if self.spec["protocol"] == "s3":
            self.s3.fget(external_path, download_path)
        elif self.spec["protocol"] == "file":
            safe_copy(external_path, download_path)
        else:
            assert False  # uncovered

    def _upload_buffer(self, buffer, external_path):
        if self.spec["protocol"] == "s3":
            self.s3.put(external_path, buffer)
        elif self.spec["protocol"] == "file":
            safe_write(external_path, buffer)
        else:
            assert False  # uncovered

    def _download_buffer(self, external_path):
        if self.spec["protocol"] == "s3":
            return self.s3.get(external_path)
        if self.spec["protocol"] == "file":
            return Path(external_path).read_bytes()
        assert False  # uncovered

    def _remove_external_file(self, external_path):
        if self.spec["protocol"] == "s3":
            self.s3.remove_object(external_path)
        elif self.spec["protocol"] == "file":
            try:
                Path(external_path).unlink()
            except FileNotFoundError:
                pass  # uncovered

    def exists(self, external_filepath):
        """
        :return: True if the external file is accessible
        """
        if self.spec["protocol"] == "s3":
            return self.s3.exists(external_filepath)
        if self.spec["protocol"] == "file":
            return Path(external_filepath).is_file()
        assert False  # uncovered
    # --- BLOBS ----

    def put(self, blob):
        """
        put a binary string (blob) in external store
        """
        uuid = uuid_from_buffer(blob)
        self._upload_buffer(blob, self._make_uuid_path(uuid))
        # insert tracking info
        self.connection.query(
            "INSERT INTO {tab} (hash, size) VALUES (%s, {size}) ON DUPLICATE KEY "
            "UPDATE timestamp=CURRENT_TIMESTAMP".format(
                tab=self.full_table_name, size=len(blob)
            ),
            args=(uuid.bytes,),
        )
        return uuid

    def get(self, uuid):
        """
        get an object from external store.
        """
        if uuid is None:
            return None  # uncovered
        # attempt to get object from cache
        blob = None
        cache_folder = config.get("cache", None)
        if cache_folder:
            try:
                cache_path = Path(cache_folder, *subfold(uuid.hex, CACHE_SUBFOLDING))
                cache_file = Path(cache_path, uuid.hex)
                blob = cache_file.read_bytes()
            except FileNotFoundError:
                pass  # not cached
        # download blob from external store
        if blob is None:
            try:
                blob = self._download_buffer(self._make_uuid_path(uuid))
            except MissingExternalFile:
                if not SUPPORT_MIGRATED_BLOBS:
                    raise  # uncovered
                # blobs migrated from datajoint 0.11 are stored at explicitly defined filepaths
                relative_filepath, contents_hash = (self & {"hash": uuid}).fetch1(
                    "filepath", "contents_hash"
                )
                if relative_filepath is None:
                    raise  # uncovered
                blob = self._download_buffer(
                    self._make_external_filepath(relative_filepath)
                )
            if cache_folder:
                cache_path.mkdir(parents=True, exist_ok=True)
                safe_write(cache_path / uuid.hex, blob)
        return blob
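# --- Editor's sketch (illustration, not part of the original file) ---
# put() hashes the buffer, uploads it under its uuid path, and records the
# hash and size in the tracking table; get() reverses this, consulting the
# local cache first ("raw" is a hypothetical store name):
#
#   ext = schema.external["raw"]
#   uuid = ext.put(b"some binary payload")
#   assert ext.get(uuid) == b"some binary payload"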

    # --- ATTACHMENTS ---

    def upload_attachment(self, local_path):
        attachment_name = Path(local_path).name
        uuid = uuid_from_file(local_path, init_string=attachment_name + "\0")
        external_path = self._make_uuid_path(uuid, "." + attachment_name)
        self._upload_file(local_path, external_path)
        # insert tracking info
        self.connection.query(
            """
        INSERT INTO {tab} (hash, size, attachment_name)
        VALUES (%s, {size}, "{attachment_name}")
        ON DUPLICATE KEY UPDATE timestamp=CURRENT_TIMESTAMP""".format(
                tab=self.full_table_name,
                size=Path(local_path).stat().st_size,
                attachment_name=attachment_name,
            ),
            args=[uuid.bytes],
        )
        return uuid

    def get_attachment_name(self, uuid):
        return (self & {"hash": uuid}).fetch1("attachment_name")

    def download_attachment(self, uuid, attachment_name, download_path):
        """download the attachment from the external store into download_path"""
        external_path = self._make_uuid_path(uuid, "." + attachment_name)
        self._download_file(external_path, download_path)
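# --- Editor's sketch (illustration, not part of the original file) ---
# Attachments are hashed over filename + "\0" + contents and stored under the
# uuid path with the original filename appended as a suffix, e.g. (for a
# hypothetical file results.csv):
#
#   lab/01/23/<uuid-hex>.results.csv
#
# which is why download_attachment() needs the attachment_name to rebuild the
# external path.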

    # --- FILEPATH ---

    def upload_filepath(self, local_filepath):
        """
        Raise an exception if an external entry already exists with a different contents checksum.
        If an entry exists with the same checksum, no copying occurs.
        Otherwise, copy the file (with overwrite) to the remote store and create the tracking entry.
        """
        local_filepath = Path(local_filepath)
        try:
            relative_filepath = str(
                local_filepath.relative_to(self.spec["stage"]).as_posix()
            )
        except ValueError:  # uncovered
            raise DataJointError(  # uncovered
                "The path {path} is not in stage {stage}".format(
                    path=local_filepath.parent, **self.spec
                )
            )
        uuid = uuid_from_buffer(
            init_string=relative_filepath
        )  # hash relative path, not contents
        contents_hash = uuid_from_file(local_filepath)

        # check if the remote file already exists and verify that it matches
        check_hash = (self & {"hash": uuid}).fetch("contents_hash")
        if check_hash:
            # the tracking entry exists, check that it's the same file as before
            if contents_hash != check_hash[0]:
                raise DataJointError(
                    f"A different version of '{relative_filepath}' has already been placed."
                )
        else:
            # upload the file and create its tracking entry
            self._upload_file(
                local_filepath,
                self._make_external_filepath(relative_filepath),
                metadata={"contents_hash": str(contents_hash)},
            )
            self.connection.query(
                "INSERT INTO {tab} (hash, size, filepath, contents_hash) VALUES (%s, {size}, '{filepath}', %s)".format(
                    tab=self.full_table_name,
                    size=Path(local_filepath).stat().st_size,
                    filepath=relative_filepath,
                ),
                args=(uuid.bytes, contents_hash.bytes),
            )
        return uuid

    def download_filepath(self, filepath_hash):
        """
        sync a file from external store to the local stage

        :param filepath_hash: The hash (UUID) of the relative_path
        :return: tuple (local_filepath, contents_hash), or None if filepath_hash is None
        """

        def _need_checksum(local_filepath, expected_size):
            limit = config.get("filepath_checksum_size_limit")
            actual_size = Path(local_filepath).stat().st_size
            if expected_size != actual_size:
                # this should never happen without outside interference
                raise DataJointError(  # uncovered
                    f"'{local_filepath}' downloaded but size did not match."
                )
            return limit is None or actual_size < limit

        if filepath_hash is not None:
            relative_filepath, contents_hash, size = (
                self & {"hash": filepath_hash}
            ).fetch1("filepath", "contents_hash", "size")
            external_path = self._make_external_filepath(relative_filepath)
            local_filepath = Path(self.spec["stage"]).absolute() / relative_filepath

            file_exists = Path(local_filepath).is_file() and (
                not _need_checksum(local_filepath, size)
                or uuid_from_file(local_filepath) == contents_hash
            )

            if not file_exists:
                self._download_file(external_path, local_filepath)
                if (
                    _need_checksum(local_filepath, size)
                    and uuid_from_file(local_filepath) != contents_hash
                ):
                    # this should never happen without outside interference
                    raise DataJointError(  # uncovered
                        f"'{local_filepath}' downloaded but did not pass checksum."
                    )
            if not _need_checksum(local_filepath, size):
                logger.warning(
                    f"Skipped checksum for file with hash: {contents_hash}, and path: {local_filepath}"
                )
            return str(local_filepath), contents_hash
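# --- Editor's sketch (illustration, not part of the original file) ---
# The filepath type tracks files by their path relative to the store's
# "stage" directory: the primary hash covers the relative path, while
# contents_hash guards against silent content changes. Store name and paths
# below are hypothetical:
#
#   ext = schema.external["raw"]
#   uuid = ext.upload_filepath("/data/stage/sess1/trace.dat")
#   local_path, contents_hash = ext.download_filepath(uuid)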

    # --- UTILITIES ---

    @property
    def references(self):
        """
        :return: generator of referencing table names and their referencing columns
        """
        return (
            {k.lower(): v for k, v in elem.items()}
            for elem in self.connection.query(
                """
        SELECT concat('`', table_schema, '`.`', table_name, '`') as referencing_table, column_name
        FROM information_schema.key_column_usage
        WHERE referenced_table_name="{tab}" and referenced_table_schema="{db}"
        """.format(
                    tab=self.table_name, db=self.database
                ),
                as_dict=True,
            )
        )

    def fetch_external_paths(self, **fetch_kwargs):
        """
        generate complete external filepaths from the query.
        Each element is a tuple: (uuid, path)

        :param fetch_kwargs: keyword arguments to pass to fetch
        """
        fetch_kwargs.update(as_dict=True)
        paths = []
        for item in self.fetch("hash", "attachment_name", "filepath", **fetch_kwargs):
            if item["attachment_name"]:
                # attachments
                path = self._make_uuid_path(item["hash"], "." + item["attachment_name"])  # uncovered
            elif item["filepath"]:
                # external filepaths
                path = self._make_external_filepath(item["filepath"])
            else:
                # blobs
                path = self._make_uuid_path(item["hash"])
            paths.append((item["hash"], path))
        return paths

    def unused(self):
        """
        query expression for unused hashes

        :return: self restricted to elements that are not in use by any tables in the schema
        """
        return self - [
            FreeTable(self.connection, ref["referencing_table"]).proj(
                hash=ref["column_name"]
            )
            for ref in self.references
        ]

    def used(self):
        """
        query expression for used hashes

        :return: self restricted to elements that are in use by tables in the schema
        """
        return self & [  # uncovered
            FreeTable(self.connection, ref["referencing_table"]).proj(
                hash=ref["column_name"]
            )
            for ref in self.references
        ]

    def delete(
        self,
        *,
        delete_external_files=None,
        limit=None,
        display_progress=True,
        errors_as_string=True,
    ):
        """
        :param delete_external_files: True or False. If False, only the tracking info is removed from the
                external store table but the external files remain intact. If True, the external files
                themselves are deleted too.
        :param errors_as_string: If True, any errors encountered while deleting external files are returned as strings.
        :param limit: (integer) limit the number of items to delete
        :param display_progress: if True, display progress as files are cleaned up
        :return: if deleting external files, returns errors
        """
        if delete_external_files not in (True, False):
            raise DataJointError(  # uncovered
                "The delete_external_files argument must be set to either "
                "True or False in delete()"
            )

        if not delete_external_files:
            self.unused().delete_quick()
        else:
            items = self.unused().fetch_external_paths(limit=limit)
            if display_progress:
                items = tqdm(items)
            # delete items one by one, close to transaction-safe
            error_list = []
            for uuid, external_path in items:
                row = (self & {"hash": uuid}).fetch()
                if row.size:
                    try:
                        (self & {"hash": uuid}).delete_quick()
                    except Exception:  # uncovered
                        pass  # if delete failed, do not remove the external file
                    else:
                        try:
                            self._remove_external_file(external_path)
                        except Exception as error:
                            # adding row back into table after failed delete
                            self.insert1(row[0], skip_duplicates=True)
                            error_list.append(
                                (
                                    uuid,
                                    external_path,
                                    str(error) if errors_as_string else error,
                                )
                            )
            return error_list
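# --- Editor's sketch (illustration, not part of the original file) ---
# unused() is an antijoin against every foreign key referencing this store,
# leaving only orphaned hashes, which delete() then removes, optionally
# together with the external files themselves ("raw" is hypothetical):
#
#   errors = schema.external["raw"].delete(delete_external_files=True)
#   for uuid, path, err in errors:
#       print(f"failed to remove {path}: {err}")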

class ExternalMapping(Mapping):
    """
    The external manager contains all the tables for all external stores for a given schema

    :Example:
        e = ExternalMapping(schema)
        external_table = e[store]
    """

    def __init__(self, schema):
        self.schema = schema
        self._tables = {}

    def __repr__(self):
        return "External file tables for schema `{schema}`:\n    ".format(  # uncovered
            schema=self.schema.database
        ) + "\n    ".join(
            '"{store}" {protocol}:{location}'.format(store=k, **v.spec)
            for k, v in self.items()
        )

    def __getitem__(self, store):
        """
        Triggers the creation of an external table.
        Should only be used when ready to save or read from external storage.

        :param store: the name of the store
        :return: the ExternalTable object for the store
        """
        if store not in self._tables:
            self._tables[store] = ExternalTable(
                connection=self.schema.connection,
                store=store,
                database=self.schema.database,
            )
        return self._tables[store]

    def __len__(self):
        return len(self._tables)  # uncovered

    def __iter__(self):
        return iter(self._tables)