• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

winter-telescope / winterdrp / 4119519683

pending completion
4119519683

push

github

GitHub
Lintify (#287)

36 of 36 new or added lines in 9 files covered. (100.0%)

5321 of 6332 relevant lines covered (84.03%)

1.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.09
/winterdrp/processors/database/base_database_processor.py
1
"""
2
Module containing base database processor class
3
"""
4
import logging
2✔
5
from abc import ABC
2✔
6
from pathlib import Path
2✔
7
from typing import Optional
2✔
8

9
from winterdrp.paths import max_n_cpu
2✔
10
from winterdrp.processors.base_processor import BaseProcessor
2✔
11
from winterdrp.processors.database.postgres import (
2✔
12
    POSTGRES_DUPLICATE_PROTOCOLS,
13
    DataBaseError,
14
    PostgresAdmin,
15
    PostgresUser,
16
)
17

18
# Module-level logger, named after this module via __name__
logger = logging.getLogger(__name__)
2✔
19

20

21
class BaseDatabaseProcessor(BaseProcessor, ABC):
    """
    Base class for processors which interact with a postgres database

    The first call to :meth:`check_prerequisites` runs
    :meth:`check_database_setup`, which creates the postgres user, the
    database and the table(s) if they do not exist yet.
    """

    # Never more than 10, regardless of the package-wide ``max_n_cpu`` value
    # (the right-hand side refers to the name imported at module level).
    max_n_cpu = min(max_n_cpu, 10)

    def __init__(
        self,
        db_name: str,
        db_table: str,
        schema_path: str | Path,
        pg_user: PostgresUser = PostgresUser(),
        pg_admin: PostgresAdmin = PostgresAdmin(),
        has_foreign_keys: bool = False,
        schema_dir: Optional[str | Path] = None,
        duplicate_protocol: str = "fail",
        q3c_bool: bool = False,
    ):
        """
        Store the database configuration; no database access happens here.

        Note that the default ``pg_user`` / ``pg_admin`` objects are created
        once, when this module is imported, and are shared by every processor
        relying on the defaults.

        :param db_name: Name of the database
        :param db_table: Name of the database table
        :param schema_path: Path of the schema file used to create the table
        :param pg_user: Postgres user object used for table-level operations
        :param pg_admin: Postgres admin object used to create users/databases
        :param has_foreign_keys: If True, a newly-created database has all
            its tables created from the schema directory
        :param schema_dir: Directory containing schema files. If None, the
            directory of ``schema_path`` is used when one is needed
        :param duplicate_protocol: Must be one of POSTGRES_DUPLICATE_PROTOCOLS
        :param q3c_bool: If True, q3c sql files are run after table creation
        :raises AssertionError: if ``duplicate_protocol`` is not recognised
        """
        super().__init__()
        self.db_name = db_name
        self.db_table = db_table

        self.schema_path = schema_path
        self.schema_dir = Path(schema_dir) if schema_dir is not None else None

        self.pg_user = pg_user
        self._pg_admin = pg_admin

        self.has_foreign_keys = has_foreign_keys
        # Set to True once check_database_setup has run for this instance
        self.db_check_bool = False
        self.duplicate_protocol = duplicate_protocol
        self.q3c = q3c_bool

        assert self.duplicate_protocol in POSTGRES_DUPLICATE_PROTOCOLS, (
            f"Unrecognised duplicate protocol '{self.duplicate_protocol}'. "
            f"Accepted values are: {POSTGRES_DUPLICATE_PROTOCOLS}"
        )

    def db_exists(self) -> bool:
        """
        Checks if a database exists

        :return: boolean
        """
        return self._pg_admin.check_if_db_exists(db_name=self.db_name)

    def _make_db(self) -> None:
        """
        Creates a database

        :return: None
        """
        self._pg_admin.create_db(db_name=self.db_name)

    def pg_user_exists(self) -> bool:
        """
        Check if the specified db user exists

        :return: boolean
        """
        return self._pg_admin.check_if_user_exists(self.pg_user.db_user)

    def _make_pg_user(self) -> None:
        """
        Creates a new database user

        :return: None
        """
        self._pg_admin.create_new_user(
            new_db_user=self.pg_user.db_user, new_password=self.pg_user.db_password
        )

    def _grant_privileges_to_pg_user(self) -> None:
        """
        Grants db privilege to user

        :return: None
        """
        self._pg_admin.grant_privileges(self.db_name, db_user=self.pg_user.db_user)

    def table_exists(self) -> bool:
        """
        Check if the database table exists

        :return: boolean
        """
        return self.pg_user.check_if_table_exists(
            db_name=self.db_name,
            db_table=self.db_table,
        )

    def _create_table(self, schema_path: str | Path) -> None:
        """
        Makes a database table using schema

        :param schema_path: Path of schema file
        :return: None
        """
        self.pg_user.create_table(
            schema_path,
            db_name=self.db_name,
        )

    def _get_q3c_dir(self) -> Path:
        """
        Get the directory in which q3c sql files are looked up

        This is the ``q3c`` subdirectory of the schema directory. If no schema
        directory is set, the directory containing the schema file is used.

        :return: Path of the q3c directory
        """
        schema_dir = self.schema_dir
        if schema_dir is None:
            # schema_path may be a plain string, so convert before using .parent
            schema_dir = Path(self.schema_path).parent
        return Path(schema_dir).joinpath("q3c")

    def check_prerequisites(
        self,
    ) -> None:
        """
        Runs the database setup check, once per processor instance

        :return: None
        """
        if not self.db_check_bool:
            self.check_database_setup()
            self.db_check_bool = True

    def check_database_setup(self) -> None:
        """
        Checks the database is set up, and if not, creates the database and tables

        :return: None
        :raises DataBaseError: if q3c is requested for a newly-created table
            but the corresponding q3c sql file does not exist
        """
        # Check that the admin credentials work at all

        self._pg_admin.validate_credentials()

        # Create pg user for the first time

        if not self.pg_user_exists():
            self._make_pg_user()
        self.pg_user.validate_credentials()

        # Check there is a db

        if not self.db_exists():
            self._make_db()
            self._grant_privileges_to_pg_user()

            if self.has_foreign_keys:
                if self.schema_dir is None:
                    # schema_path may be a plain string: convert before .parent
                    self.schema_dir = Path(self.schema_path).parent
                    logger.warning(
                        "Warning, integrated db setup with foreign keys requested, "
                        "but no schema directory specified. "
                        "Will search for schema files in %s directory.",
                        self.schema_dir,
                    )

                logger.debug(
                    "Looking in %s directory to search for schema files",
                    self.schema_dir,
                )

                self.pg_user.create_tables_from_schema(self.schema_dir, self.db_name)

                if self.q3c:
                    q3c_indexes_file = self._get_q3c_dir().joinpath("q3c_indexes.sql")
                    self.pg_user.run_sql_command_from_file(
                        file_path=q3c_indexes_file, db_name=self.db_name
                    )
                    logger.debug("Created q3c_bool indexes")

        if not self.table_exists():
            self._create_table(self.schema_path)

            if self.q3c:
                # schema_dir can still be None here (no foreign keys, or the
                # database already existed), hence the helper with a fallback
                table_q3c_path = self._get_q3c_dir().joinpath(
                    f"q3c_{self.db_table}.sql"
                )

                if not table_q3c_path.exists():
                    err = (
                        f"q3c_bool extension requested but no "
                        f"{table_q3c_path} file found. Please add it in."
                    )
                    logger.error(err)
                    raise DataBaseError(err)

                self.pg_user.run_sql_command_from_file(
                    file_path=table_q3c_path, db_name=self.db_name
                )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc