• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uc-cdis / indexd / 25305103925

04 May 2026 06:47AM UTC coverage: 87.486% (+0.1%) from 87.353%
25305103925

Pull #441

github

k-burt-uch
drs 1.5 helm branch for integration_tests
Pull Request #441: Feat/drs 1.5

3076 of 3516 relevant lines covered (87.49%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.71
indexd/utils.py
1
import logging
1✔
2
import re
1✔
3
from urllib.parse import urlparse
1✔
4
import os
1✔
5
import requests
1✔
6
from flask import current_app as app
1✔
7

8

9
def hint_match(record, hints):
    """
    Tell whether ``record`` matches any of the regular-expression ``hints``.

    Each pattern is applied with ``re.match``, so it is anchored at the start
    of ``record``; checking stops at the first pattern that matches.

    Args:
        record (str): the string to test
        hints (iterable of str): regex patterns, tried in order

    Returns:
        bool: True if some pattern matches, False otherwise
    """
    return any(re.match(pattern, record) for pattern in hints)
14

15

16
from sqlalchemy import create_engine
1✔
17
from sqlalchemy.engine.reflection import Inspector
1✔
18

19

20
def try_drop_test_data(
    user, database, root_user="postgres", host=""
):  # pragma: no cover
    """
    Best-effort drop of a test database.

    Connects to the ``postgres`` database as ``root_user`` on ``host`` and
    issues ``DROP DATABASE``. A failure of the DROP itself is logged and
    swallowed so test setup can continue; the connection is always closed.

    Args:
        user (str): unused; kept for call-site compatibility
        database (str): name of the database to drop
        root_user (str): role used to connect (default ``postgres``)
        host (str): database host; empty string uses the driver default

    Returns:
        None
    """
    engine = create_engine(
        "postgresql://{user}@{host}/postgres".format(user=root_user, host=host)
    )

    conn = engine.connect()
    try:
        # End the implicit transaction first: PostgreSQL refuses to run
        # DROP DATABASE inside a transaction block.
        conn.execute("commit")

        # Double any embedded double quotes so the name stays a valid
        # quoted identifier.
        drop_stmt = 'DROP DATABASE "{database}"'.format(
            database=str(database).replace('"', '""')
        )
        try:
            conn.execute(drop_stmt)
        except Exception as err:
            logging.warning("Unable to drop test data: %s", err)
    finally:
        conn.close()
37

38

39
def setup_database(
    user,
    password,
    database,
    root_user="postgres",
    host="",
    no_drop=False,
    no_user=False,
):  # pragma: no cover
    """
    setup the user and database

    Optionally drops ``database``, then creates it and, unless ``no_user`` is
    set, creates ``user`` with ``password`` and grants that user all
    privileges on the database. Each step is best-effort: failures are logged
    and setup continues. The connection is always closed.

    Args:
        user (str): role to create and grant privileges to
        password (str): password for ``user``
        database (str): database to (re)create
        root_user (str): role used to connect (default ``postgres``)
        host (str): database host; empty string uses the driver default
        no_drop (bool): skip dropping an existing database first
        no_user (bool): skip creating the user and granting privileges

    Returns:
        None
    """
    if not no_drop:
        # Drop on the same server and as the same role used for creation.
        try_drop_test_data(user, database, root_user=root_user, host=host)

    engine = create_engine(
        "postgresql://{user}@{host}/postgres".format(user=root_user, host=host)
    )
    conn = engine.connect()
    try:
        # End the implicit transaction: CREATE DATABASE cannot run inside one.
        conn.execute("commit")

        # Quote the database name identically everywhere, so CREATE and GRANT
        # refer to the same (case-sensitive) identifier.
        quoted_db = '"{}"'.format(str(database).replace('"', '""'))

        try:
            conn.execute("CREATE DATABASE {}".format(quoted_db))
        except Exception as err:
            logging.warning("Unable to create database: %s", err)

        if not no_user:
            try:
                # Single quotes must be doubled inside a SQL string literal.
                escaped_password = str(password).replace("'", "''")
                user_stmt = "CREATE USER {user} WITH PASSWORD '{password}'".format(
                    user=user, password=escaped_password
                )
                conn.execute(user_stmt)

                # Grant to the newly created role (not to its password).
                perm_stmt = "GRANT ALL PRIVILEGES ON DATABASE {database} to {user}".format(
                    database=quoted_db, user=user
                )
                conn.execute(perm_stmt)
                conn.execute("commit")
            except Exception as err:
                logging.warning("Unable to add user: %s", err)
    finally:
        conn.close()
83

84

85
def create_tables(host, user, password, database):  # pragma: no cover
    """
    create tables

    Create the legacy indexd tables (index_record, index_record_hash,
    index_record_url, index_schema_version, drs_bundle_record) with raw DDL
    on ``database`` at ``host``, connecting as ``user``/``password``.
    Statements run in order; the first failure is logged and re-raised, and
    the connection is closed either way.

    Args:
        host (str): database host
        user (str): role to connect as
        password (str): password for ``user``
        database (str): target database name

    Raises:
        Exception: whatever the driver raises for the failing statement
    """
    engine = create_engine(
        "postgresql://{user}:{pwd}@{host}/{db}".format(
            user=user, host=host, pwd=password, db=database
        )
    )
    conn = engine.connect()

    statements = (
        "CREATE TABLE index_record ("
        "did VARCHAR NOT NULL, rev VARCHAR, form VARCHAR, size BIGINT, "
        "PRIMARY KEY (did))",
        "CREATE TABLE index_record_hash ("
        "did VARCHAR NOT NULL, hash_type VARCHAR NOT NULL, hash_value VARCHAR, "
        "PRIMARY KEY (did, hash_type), "
        "FOREIGN KEY (did) REFERENCES index_record (did))",
        "CREATE TABLE index_record_url ("
        "did VARCHAR NOT NULL, url VARCHAR NOT NULL, PRIMARY KEY (did, url), "
        "FOREIGN KEY (did) REFERENCES index_record (did))",
        "CREATE TABLE index_schema_version (version INT)",
        # TIMESTAMP rather than DATETIME: PostgreSQL has no DATETIME type.
        "CREATE TABLE drs_bundle_record ("
        "bundle_id VARCHAR NOT NULL, name VARCHAR, "
        "created_time TIMESTAMP, updated_time TIMESTAMP, "
        "checksum VARCHAR, size BIGINT, bundle_data TEXT, description TEXT, "
        "version VARCHAR, aliases VARCHAR, PRIMARY KEY (bundle_id))",
    )

    try:
        for statement in statements:
            conn.execute(statement)
    except Exception:
        logging.warning("Unable to create table")
        raise
    finally:
        conn.close()
120

121

122
def check_engine_for_migrate(engine):
    """
    Report whether ``engine`` is able to run schema migrations.

    The answer is taken directly from the dialect's ``supports_alter`` flag.

    Args:
        engine (sqlalchemy.engine.base.Engine): a sqlalchemy engine

    Return:
        bool: the dialect's ``supports_alter`` value
    """
    dialect = engine.dialect
    return dialect.supports_alter
133

134

135
def init_schema_version(driver, model, version):
    """
    Make sure the version table holds a row and report the version it stores.

    When ``model`` has no row yet, one is created with ``version`` and added
    to the session. Otherwise the existing row is left as is and its stored
    version is returned.

    Args:
        driver (object): an alias or index driver instance whose ``session``
            is used as a context manager
        model (sqlalchemy.ext.declarative.api.Base): the version table model
        version (int): version to record when the table is empty

    Return:
        version (int): current version number in database
    """
    with driver.session as session:
        existing = session.query(model).first()
        if existing:
            current = existing.version
        else:
            fresh = model(version=version)
            session.add(fresh)
            current = fresh.version
    return current
153

154

155
def migrate_database(driver, migrate_functions, current_schema_version, model):
    """
    This migration logic is DEPRECATED. It is still supported for backwards compatibility,
    but any new migration should be added using Alembic.

    Migrate the current database to match the schema version provided in
    the current schema. ``migrate_functions[i]`` is treated as the step that
    takes the database from version ``i`` to ``i + 1``. Each step runs in its
    own session, which also bumps the stored version.

    Args:
        driver (object): an alias or index driver instance
        migrate_functions (list): a list of migration functions, each called
            as ``f(engine=..., session=...)``
        current_schema_version (int): version of current schema in code
        model (sqlalchemy.ext.declarative.api.Base): the version table model

    Return:
        None
    """
    db_schema_version = init_schema_version(driver, model, 0)

    need_migrate = (current_schema_version - db_schema_version) > 0

    if not check_engine_for_migrate(driver.engine) and need_migrate:
        driver.logger.error(
            "Engine {} does not support alter, skip migration".format(
                driver.engine.dialect.name
            )
        )
        return

    # Only the steps between the stored version and the code version run;
    # the slice is empty when the database is already up to date.
    for migrate in migrate_functions[db_schema_version:current_schema_version]:
        with driver.session as s:
            schema_version = s.query(model).first()
            # Log the target version of this step (stored version + 1).
            driver.logger.info(
                "migrating {} schema to {}".format(
                    driver.__class__.__name__, schema_version.version + 1
                )
            )

            migrate(engine=driver.engine, session=s)
            schema_version.version += 1
            s.add(schema_version)
196

197

198
def reverse_url(url):
    """
    Build a DRS service-info ID by reversing the dot-separated labels of a host.

    An ``http``/``https`` URL is first reduced to ``urlparse(url).hostname``
    (lower-cased, without port or path). Any other input is reversed as given.

    Args:
        url (str): host or URL, e.g. ``drs.example.org``

    Returns:
        str: the reversed ID, e.g. ``org.example.drs``
    """
    parts = urlparse(url)
    host = parts.hostname if parts.scheme in ("http", "https") else url
    return ".".join(host.split(".")[::-1])
216

217

218
# Base URL of the Fence service (used for its /data/buckets endpoint).
# Overridable through the FENCE_SERVICE_URL environment variable.
FENCE_SERVICE = os.getenv("FENCE_SERVICE_URL", "http://fence-service")
219

220

221
def lookup_bucket_region(bucket_name, bucket_regions):
    """
    Find the region configured for ``bucket_name``.

    An exact key match in ``bucket_regions`` wins. Otherwise the first key,
    in mapping order, of the form ``<prefix>.*`` whose ``<prefix>`` starts
    ``bucket_name`` supplies the region.

    Args:
        bucket_name (str): bucket to resolve
        bucket_regions (dict): bucket name or ``prefix.*`` pattern -> region

    Returns:
        the matched region value, or "" when nothing matches
    """
    if bucket_name in bucket_regions:
        return bucket_regions[bucket_name]

    # The ".*" suffix is matched with str.startswith instead of a regex,
    # to avoid a Snyk-reported regex vulnerability.
    prefix_matches = (
        region
        for pattern, region in bucket_regions.items()
        if pattern.endswith(".*") and bucket_name.startswith(pattern[:-2])
    )
    return next(prefix_matches, "")
235

236

237
def get_bucket_regions(timeout=10):
    """
    Return a mapping of bucket name (or ``prefix.*`` pattern) to region.

    A non-empty copy cached under ``"bucket_regions"`` in ``app.cache`` is
    returned directly. Otherwise the ``S3_BUCKETS`` section of Fence's
    ``/data/buckets`` response is fetched and the result is written back to
    the cache. Any fetch or payload failure is logged and yields an empty
    mapping.

    Args:
        timeout (float): seconds to wait for Fence. Without a timeout,
            ``requests`` can wait indefinitely on an unresponsive server.
            The default of 10 is a chosen value; adjust as needed.

    Returns:
        dict: bucket name/pattern -> region string ("" when not configured)
    """
    cached = getattr(app, "cache", None)
    if cached:
        hit = cached.get("bucket_regions")
        if hit:
            return hit

    url = f"{FENCE_SERVICE}/data/buckets"
    data = {}

    try:
        resp = requests.get(url, timeout=timeout)
        resp.raise_for_status()
        buckets = resp.json().get("S3_BUCKETS") or {}
        if not isinstance(buckets, dict):
            raise ValueError(
                f"unexpected S3_BUCKETS type: {type(buckets).__name__}"
            )
        data = buckets
    except Exception as e:
        app.logger.warning(f"Failed to fetch bucket regions from Fence: {e}")

    # Entries that are not dicts carry no region information; map them to "".
    regions = {
        name: (cfg.get("region", "") if isinstance(cfg, dict) else "")
        for name, cfg in data.items()
    }

    if cached:
        cached.set("bucket_regions", regions)

    return regions
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc