
winter-telescope / winterdrp / build 3760811150 (pending completion)

Push build via GitHub — Robert Stein: "Create DBQueryConstraints"

80 of 80 new or added lines in 4 files covered (100.0%)
4637 of 6146 relevant lines covered (75.45%)
0.75 hits per line

Source File

/winterdrp/processors/database/postgres.py (65.2% covered)
"""
Module containing postgres util functions
"""
# pylint: disable=not-context-manager
import logging
import os
from glob import glob
from pathlib import Path
from typing import Optional

import numpy as np
import psycopg
from psycopg import errors
from psycopg.rows import Row

from winterdrp.data import DataBlock
from winterdrp.errors import ProcessorError
from winterdrp.processors.database.constraints import DBQueryConstraints

logger = logging.getLogger(__name__)

PG_ADMIN_USER_KEY = "PG_ADMIN_USER"
PG_ADMIN_PWD_KEY = "PG_ADMIN_PWD"

POSTGRES_DUPLICATE_PROTOCOLS = ["fail", "ignore", "replace"]


class DataBaseError(ProcessorError):
    """Error relating to postgres interactions"""


def validate_credentials(db_user: str, password: str, admin: bool = False):
    """
    Checks that user credentials exist

    :param db_user: Username
    :param password: Password
    :param admin: boolean whether the account is an admin one
    :return: None
    """

    if db_user is None:
        if admin:
            user = "admin_db_user"
            env_user_var = PG_ADMIN_USER_KEY
        else:
            user = "db_user"
            env_user_var = "DB_USER"
        err = (
            f"'{user}' is set as None. Please pass a db_user as an argument, "
            f"or set the environment variable '{env_user_var}'."
        )
        logger.warning(err)
        raise DataBaseError(err)

    if password is None:
        if admin:
            pwd = "db_admin_password"
            env_pwd_var = PG_ADMIN_PWD_KEY
        else:
            pwd = "password"
            env_pwd_var = "DB_PWD"
        err = (
            f"'{pwd}' is set as None. Please pass a password as an argument, "
            f"or set the environment variable '{env_pwd_var}'."
        )
        logger.error(err)
        raise DataBaseError(err)


def create_db(db_name: str):
    """
    Creates a database using credentials

    :param db_name: DB to create
    :return: None
    """
    admin_user = os.environ.get(PG_ADMIN_USER_KEY)
    admin_password = os.environ.get(PG_ADMIN_PWD_KEY)

    validate_credentials(db_user=admin_user, password=admin_password)

    with psycopg.connect(
        f"dbname=postgres user={admin_user} password={admin_password}"
    ) as conn:
        conn.autocommit = True
        sql = f"""CREATE database {db_name}"""
        conn.execute(sql)
        logger.info(f"Created db {db_name}")


def run_sql_command_from_file(
    file_path: str | Path,
    db_name: str,
    db_user: str,
    password: str,
    admin: bool = False,
):
    """
    Execute SQL command from file

    :param file_path: File to execute
    :param db_name: name of database
    :param db_user: Postgres db user
    :param password: Postgres password
    :param admin: Whether to use an admin user
    :return: None
    """
    validate_credentials(db_user=db_user, password=password, admin=admin)
    with psycopg.connect(
        f"dbname={db_name} user={db_user} password={password}"
    ) as conn:
        with open(file_path, "r", encoding="utf8") as sql_file:
            conn.execute(sql_file.read())

        logger.info(f"Executed sql commands from file {file_path}")


def create_table(schema_path: str | Path, db_name: str, db_user: str, password: str):
    """
    Create a database table

    :param schema_path: File to execute
    :param db_name: name of database
    :param db_user: Postgres db user
    :param password: Postgres password
    :return: None
    """
    validate_credentials(db_user, password)

    with psycopg.connect(
        f"dbname={db_name} user={db_user} password={password}"
    ) as conn:
        conn.autocommit = True
        with open(schema_path, "r", encoding="utf8") as schema_file:
            conn.execute(schema_file.read())

    logger.info(f"Created table from schema path {schema_path}")


def create_new_user(new_db_user: str, new_password: str):
    """
    Create a new postgres user

    :param new_db_user: new username
    :param new_password: new user password
    :return: None
    """
    admin_user = os.environ.get(PG_ADMIN_USER_KEY)
    admin_password = os.environ.get(PG_ADMIN_PWD_KEY)

    validate_credentials(new_db_user, new_password)
    validate_credentials(db_user=admin_user, password=admin_password, admin=True)

    with psycopg.connect(
        f"dbname=postgres user={admin_user} password={admin_password}"
    ) as conn:
        conn.autocommit = True
        command = f"""CREATE ROLE {new_db_user} WITH password '{new_password}' LOGIN;"""
        conn.execute(command)


def grant_privileges(db_name: str, db_user: str):
    """
    Grant privilege to user on database

    :param db_name: name of database
    :param db_user: username
    :return: None
    """
    admin_user = os.environ.get(PG_ADMIN_USER_KEY)
    admin_password = os.environ.get(PG_ADMIN_PWD_KEY)
    validate_credentials(admin_user, admin_password, admin=True)

    with psycopg.connect(
        f"dbname=postgres user={admin_user} password={admin_password}"
    ) as conn:
        conn.autocommit = True
        command = f"""GRANT ALL PRIVILEGES ON DATABASE {db_name} TO {db_user};"""
        conn.execute(command)

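# Illustrative sketch only (not part of the original module): chaining the admin
# helpers above to bootstrap a new database. The names "winter_user",
# "winter_password" and "winterdb" are hypothetical, and a reachable Postgres
# server with the PG_ADMIN_USER / PG_ADMIN_PWD environment variables set is assumed.
def _example_bootstrap_database():
    """Sketch: create a role, create a database, and grant it privileges."""
    create_new_user(new_db_user="winter_user", new_password="winter_password")
    create_db(db_name="winterdb")
    grant_privileges(db_name="winterdb", db_user="winter_user")
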

def check_if_exists(
    check_command: str,
    check_value: str,
    db_name: str = "postgres",
    db_user: str = os.environ.get(PG_ADMIN_USER_KEY),
    password: str = os.environ.get(PG_ADMIN_PWD_KEY),
) -> bool:
    """
    Check whether a value is returned by a database query

    :param check_command: SQL command listing existing users/databases/tables
    :param check_value: value to look for in the query results
    :param db_name: name of database to query
    :param db_user: username to use for check
    :param password: password to use for check
    :return: boolean
    """
    validate_credentials(db_user, password)

    with psycopg.connect(
        f"dbname={db_name} user={db_user} password={password}"
    ) as conn:
        conn.autocommit = True
        data = conn.execute(check_command).fetchall()
    existing_user_names = [x[0] for x in data]
    logger.debug(f"Found the following values: {existing_user_names}")

    return check_value in existing_user_names

def check_if_user_exists(
    user_name: str,
    db_user: str = os.environ.get(PG_ADMIN_USER_KEY),
    password: str = os.environ.get(PG_ADMIN_PWD_KEY),
) -> bool:
    """
    Check if a user account exists

    :param user_name: username to check
    :param db_user: username to use for check
    :param password: password to use for check
    :return: boolean
    """
    check_command = """SELECT usename FROM pg_user;"""

    user_exist_bool = check_if_exists(
        check_command=check_command,
        check_value=user_name,
        db_user=db_user,
        password=password,
    )

    logger.debug(
        f"User '{user_name}' {['does not exist', 'already exists'][user_exist_bool]}"
    )
    return user_exist_bool


def check_if_db_exists(
    db_name: str,
    db_user: str = os.environ.get(PG_ADMIN_USER_KEY),
    password: str = os.environ.get(PG_ADMIN_PWD_KEY),
) -> bool:
    """
    Check if a database exists

    :param db_name: database to check
    :param db_user: username to use for check
    :param password: password to use for check
    :return: boolean
    """

    check_command = """SELECT datname FROM pg_database;"""

    db_exist_bool = check_if_exists(
        check_command=check_command,
        check_value=db_name,
        db_user=db_user,
        password=password,
        db_name="postgres",
    )

    logger.debug(
        f"Database '{db_name}' {['does not exist', 'already exists'][db_exist_bool]}"
    )

    return db_exist_bool


def check_if_table_exists(
    db_name: str, db_table: str, db_user: str, password: str
) -> bool:
    """
    Check if a db table exists

    :param db_name: database to check
    :param db_table: table to check
    :param db_user: username to use for check
    :param password: password to use for check
    :return: boolean
    """

    check_command = (
        "SELECT table_name FROM information_schema.tables "
        "WHERE table_schema='public';"
    )

    table_exist_bool = check_if_exists(
        check_command=check_command,
        check_value=db_table,
        db_name=db_name,
        db_user=db_user,
        password=password,
    )

    logger.debug(
        f"Database table '{db_table}' "
        f"{['does not exist', 'already exists'][table_exist_bool]}"
    )

    return table_exist_bool

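# Illustrative sketch only: using the existence checks above to make bootstrapping
# idempotent. All names ("winter_user", "winterdb", "exposures") are hypothetical;
# admin credentials are read from the PG_ADMIN_USER / PG_ADMIN_PWD environment
# variables by the default arguments.
def _example_create_if_missing():
    """Sketch: only create the user/database if they are not already present."""
    if not check_if_user_exists("winter_user"):
        create_new_user(new_db_user="winter_user", new_password="winter_password")
    if not check_if_db_exists("winterdb"):
        create_db(db_name="winterdb")
        grant_privileges(db_name="winterdb", db_user="winter_user")
    table_present = check_if_table_exists(
        db_name="winterdb",
        db_table="exposures",
        db_user="winter_user",
        password="winter_password",
    )
    logger.debug(f"Table 'exposures' present: {table_present}")
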

def get_foreign_tables_list(schema_files: list[str]) -> np.ndarray:
    """
    Returns a list of foreign tables

    :param schema_files: List of schema files to read
    :return: array of referenced (foreign) table names for each schema file
    """
    foreign_tables_list = []
    for schema_file_path in schema_files:
        table_names = []
        with open(schema_file_path, "r", encoding="utf8") as schema_file:
            schema = schema_file.read()
        if "FOREIGN KEY" in schema:
            schema = schema.replace("\n", "")
            schema = schema.replace("\t", "")
            schema_split = np.array(schema.split(","))
            fk_rows = np.array(["FOREIGN KEY" in x for x in schema_split])
            for row in schema_split[fk_rows]:
                words = np.array(row.split(" "))
                refmask = np.array(["REFERENCES" in x for x in words])
                idx = np.where(refmask)[0][0] + 1
                tablename = words[idx].split("(")[0]
                table_names.append(tablename)
        foreign_tables_list.append(np.array(table_names))
    return np.array(foreign_tables_list)


def get_ordered_schema_list(schema_files: list[str]) -> list[str]:
    """
    Returns an ordered list of schema, ensuring connected database tables
    are created in the right order

    :param schema_files: List of schema files to read
    :return: ordered list of schema files
    """
    foreign_tables_list = get_foreign_tables_list(schema_files)
    ordered_schema_list = []
    tables_created = []
    schema_table_names = [x.split("/")[-1].split(".sql")[0] for x in schema_files]
    while len(tables_created) < len(schema_files):
        for ind, schema_file in enumerate(schema_files):
            table_name = schema_table_names[ind]
            if table_name not in tables_created:
                foreign_tables = foreign_tables_list[ind]
                if len(foreign_tables) == 0:
                    ordered_schema_list.append(schema_file)
                    tables_created.append(table_name)
                else:
                    if np.all(np.isin(foreign_tables, tables_created)):
                        ordered_schema_list.append(schema_file)
                        tables_created.append(table_name)

    return ordered_schema_list


def create_tables_from_schema(
    schema_dir: str | Path,
    db_name: str,
    db_user: str = os.environ.get(PG_ADMIN_USER_KEY),
    password: str = os.environ.get(PG_ADMIN_PWD_KEY),
):
    """
    Creates a db with tables, as described by .sql files in a directory

    :param schema_dir: Directory containing schema files
    :param db_name: name of DB
    :param db_user: db user
    :param password: db password
    :return: None
    """
    schema_files = glob(f"{schema_dir}/*.sql")
    ordered_schema_files = get_ordered_schema_list(schema_files)
    logger.info(f"Creating the following tables - {ordered_schema_files}")
    for schema_file in ordered_schema_files:
        create_table(
            schema_path=schema_file, db_name=db_name, db_user=db_user, password=password
        )

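# Illustrative sketch only: building every table described by the .sql files in a
# schema directory. The path and database name are hypothetical; ordering is
# handled by get_ordered_schema_list, so tables with FOREIGN KEY references are
# created after the tables they point to.
def _example_build_schema_dir():
    """Sketch: create all tables for a database from a directory of schema files."""
    create_tables_from_schema(
        schema_dir="/path/to/schema",
        db_name="winterdb",
        db_user="winter_user",
        password="winter_password",
    )
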

def export_to_db(
    value_dict: dict | DataBlock,
    db_name: str,
    db_table: str,
    db_user: str,
    password: str,
    duplicate_protocol: str = "fail",
) -> tuple[list, list]:
    """
    Export a list of fields in value dict to a database table

    :param value_dict: dictionary/DataBlock/other dictionary-like object to export
    :param db_name: name of db to export to
    :param db_table: table of DB to export to
    :param db_user: db user
    :param password: password
    :param duplicate_protocol: protocol for handling duplicates,
        in "fail"/"ignore"/"replace"
    :return: serial keys of the table, and their values for the new entry
    """

    assert duplicate_protocol in POSTGRES_DUPLICATE_PROTOCOLS

    with psycopg.connect(
        f"dbname={db_name} user={db_user} password={password}"
    ) as conn:
        conn.autocommit = True

        sql_query = f"""
        SELECT Col.Column_Name from
            INFORMATION_SCHEMA.TABLE_CONSTRAINTS Tab,
            INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE Col
        WHERE
            Col.Constraint_Name = Tab.Constraint_Name
            AND Col.Table_Name = Tab.Table_Name
            AND Constraint_Type = 'PRIMARY KEY'
            AND Col.Table_Name = '{db_table}'
        """
        serial_keys, serial_key_values = [], []
        with conn.execute(sql_query) as cursor:

            primary_key = [x[0] for x in cursor.fetchall()]
            serial_keys = list(
                get_sequence_keys_from_table(db_table, db_name, db_user, password)
            )
            logger.debug(serial_keys)
            colnames = [
                desc[0]
                for desc in conn.execute(
                    f"SELECT * FROM {db_table} LIMIT 1"
                ).description
                if desc[0] not in serial_keys
            ]

            colnames_str = ""
            for column in colnames:
                colnames_str += f'"{column}",'
            colnames_str = colnames_str[:-1]
            txt = f"INSERT INTO {db_table} ({colnames_str}) VALUES ("

            for char in ["[", "]", "'"]:
                txt = txt.replace(char, "")

            for column in colnames:
                txt += f"'{str(value_dict[column])}', "

            txt = txt + ") "
            txt = txt.replace(", )", ")")

            if len(serial_keys) > 0:
                txt += "RETURNING "
                for key in serial_keys:
                    txt += f"{key},"
                txt += ";"
                txt = txt.replace(",;", ";")

            logger.debug(txt)
            command = txt

            try:
                cursor.execute(command)
                if len(serial_keys) > 0:
                    serial_key_values = cursor.fetchall()[0]
                else:
                    serial_key_values = []

            except errors.UniqueViolation as exc:
                primary_key_values = [value_dict[x] for x in primary_key]

                if duplicate_protocol == "fail":
                    err = (
                        f"Duplicate error, entry with "
                        f"{primary_key}={primary_key_values} "
                        f"already exists in {db_name}."
                    )
                    logger.error(err)
                    raise errors.UniqueViolation from exc

                if duplicate_protocol == "ignore":
                    logger.debug(
                        f"Found duplicate entry with "
                        f"{primary_key}={primary_key_values} in {db_name}. Ignoring."
                    )
                elif duplicate_protocol == "replace":
                    logger.debug(
                        f"Updating duplicate entry with "
                        f"{primary_key}={primary_key_values} in {db_name}."
                    )

                    db_constraints = DBQueryConstraints(
                        columns=primary_key,
                        accepted_values=primary_key_values,
                    )

                    update_colnames = []
                    for column in colnames:
                        if column not in primary_key:
                            update_colnames.append(column)

                    serial_key_values = modify_db_entry(
                        db_constraints=db_constraints,
                        value_dict=value_dict,
                        db_alter_columns=update_colnames,
                        db_table=db_table,
                        db_name=db_name,
                        db_user=db_user,
                        password=password,
                        return_columns=serial_keys,
                    )

    return serial_keys, serial_key_values

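# Illustrative sketch only (not in the original module): inserting one row with
# export_to_db. The "exposures" table is hypothetical, assumed to have a serial
# "expid" column plus "ra", "dec" and "fieldid"; value_dict must supply a value for
# every non-serial column of the table. With duplicate_protocol="ignore", a clashing
# primary key is logged and skipped instead of raising.
def _example_export_row():
    """Sketch: export a dictionary of values to a database table."""
    serial_keys, serial_key_values = export_to_db(
        value_dict={"ra": 210.91, "dec": 54.31, "fieldid": 42},
        db_name="winterdb",
        db_table="exposures",
        db_user="winter_user",
        password="winter_password",
        duplicate_protocol="ignore",
    )
    logger.debug(f"Assigned serial keys: {dict(zip(serial_keys, serial_key_values))}")
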

def import_from_db(
    db_name: str,
    db_table: str,
    db_output_columns: str | list[str],
    output_alias_map: Optional[str | list[str]] = None,
    db_user: str = os.environ.get(PG_ADMIN_USER_KEY),
    password: str = os.environ.get(PG_ADMIN_PWD_KEY),
    max_num_results: Optional[int] = None,
    db_constraints: Optional[DBQueryConstraints] = None,
) -> list[dict]:
    """Query an SQL database with constraints, and return a list of dictionaries.
    One dictionary per entry returned from the query.

    Parameters
    ----------
    db_name: Name of database to query
    db_table: Name of database table to query
    db_output_columns: Name(s) of columns to return for matched database entries
    output_alias_map: Alias to assign for each output column
    db_user: Username for database
    password: password for database
    max_num_results: Maximum number of results to return
    db_constraints: Constraints to apply to the query

    Returns
    -------
    A list of dictionaries (one per entry)
    """

    if not isinstance(db_output_columns, list):
        db_output_columns = [db_output_columns]

    if output_alias_map is None:
        output_alias_map = db_output_columns

    if not isinstance(output_alias_map, list):
        output_alias_map = [output_alias_map]

    assert len(output_alias_map) == len(db_output_columns)

    all_query_res = []

    if db_constraints is not None:
        constraints = db_constraints.parse_constraints()
    else:
        constraints = ""

    with psycopg.connect(
        f"dbname={db_name} user={db_user} password={password}"
    ) as conn:
        conn.autocommit = True
        sql_query = f"""
        SELECT {', '.join(db_output_columns)} from {db_table}
            WHERE {constraints}
        """

        if max_num_results is not None:
            sql_query += f" LIMIT {max_num_results}"

        sql_query += ";"

        logger.debug(f"Query: {sql_query}")

        with conn.execute(sql_query) as cursor:
            query_output = cursor.fetchall()

        for entry in query_output:

            assert len(entry) == len(db_output_columns)

            query_res = {}

            for i, key in enumerate(output_alias_map):
                query_res[key] = entry[i]

            all_query_res.append(query_res)

    return all_query_res

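# Illustrative sketch only: querying rows back with import_from_db, constrained via
# DBQueryConstraints. Only the constructor arguments already used elsewhere in this
# module (columns / accepted_values) are shown; table and column names are
# hypothetical.
def _example_import_rows():
    """Sketch: fetch selected columns for entries matching a constraint."""
    constraints = DBQueryConstraints(columns=["fieldid"], accepted_values=[42])
    results = import_from_db(
        db_name="winterdb",
        db_table="exposures",
        db_output_columns=["expid", "ra", "dec"],
        db_constraints=constraints,
        max_num_results=10,
        db_user="winter_user",
        password="winter_password",
    )
    for row in results:
        logger.debug(f"Matched entry: {row}")
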

def execute_query(
    sql_query: str, db_name: str, db_user: str, password: str
) -> list[Row]:
    """
    Generically execute SQL query

    :param sql_query: SQL query to execute
    :param db_name: db name
    :param db_user: db user
    :param password: db password
    :return: rows from db
    """
    with psycopg.connect(
        f"dbname={db_name} user={db_user} password={password}"
    ) as conn:
        conn.autocommit = True
        logger.debug(f"Query: {sql_query}")

        with conn.execute(sql_query) as cursor:
            query_output = cursor.fetchall()

    return query_output


def crossmatch_with_database(
    db_name: str,
    db_table: str,
    db_output_columns: str | list[str],
    ra: float,
    dec: float,
    crossmatch_radius_arcsec: float,
    output_alias_map: Optional[dict] = None,
    ra_field_name: str = "ra",
    dec_field_name: str = "dec",
    query_distance_bool: bool = False,
    q3c_bool: bool = False,
    query_constraints: Optional[DBQueryConstraints] = None,
    order_field_name: Optional[str] = None,
    num_limit: Optional[int] = None,
    db_user: str = os.environ.get(PG_ADMIN_USER_KEY),
    db_password: str = os.environ.get(PG_ADMIN_PWD_KEY),
) -> list[dict]:
    """
    Crossmatch a given spatial position (ra/dec) with sources in a database,
    and returns a list of matches

    :param db_name: name of db to query
    :param db_table: name of db table
    :param db_output_columns: columns to return
    :param output_alias_map: mapping for renaming columns
    :param ra: RA
    :param dec: dec
    :param crossmatch_radius_arcsec: radius for crossmatch
    :param ra_field_name: name of ra column in database
    :param dec_field_name: name of dec column in database
    :param query_distance_bool: boolean whether to return the crossmatch distance
    :param q3c_bool: boolean whether to use q3c for the spatial query
    :param query_constraints: additional constraints to apply to the query
    :param order_field_name: field to order result by
    :param num_limit: limit on sql query
    :param db_user: db user
    :param db_password: db password
    :return: list of query result dictionaries
    """

    if output_alias_map is None:
        output_alias_map = {}
        for col in db_output_columns:
            output_alias_map[col] = col

    crossmatch_radius_deg = crossmatch_radius_arcsec / 3600.0

    if q3c_bool:
        constraints = (
            f"q3c_radial_query({ra_field_name},{dec_field_name},"
            f"{ra},{dec},{crossmatch_radius_deg}) "
        )
    else:
        ra_min = ra - crossmatch_radius_deg
        ra_max = ra + crossmatch_radius_deg
        dec_min = dec - crossmatch_radius_deg
        dec_max = dec + crossmatch_radius_deg
        constraints = (
            f" {ra_field_name} between {ra_min} and {ra_max} AND "
            f"{dec_field_name} between {dec_min} and {dec_max} "
        )

    if query_constraints is not None:
        constraints += f"""AND {query_constraints.parse_constraints()}"""

    select = f""" {'"' + '","'.join(db_output_columns) + '"'}"""
    if query_distance_bool:
        if q3c_bool:
            select = (
                f"""q3c_dist({ra_field_name},{dec_field_name},{ra},{dec}) AS xdist,"""
                + select
            )
        else:
            select = f"""{ra_field_name} - ra AS xdist,""" + select

    query = f"""SELECT {select} FROM {db_table} WHERE {constraints}"""

    if order_field_name is not None:
        query += f""" ORDER BY {order_field_name}"""
    if num_limit is not None:
        query += f""" LIMIT {num_limit}"""

    query += ";"

    query_output = execute_query(query, db_name, db_user, db_password)
    all_query_res = []

    for entry in query_output:
        if not query_distance_bool:
            assert len(entry) == len(db_output_columns)
        else:
            assert len(entry) == len(db_output_columns) + 1
        query_res = {}
        for i, key in enumerate(output_alias_map):
            query_res[key] = entry[i]
            if query_distance_bool:
                query_res["dist"] = entry["xdist"]
        all_query_res.append(query_res)
    return all_query_res

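# Illustrative sketch only: spatially crossmatching one position against a catalogue
# table. Coordinates, radius and names are hypothetical; q3c_bool=False uses the
# plain ra/dec box fallback above, so no q3c extension is required.
def _example_crossmatch():
    """Sketch: find catalogue entries within 2 arcseconds of a position."""
    matches = crossmatch_with_database(
        db_name="winterdb",
        db_table="candidates",
        db_output_columns=["candid", "ra", "dec"],
        ra=210.91,
        dec=54.31,
        crossmatch_radius_arcsec=2.0,
        q3c_bool=False,
        num_limit=5,
        db_user="winter_user",
        db_password="winter_password",
    )
    logger.debug(f"Found {len(matches)} matches")
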

def get_sequence_keys_from_table(
    db_table: str, db_name: str, db_user: str, password: str
) -> np.ndarray:
    """
    Gets sequence keys of db table

    :param db_table: database table to use
    :param db_name: database name
    :param db_user: db user
    :param password: db password
    :return: numpy array of keys
    """
    with psycopg.connect(
        f"dbname={db_name} user={db_user} password={password}"
    ) as conn:
        conn.autocommit = True
        sequences = [
            x[0]
            for x in conn.execute(
                "SELECT c.relname FROM pg_class c WHERE c.relkind = 'S';"
            ).fetchall()
        ]
        seq_tables = np.array([x.split("_")[0] for x in sequences])
        seq_columns = np.array([x.split("_")[1] for x in sequences])
        table_sequence_keys = seq_columns[(seq_tables == db_table)]
    return table_sequence_keys


def modify_db_entry(
    db_name: str,
    db_table: str,
    db_constraints: DBQueryConstraints,
    value_dict: dict | DataBlock,
    db_alter_columns: str | list[str],
    return_columns: Optional[str | list[str]] = None,
    db_user: str = os.environ.get(PG_ADMIN_USER_KEY),
    password: str = os.environ.get(PG_ADMIN_PWD_KEY),
) -> list[Row]:
    """
    Modify a db entry

    :param db_name: name of db
    :param db_table: Name of table
    :param db_constraints: constraints selecting which entries to update
    :param value_dict: dict-like object to provide updated values
    :param db_alter_columns: columns to alter in db
    :param return_columns: columns to return
    :param db_user: db user
    :param password: db password
    :return: db query (return columns)
    """

    if not isinstance(db_alter_columns, list):
        db_alter_columns = [db_alter_columns]

    if return_columns is None:
        return_columns = db_alter_columns
    if not isinstance(return_columns, list):
        return_columns = [return_columns]

    constraints = db_constraints.parse_constraints()

    with psycopg.connect(
        f"dbname={db_name} user={db_user} password={password}"
    ) as conn:
        conn.autocommit = True

        db_alter_values = [str(value_dict[c]) for c in db_alter_columns]

        alter_values_txt = [
            f"{db_alter_columns[ind]}='{db_alter_values[ind]}'"
            for ind in range(len(db_alter_columns))
        ]

        sql_query = f"""
                    UPDATE {db_table} SET {', '.join(alter_values_txt)} WHERE {constraints}
                    """
        if len(return_columns) > 0:
            logger.debug(return_columns)
            sql_query += f""" RETURNING {', '.join(return_columns)}"""
        sql_query += ";"
        query_output = execute_query(sql_query, db_name, db_user, password)

    return query_output

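# Illustrative sketch only: updating an existing row with modify_db_entry, mirroring
# the call made in export_to_db's "replace" branch. Names are hypothetical; the
# constraint selects the row(s) to alter, and db_alter_columns lists the columns to
# overwrite with values taken from value_dict.
def _example_update_row():
    """Sketch: overwrite selected columns of an existing entry."""
    constraints = DBQueryConstraints(columns=["expid"], accepted_values=[1])
    updated = modify_db_entry(
        db_name="winterdb",
        db_table="exposures",
        db_constraints=constraints,
        value_dict={"ra": 211.05, "dec": 54.28},
        db_alter_columns=["ra", "dec"],
        db_user="winter_user",
        password="winter_password",
    )
    logger.debug(f"Returned columns after update: {updated}")
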

def get_column_names_from_schema(schema_file_path: str | Path) -> list[str]:
    """
    Get column names from a schema file

    :param schema_file_path: file to read
    :return: list of columns
    """
    with open(schema_file_path, "r", encoding="utf8") as schema_file:
        dat = schema_file.read()
    dat = dat.split(");")[0]
    dat = dat.split("\n")[1:-1]
    pkstrip = [x.strip(",").split("PRIMARY KEY")[0].strip() for x in dat]
    fkstrip = [x.strip(",").split("FOREIGN KEY")[0].strip() for x in pkstrip]
    colnames = [x.split(" ")[0].strip('"') for x in fkstrip]
    return colnames

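# Illustrative sketch only: reading the column names out of a schema file with
# get_column_names_from_schema. The path is hypothetical; the parser simply strips
# PRIMARY KEY / FOREIGN KEY clauses and keeps the first token of each column line.
def _example_read_schema_columns():
    """Sketch: list the columns defined in a .sql schema file."""
    columns = get_column_names_from_schema("/path/to/schema/exposures.sql")
    logger.debug(f"Schema columns: {columns}")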