• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

winter-telescope / winterdrp / 3596687365

pending completion
3596687365

push

github

Robert Stein
Add black

4582 of 6121 relevant lines covered (74.86%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.81
/winterdrp/processors/database/base_database_processor.py
1
import numpy as np
1✔
2
import os
1✔
3
from winterdrp.processors.base_processor import BaseProcessor
1✔
4
import logging
1✔
5
from abc import ABC
1✔
6
from winterdrp.processors.database.postgres import check_if_db_exists, check_if_user_exists, check_if_table_exists,\
1✔
7
    create_db, create_table, create_new_user, grant_privileges, create_tables_from_schema, DataBaseError, run_sql_command_from_file,\
8
    pg_admin_user_key, pg_admin_pwd_key
9
from winterdrp.data import DataBatch
1✔
10

11

12
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
1✔
13

14

15
class BaseDatabaseProcessor(BaseProcessor, ABC):
    """
    Abstract base class for processors that work with a postgres database.

    Stores the connection details (database, table, user, password) and
    provides helpers that check for, and if necessary create, the database,
    the user and the table by delegating to the functions imported from
    ``winterdrp.processors.database.postgres``.
    """

    def __init__(
            self,
            db_name: str,
            db_table: str,
            schema_path: str,
            db_user: str = os.environ.get('DB_USER'),
            db_password: str = os.environ.get('DB_PWD'),
            full_setup: bool = False,
            schema_dir: str = None,
            duplicate_protocol: str = 'fail',
            q3c: bool = False,
            *args,
            **kwargs
    ):
        """
        :param db_name: name of the database
        :param db_table: name of the table within the database
        :param schema_path: path of the schema file used to create the table
        :param db_user: database user. NOTE: the default is read from the
            ``DB_USER`` environment variable once, when the class is defined,
            not on every call.
        :param db_password: password of ``db_user``. NOTE: the default is read
            from the ``DB_PWD`` environment variable once, when the class is
            defined.
        :param full_setup: if True, and the database has to be created,
            create tables from all schema files found in ``schema_dir``
        :param schema_dir: directory containing the schema files; when None,
            the directory of ``schema_path`` is used where one is needed
        :param duplicate_protocol: one of 'fail', 'ignore' or 'replace'.
            Only stored and validated here; presumably consumed by subclasses
            when inserting duplicate entries (not visible in this class).
        :param q3c: if True, additionally run the q3c sql files located in
            the ``q3c`` sub-directory of the schema directory
        :param args: positional arguments forwarded to the parent class
        :param kwargs: keyword arguments forwarded to the parent class
        """
        super().__init__(*args, **kwargs)
        self.db_name = db_name
        self.db_table = db_table
        self.db_user = db_user
        self.db_password = db_password
        self.full_setup = full_setup
        self.schema_path = schema_path
        self.schema_dir = schema_dir
        # Set to True once check_database_setup() has completed successfully
        self.db_check = False
        self.duplicate_protocol = duplicate_protocol
        self.q3c = q3c
        assert self.duplicate_protocol in ['fail', 'ignore', 'replace'], (
            f"duplicate_protocol must be one of 'fail', 'ignore' or 'replace', "
            f"got {self.duplicate_protocol!r}"
        )

    def db_exists(self):
        """Return the result of ``check_if_db_exists`` for ``self.db_name``."""
        return check_if_db_exists(
            db_name=self.db_name
        )

    def make_db(self) -> None:
        """Create the database ``self.db_name`` via ``create_db``."""
        create_db(
            db_name=self.db_name
        )

    def user_exists(self):
        """Return the result of ``check_if_user_exists`` for ``self.db_user``."""
        return check_if_user_exists(self.db_user)

    def make_user(self):
        """Create ``self.db_user`` with ``self.db_password`` via ``create_new_user``."""
        return create_new_user(
            new_db_user=self.db_user,
            new_password=self.db_password
        )

    def grant_privileges(self):
        """Call the module-level ``grant_privileges`` for this database and user."""
        # Inside the method body the bare name resolves to the imported
        # module-level function, not to this method, so this is not recursive.
        return grant_privileges(self.db_name, self.db_user)

    def table_exists(self):
        """Return the result of ``check_if_table_exists`` for ``self.db_table``."""
        return check_if_table_exists(
            db_name=self.db_name,
            db_table=self.db_table,
            db_user=self.db_user,
            password=self.db_password
        )

    def make_table(self, schema_path: str) -> None:
        """
        Create a table from a schema file via ``create_table``.

        :param schema_path: path of the schema file to use
        """
        create_table(
            schema_path,
            db_name=self.db_name,
            db_user=self.db_user,
            password=self.db_password
        )

    def check_prerequisites(
            self,
    ) -> None:
        """Run :meth:`check_database_setup` once; later calls are no-ops."""
        if not self.db_check:
            self.check_database_setup()
            self.db_check = True

    def check_database_setup(self) -> None:
        """
        Make sure the database, the user and the table exist, creating
        whatever is missing.

        :raises DataBaseError: if the database exists but the user does not,
            or if q3c was requested and the table's q3c sql file is missing
        """
        database_found = self.db_exists()

        # Plain boolean operators rather than numpy ones: np.invert is a
        # bitwise operation, so it only acts as "not" for genuine booleans.
        if database_found and not self.user_exists():
            err = 'Database exists but user does not exist'
            logger.error(err)
            raise DataBaseError(err)

        if not database_found:
            self.make_db()

            if not self.user_exists():
                self.make_user()

            self.grant_privileges()

            if self.full_setup:
                self._run_full_setup()

        if not self.table_exists():
            self.make_table(self.schema_path)
            if self.q3c:
                self._setup_table_q3c()

    def _run_sql_file_as_admin(self, file_path: str) -> None:
        """Run the sql file with the admin credentials read from the environment."""
        run_sql_command_from_file(
            file_path=file_path,
            db_name=self.db_name,
            db_user=os.environ.get(pg_admin_user_key),
            password=os.environ.get(pg_admin_pwd_key)
        )

    def _run_full_setup(self) -> None:
        """Create all tables from the schema directory, plus q3c indexes if requested."""
        if self.schema_dir is None:
            self.schema_dir = os.path.dirname(self.schema_path)
            # Adjacent literals instead of a backslash continuation inside the
            # string, which used to embed the source indentation in the message.
            logger.warning(
                f'Warning, full db setup requested, but no schema directory specified. '
                f'Will search for schema files in {self.schema_dir} directory.'
            )
        logger.info(f'Looking in {self.schema_dir} directory to search for schema files')

        create_tables_from_schema(self.schema_dir, self.db_name, self.db_user, self.db_password)

        if self.q3c:
            q3c_indexes_file = os.path.join(self.schema_dir, 'q3c', 'q3c_indexes.sql')
            self._run_sql_file_as_admin(q3c_indexes_file)
            logger.info("Created q3c indexes")

    def _setup_table_q3c(self) -> None:
        """
        Run the q3c sql file belonging to ``self.db_table``.

        :raises DataBaseError: if the expected q3c sql file does not exist
        """
        schema_dir = self.schema_dir
        if schema_dir is None:
            # schema_dir is only defaulted during a full setup of a new
            # database; without this fallback os.path.join(None, ...) raises
            # a TypeError whenever the table is created for an existing db.
            schema_dir = os.path.dirname(self.schema_path)

        table_q3c_path = os.path.join(schema_dir, 'q3c', f'q3c_{self.db_table}.sql')

        if not os.path.exists(table_q3c_path):
            err = f"q3c extension requested but no {table_q3c_path} file found. Please add it in."
            logger.error(err)
            raise DataBaseError(err)

        self._run_sql_file_as_admin(table_q3c_path)
138

139

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc