• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uc-cdis / indexd / 28181841506

25 Jun 2026 03:36PM UTC coverage: 87.405% (+0.003%) from 87.402%
28181841506

push

github

web-flow
Merge pull request #441 from uc-cdis/feat/drs-1.5

Feat/drs 1.5

3220 of 3684 relevant lines covered (87.4%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.71
indexd/utils.py
1
import re
1✔
2
from urllib.parse import urlparse
1✔
3
import os
1✔
4
import requests
1✔
5
from flask import current_app as app
1✔
6
from cdislogging import get_logger
1✔
7
from sqlalchemy import create_engine
1✔
8

9
# Module-level logger, obtained from cdislogging and named after this module.
logger = get_logger(__name__)
1✔
10

11

12
def hint_match(record, hints):
    """
    Tell whether a record matches at least one hint pattern.

    Args:
        record (str): the string to test
        hints (iterable): regular expressions, each tried with ``re.match``
            (i.e. anchored at the start of ``record``)

    Return:
        bool: True as soon as one hint matches, False when none does
    """
    return any(re.match(pattern, record) for pattern in hints)
1✔
17

18

19
def try_drop_test_data(
    user, database, root_user="postgres", host=""
):  # pragma: no cover
    """
    Best-effort drop of a test database.

    Connects to the ``postgres`` database as ``root_user`` and issues a
    ``DROP DATABASE`` statement. A failure of the drop itself is logged as a
    warning and not raised.

    Args:
        user (str): unused; kept in the signature for existing callers
        database (str): name of the database to drop
        root_user (str): role used to open the connection
        host (str): database host

    Return:
        None
    """
    engine = create_engine(
        "postgresql://{user}@{host}/postgres".format(user=root_user, host=host)
    )
    drop_stmt = 'DROP DATABASE "{database}"'.format(database=database)

    conn = engine.connect()
    try:
        # Close any open transaction first: PostgreSQL refuses to run
        # DROP DATABASE inside a transaction block.
        conn.execute("commit")
        try:
            conn.execute(drop_stmt)
        except Exception as exc:
            # Use the module logger; the `logging` module is not imported here.
            logger.warning("Unable to drop test data: %s", exc)
    finally:
        # Always release the connection, even if the commit above fails.
        conn.close()
36

37

38
def setup_database(
    user,
    password,
    database,
    root_user="postgres",
    host="",
    no_drop=False,
    no_user=False,
):  # pragma: no cover
    """
    Set up the user and database.

    Optionally drops an existing database first, then creates the database
    and (unless ``no_user`` is set) creates ``user`` and grants it all
    privileges on the database. Failures to create the database or the user
    are logged as warnings and not raised.

    Args:
        user (str): role to create and grant privileges to
        password (str): password assigned to ``user``
        database (str): name of the database to create
        root_user (str): role used to open the connection
        host (str): database host
        no_drop (bool): when True, do not drop an existing database first
        no_user (bool): when True, skip user creation and the grant

    Return:
        None
    """
    if not no_drop:
        # Drop on the same server and with the same role we create on.
        try_drop_test_data(user, database, root_user=root_user, host=host)

    engine = create_engine(
        "postgresql://{user}@{host}/postgres".format(user=root_user, host=host)
    )
    conn = engine.connect()
    try:
        # Close any open transaction first: PostgreSQL refuses to run
        # CREATE DATABASE inside a transaction block.
        conn.execute("commit")

        create_stmt = 'CREATE DATABASE "{database}"'.format(database=database)
        try:
            conn.execute(create_stmt)
        except Exception as exc:
            logger.warning("Unable to create database: %s", exc)

        if not no_user:
            try:
                user_stmt = "CREATE USER {user} WITH PASSWORD '{password}'".format(
                    user=user, password=password
                )
                conn.execute(user_stmt)

                # Grant to the *user* (not the password), and quote the
                # database name the same way CREATE DATABASE did so that
                # mixed-case names resolve to the same database.
                perm_stmt = (
                    'GRANT ALL PRIVILEGES ON DATABASE "{database}" to {user}'
                    "".format(database=database, user=user)
                )
                conn.execute(perm_stmt)
                conn.execute("commit")
            except Exception as exc:
                logger.warning("Unable to add user: %s", exc)
    finally:
        # Always release the connection, even on an unexpected error.
        conn.close()
82

83

84
def create_tables(host, user, password, database):  # pragma: no cover
    """
    Create the index and DRS bundle tables.

    Executes the ``CREATE TABLE`` statements in order on a single
    connection. If any statement fails, a warning is logged and the
    exception is re-raised; the connection is closed in every case.

    Args:
        host (str): database host
        user (str): role used to connect
        password (str): password for ``user``
        database (str): database in which the tables are created

    Return:
        None
    """
    engine = create_engine(
        "postgresql://{user}:{pwd}@{host}/{db}".format(
            user=user, host=host, pwd=password, db=database
        )
    )

    # Order matters: index_record must exist before the tables that
    # reference it through a foreign key.
    statements = (
        "CREATE TABLE index_record ("
        "did VARCHAR NOT NULL, rev VARCHAR, form VARCHAR, size BIGINT, "
        "PRIMARY KEY (did))",
        "CREATE TABLE index_record_hash ("
        "did VARCHAR NOT NULL, hash_type VARCHAR NOT NULL, hash_value VARCHAR, "
        "PRIMARY KEY (did, hash_type), "
        "FOREIGN KEY(did) REFERENCES index_record (did))",
        "CREATE TABLE index_record_url ("
        "did VARCHAR NOT NULL, url VARCHAR NOT NULL, PRIMARY KEY (did, url), "
        "FOREIGN KEY(did) REFERENCES index_record (did))",
        "CREATE TABLE index_schema_version (version INT)",
        # PostgreSQL has no DATETIME type, so TIMESTAMP is used; the closing
        # parenthesis of the column list was previously missing.
        "CREATE TABLE drs_bundle_record ("
        "bundle_id VARCHAR NOT NULL, name VARCHAR, created_time TIMESTAMP, "
        "updated_time TIMESTAMP, checksum VARCHAR, size BIGINT, "
        "bundle_data TEXT, description TEXT, version VARCHAR, aliases VARCHAR, "
        "PRIMARY KEY (bundle_id))",
    )

    conn = engine.connect()
    try:
        for statement in statements:
            conn.execute(statement)
    except Exception:
        # Use the module logger; the `logging` module is not imported here.
        logger.warning("Unable to create table")
        raise
    finally:
        conn.close()
119

120

121
def check_engine_for_migrate(engine):
    """
    Report whether a database engine can be migrated.

    Args:
        engine (sqlalchemy.engine.base.Engine): a sqlalchemy engine

    Return:
        bool: the ``supports_alter`` flag of the engine's dialect
    """
    dialect = engine.dialect
    return dialect.supports_alter
×
132

133

134
def init_schema_version(driver, model, version):
    """
    Read the schema version row, creating it first when the table is empty.

    Args:
        driver (object): an alias or index driver instance exposing ``session``
        model (sqlalchemy.ext.declarative.api.Base): the version table model
        version (int): version stored in the new row when none exists yet

    Return:
        version (int): current version number in database
    """
    with driver.session as session:
        record = session.query(model).first()
        if record:
            version = record.version
        else:
            # No row yet: seed the singleton with the requested version.
            record = model(version=version)
            session.add(record)
            version = record.version
    return version
×
152

153

154
def migrate_database(driver, migrate_functions, current_schema_version, model):
    """
    This migration logic is DEPRECATED. It is still supported for backwards
    compatibility, but any new migration should be added using Alembic.

    Bring the database up to the schema version expected by the code by
    running the pending migration functions one at a time, bumping the stored
    version after each one.

    Args:
        driver (object): an alias or index driver instance
        migrate_functions (list): a list of migration functions; each is
            called with ``engine`` and ``session`` keyword arguments
        current_schema_version (int): version of current schema in code
        model (sqlalchemy.ext.declarative.api.Base): the version table model

    Return:
        None
    """
    db_schema_version = init_schema_version(driver, model, 0)
    version_gap = current_schema_version - db_schema_version
    engine_can_migrate = check_engine_for_migrate(driver.engine)

    # Bail out only when there is work to do that the engine cannot perform.
    if version_gap > 0 and not engine_can_migrate:
        message = "Engine {} does not support alter, skip migration".format(
            driver.engine.dialect.name
        )
        driver.logger.error(message)
        return

    pending = migrate_functions[db_schema_version:current_schema_version]
    for migrate in pending:
        # One session per step, so each migration and its version bump
        # go through the same session.
        with driver.session as session:
            version_row = session.query(model).first()
            message = "migrating {} schema to {}".format(
                driver.__class__.__name__, version_row.version
            )
            driver.logger.info(message)

            migrate(engine=driver.engine, session=session)
            version_row.version += 1
            session.add(version_row)
×
195

196

197
def reverse_url(url):
    """
    Build a DRS service-info ID by reversing the dot-separated domain name.

    When ``url`` carries an ``http`` or ``https`` scheme, only its hostname
    is used; otherwise the string is reversed as given.

    Args:
        url (str): url of the domain
        example: drs.example.org

    returns:
        id (str): DRS service-info ID
        example: org.example.drs
    """
    parts = urlparse(url)
    host = parts.hostname if parts.scheme in ("http", "https") else url
    return ".".join(host.split(".")[::-1])
1✔
215

216

217
# Base URL of the Fence service, read from the FENCE_SERVICE_URL environment
# variable; falls back to "http://fence-service" when the variable is unset.
FENCE_SERVICE = os.environ.get("FENCE_SERVICE_URL", "http://fence-service")
1✔
218

219

220
def lookup_bucket_region(bucket_name, bucket_regions):
    """
    Map a bucket name to its region.

    An exact key match wins. Otherwise the first key ending in ``.*`` whose
    leading part is a prefix of ``bucket_name`` is used. Plain string prefix
    comparison is used instead of regular expressions on purpose (this
    removed a snyk vulnerability finding).

    Args:
        bucket_name (str): name of the bucket to resolve
        bucket_regions (dict): bucket name or ``prefix.*`` pattern -> region

    Return:
        str: the matching region, or an empty string when nothing matches
    """
    if bucket_name in bucket_regions:
        return bucket_regions[bucket_name]

    wildcard = ".*"
    prefix_matches = (
        region
        for pattern, region in bucket_regions.items()
        if pattern.endswith(wildcard)
        and bucket_name.startswith(pattern[: -len(wildcard)])
    )
    return next(prefix_matches, "")
1✔
234

235

236
def get_bucket_regions():
    """
    Return a mapping of S3 bucket name to region, as reported by Fence.

    If the Flask app has a truthy ``cache`` attribute and it holds a
    non-empty ``bucket_regions`` entry, that entry is returned. Otherwise
    ``{FENCE_SERVICE}/data/buckets`` is fetched, the ``S3_BUCKETS`` section
    is reduced to ``{bucket: region}``, and the result is stored in the
    cache (when there is one).

    Any failure while fetching or decoding the response is logged as a
    warning and results in an empty mapping.

    Return:
        dict: bucket name -> region ("" when an entry has no region)
    """
    cached = getattr(app, "cache", None)
    if cached:
        hit = cached.get("bucket_regions")
        if hit:
            return hit

    url = f"{FENCE_SERVICE}/data/buckets"
    # Without a timeout, requests can block forever on an unresponsive peer.
    timeout_seconds = 10
    data = {}

    try:
        resp = requests.get(url, timeout=timeout_seconds)
        resp.raise_for_status()
        data = resp.json().get("S3_BUCKETS") or {}
    except Exception as e:
        logger.warning(f"Failed to fetch bucket regions from Fence: {e}")

    if not isinstance(data, dict):
        # An unexpected payload shape must not crash the caller.
        logger.warning("Unexpected S3_BUCKETS payload from Fence; ignoring it")
        data = {}

    # Entries that are not dicts carry no region information.
    regions = {
        name: info.get("region", "") if isinstance(info, dict) else ""
        for name, info in data.items()
    }

    if cached:
        cached.set("bucket_regions", regions)

    return regions
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc