• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

datajoint / datajoint-python / #12880

pending completion
#12880

push

travis-ci

web-flow
Merge pull request #1067 from CBroz1/master

Add support for insert CSV

4 of 4 new or added lines in 1 file covered. (100.0%)

3102 of 3424 relevant lines covered (90.6%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.59
/datajoint/migrate.py
1
import datajoint as dj
1✔
2
from pathlib import Path
1✔
3
import re
1✔
4
from .utils import user_choice
1✔
5

6

7
def migrate_dj011_external_blob_storage_to_dj012(migration_schema, store):
    """
    Utility function to migrate external blob data from 0.11 to 0.12.

    Prompts the user for confirmation (defaulting to "no") before running
    the destructive migration via ``_migrate_dj011_blob``.

    :param migration_schema: string of target schema to be migrated
    :param store: string of target dj.config['store'] to be migrated
    :raises ValueError: if ``migration_schema`` is not a string
    """
    if not isinstance(migration_schema, str):
        raise ValueError(
            "Expected type {} for migration_schema, not {}.".format(
                str, type(migration_schema)
            )
        )

    # Confirm before mutating tables; anything but an explicit "yes" aborts.
    do_migration = (
        user_choice(
            """
Warning: Ensure the following are completed before proceeding.
- Appropriate backups have been taken,
- Any existing DJ 0.11.X connections are suspended, and
- External config has been updated to new dj.config['stores'] structure.
Proceed?
            """,
            default="no",
        )
        == "yes"
    )
    if not do_migration:
        print("No migration performed.")
        return
    _migrate_dj011_blob(dj.Schema(migration_schema), store)
    print(
        "Migration completed for schema: {}, store: {}.".format(
            migration_schema, store
        )
    )
44

45

46
def _get_legacy_external_refs(query, legacy_external):
    """Return tables/columns referencing the legacy ``~external`` table.

    Queries ``information_schema.key_column_usage`` for every foreign key
    pointing at *legacy_external* and lower-cases the result keys so the
    rows can be splatted uniformly into format strings.

    :param query: ``schema.connection.query`` callable
    :param legacy_external: dj.FreeTable over the legacy external table
    :return: list of dicts with keys ``referencing_table``, ``column_name``,
        ``constraint_name``
    """
    return [
        {key.lower(): value for key, value in row.items()}
        for row in query(
            """
    SELECT concat('`', table_schema, '`.`', table_name, '`')
            as referencing_table, column_name, constraint_name
    FROM information_schema.key_column_usage
    WHERE referenced_table_name="{tab}" and referenced_table_schema="{db}"
    """.format(
                tab=legacy_external.table_name, db=legacy_external.database
            ),
            as_dict=True,
        ).fetchall()
    ]


def _migrate_dj011_blob(schema, default_store):
    """Migrate 0.11-style external blob tracking to the 0.12 layout.

    For every column referencing the legacy ``~external`` table: copies the
    tracking rows into the new per-store external table, adds a temporary
    UUID column, backfills it, then swaps the foreign key over to the new
    table. Finally verifies nothing still references the legacy table and
    drops it.

    :param schema: dj.Schema object whose external storage is migrated
    :param default_store: store used when a column comment has no explicit
        ``-store`` suffix
    """
    query = schema.connection.query

    # Legacy hash strings are exactly 43 chars; any trailing characters
    # encode the store suffix.
    LEGACY_HASH_SIZE = 43

    legacy_external = dj.FreeTable(
        schema.connection, "`{db}`.`~external`".format(db=schema.database)
    )

    for ref in _get_legacy_external_refs(query, legacy_external):
        # The column comment encodes the (optional) store name:
        # ":external-<store>:<comment>" or ":external:<comment>".
        # NOTE: original code omitted the space before WHERE, relying on
        # the backtick-quoted table name to delimit the token.
        column = query(
            "SHOW FULL COLUMNS FROM {referencing_table} "
            'WHERE Field="{column_name}"'.format(**ref),
            as_dict=True,
        ).fetchone()

        store, comment = re.match(
            r":external(-(?P<store>.+))?:(?P<comment>.*)", column["Comment"]
        ).group("store", "comment")

        # collect all legacy hashes actually used by this column
        hashes = {
            row[0]
            for row in query(
                "SELECT `{column_name}` FROM {referencing_table}".format(**ref)
            )
        }

        # sanity check: every hash's suffix must match the declared store
        if store is None:
            assert all(len(h) == LEGACY_HASH_SIZE for h in hashes)
        else:
            assert all(h[LEGACY_HASH_SIZE:] == store for h in hashes)

        # new-style external table for the resolved store
        ext = schema.external[store or default_store]

        # temporary column to hold the new UUID references during backfill
        temp_suffix = "tempsub"

        try:
            query(
                """ALTER TABLE {referencing_table}
                ADD COLUMN `{column_name}_{temp_suffix}` {type} DEFAULT NULL
            COMMENT ":blob@{store}:{comment}"
            """.format(
                    type=dj.declare.UUID_DATA_TYPE,
                    temp_suffix=temp_suffix,
                    store=(store or default_store),
                    comment=comment,
                    **ref
                )
            )
        except Exception:
            # Best-effort: the temp column survives a previously interrupted
            # run. Narrowed from a bare except so KeyboardInterrupt and
            # SystemExit still propagate.
            print("Column already added")

        for _hash, size in zip(*legacy_external.fetch("hash", "size")):
            if _hash in hashes:
                # New scheme addresses files by schema-relative path and a
                # UUID derived from that path.
                relative_path = str(Path(schema.database, _hash).as_posix())
                uuid = dj.hash.uuid_from_buffer(init_string=relative_path)
                external_path = ext._make_external_filepath(relative_path)
                if ext.spec["protocol"] == "s3":
                    contents_hash = dj.hash.uuid_from_buffer(
                        ext._download_buffer(external_path)
                    )
                else:
                    contents_hash = dj.hash.uuid_from_file(external_path)
                ext.insert1(
                    dict(
                        filepath=relative_path,
                        size=size,
                        contents_hash=contents_hash,
                        hash=uuid,
                    ),
                    skip_duplicates=True,
                )

                # backfill the temp column for rows carrying this hash
                query(
                    "UPDATE {referencing_table} "
                    "SET `{column_name}_{temp_suffix}`=%s "
                    'WHERE `{column_name}` = "{_hash}"'.format(
                        _hash=_hash, temp_suffix=temp_suffix, **ref
                    ),
                    uuid.bytes,
                )

        # verify every non-null legacy reference got a new UUID
        check = query(
            "SELECT * FROM {referencing_table} "
            "WHERE `{column_name}` IS NOT NULL"
            "  AND `{column_name}_{temp_suffix}` IS NULL".format(
                temp_suffix=temp_suffix, **ref
            )
        ).fetchall()

        assert len(check) == 0, "Some hashes haven't been migrated"

        # drop old foreign key, rename temp column over the old one, and
        # point the new foreign key at the new external table
        query(
            """
            ALTER TABLE {referencing_table}
            DROP FOREIGN KEY `{constraint_name}`,
            DROP COLUMN `{column_name}`,
            CHANGE COLUMN `{column_name}_{temp_suffix}` `{column_name}`
                {type} DEFAULT NULL
                COMMENT ":blob@{store}:{comment}",
            ADD FOREIGN KEY (`{column_name}`) REFERENCES {ext_table_name}
                (`hash`)
            """.format(
                temp_suffix=temp_suffix,
                ext_table_name=ext.full_table_name,
                type=dj.declare.UUID_DATA_TYPE,
                store=(store or default_store),
                comment=comment,
                **ref
            )
        )

    # Drop the old external table, but first make sure nothing still
    # references it.
    refs = _get_legacy_external_refs(query, legacy_external)
    assert not refs, "Some references still exist"

    # drop old external table
    legacy_external.drop_quick()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc