• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

winter-telescope / winterdrp / 3762336531

pending completion
3762336531

Pull #253

github

GitHub
Merge 2d1d633d0 into 7e1d7ae56
Pull Request #253: Update postgres DB, cap processors

493 of 493 new or added lines in 11 files covered. (100.0%)

4639 of 6135 relevant lines covered (75.62%)

0.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.09
/winterdrp/processors/database/base_database_processor.py
1
"""
2
Module containing base database processor class
3
"""
4
import logging
1✔
5
from abc import ABC
1✔
6
from pathlib import Path
1✔
7
from typing import Optional
1✔
8

9
from winterdrp.paths import max_n_cpu
1✔
10
from winterdrp.processors.base_processor import BaseProcessor
1✔
11
from winterdrp.processors.database.postgres import (
1✔
12
    POSTGRES_DUPLICATE_PROTOCOLS,
13
    DataBaseError,
14
    PostgresAdmin,
15
    PostgresUser,
16
)
17

18
# Module-level logger, named after this module's import path
logger = logging.getLogger(__name__)
1✔
19

20

21
class BaseDatabaseProcessor(BaseProcessor, ABC):
    """
    Base class for processors which interact with a postgres database.

    Holds the database/table names, the schema location and the postgres
    user/admin handles, and provides the logic to create the user, database
    and table(s) the first time they are needed.
    """

    # Cap the max_n_cpu value imported from winterdrp.paths at 10
    # for database processors
    max_n_cpu = min(max_n_cpu, 10)

    def __init__(
        self,
        db_name: str,
        db_table: str,
        schema_path: str | Path,
        pg_user: PostgresUser = PostgresUser(),
        pg_admin: PostgresAdmin = PostgresAdmin(),
        has_foreign_keys: bool = False,
        schema_dir: Optional[str | Path] = None,
        duplicate_protocol: str = "fail",
        q3c_bool: bool = False,
    ):
        """
        :param db_name: Name of the database
        :param db_table: Name of the table in the database
        :param schema_path: Path of the schema file for the table
        :param pg_user: Postgres user used for table-level operations.
            NOTE: the default instance is created once, when the class is
            defined, and is shared by every processor relying on the default
        :param pg_admin: Postgres admin used for user/database creation
            (same note as for pg_user applies to the default)
        :param has_foreign_keys: If True, all tables found in the schema
            directory are created when the database is first created
        :param schema_dir: Directory containing schema files. If None, the
            parent directory of schema_path is used when one is required
        :param duplicate_protocol: One of POSTGRES_DUPLICATE_PROTOCOLS
        :param q3c_bool: If True, run the q3c sql files found in the
            'q3c' subdirectory of the schema directory
        """
        super().__init__()
        self.db_name = db_name
        self.db_table = db_table

        self.schema_path = schema_path
        self.schema_dir = Path(schema_dir) if schema_dir is not None else None

        self.pg_user = pg_user
        self._pg_admin = pg_admin

        self.has_foreign_keys = has_foreign_keys
        # Set to True once the database setup has been checked
        self.db_check_bool = False
        self.duplicate_protocol = duplicate_protocol
        self.q3c = q3c_bool

        assert self.duplicate_protocol in POSTGRES_DUPLICATE_PROTOCOLS, (
            f"duplicate_protocol '{self.duplicate_protocol}' not recognised, "
            f"must be one of {POSTGRES_DUPLICATE_PROTOCOLS}"
        )

    def _default_schema_dir(self) -> Path:
        """
        Directory used to look for schema files when no schema_dir was given

        :return: parent directory of the schema file
        """
        # schema_path may be a plain string, so convert before using .parent
        return Path(self.schema_path).parent

    def db_exists(self) -> bool:
        """
        Checks if a database exists

        :return: boolean
        """
        return self._pg_admin.check_if_db_exists(db_name=self.db_name)

    def _make_db(self) -> None:
        """
        Creates a database

        :return: None
        """
        self._pg_admin.create_db(db_name=self.db_name)

    def pg_user_exists(self) -> bool:
        """
        Check if the specified db user exists

        :return: boolean
        """
        return self._pg_admin.check_if_user_exists(self.pg_user.db_user)

    def _make_pg_user(self) -> None:
        """
        Creates a new database user, with the name and password of pg_user

        :return: None
        """
        self._pg_admin.create_new_user(
            new_db_user=self.pg_user.db_user, new_password=self.pg_user.db_password
        )

    def _grant_privileges_to_pg_user(self) -> None:
        """
        Grants db privilege to user

        :return: None
        """
        self._pg_admin.grant_privileges(self.db_name, db_user=self.pg_user.db_user)

    def table_exists(self) -> bool:
        """
        Check if the database table exists

        :return: boolean
        """
        return self.pg_user.check_if_table_exists(
            db_name=self.db_name,
            db_table=self.db_table,
        )

    def _create_table(self, schema_path: str | Path) -> None:
        """
        Makes a database table using schema

        :param schema_path: Path of schema file
        :return: None
        """
        self.pg_user.create_table(
            schema_path,
            db_name=self.db_name,
        )

    def check_prerequisites(
        self,
    ) -> None:
        """
        Runs the database setup check, at most once per processor instance

        :return: None
        """
        if not self.db_check_bool:
            self.check_database_setup()
            self.db_check_bool = True

    def check_database_setup(self) -> None:
        """
        Checks the database is set up, and if not, creates the database and tables

        :return: None
        :raises DataBaseError: if q3c was requested but the q3c sql file
            for the table is missing
        """
        # Check that the admin credentials work at all
        self._pg_admin.validate_credentials()

        # Create pg user for the first time
        if not self.pg_user_exists():
            self._make_pg_user()
        self.pg_user.validate_credentials()

        # Check there is a db
        if not self.db_exists():
            self._make_db()
            self._grant_privileges_to_pg_user()

            if self.has_foreign_keys:

                if self.schema_dir is None:
                    self.schema_dir = self._default_schema_dir()
                    logger.warning(
                        f"Warning, integrated db setup with foreign keys requested, "
                        f"but no schema directory specified. "
                        f"Will search for schema files in {self.schema_dir} directory."
                    )

                logger.debug(
                    f"Looking in {self.schema_dir} directory to search for schema files"
                )

                self.pg_user.create_tables_from_schema(self.schema_dir, self.db_name)

                if self.q3c:
                    q3c_dir = self.schema_dir.joinpath("q3c")
                    q3c_indexes_file = q3c_dir.joinpath("q3c_indexes.sql")
                    self.pg_user.run_sql_command_from_file(
                        file_path=q3c_indexes_file, db_name=self.db_name
                    )
                    logger.debug("Created q3c_bool indexes")

        if not self.table_exists():

            self._create_table(self.schema_path)

            if self.q3c:
                # schema_dir is still None if it was not given and the
                # foreign-key branch above did not run, so fall back to the
                # same default location rather than failing on None
                schema_dir = (
                    self.schema_dir
                    if self.schema_dir is not None
                    else self._default_schema_dir()
                )
                q3c_dir = schema_dir.joinpath("q3c")
                table_q3c_path = q3c_dir.joinpath(f"q3c_{self.db_table}.sql")

                if not table_q3c_path.exists():
                    err = (
                        f"q3c_bool extension requested but no "
                        f"{table_q3c_path} file found. Please add it in."
                    )
                    logger.error(err)
                    raise DataBaseError(err)

                self.pg_user.run_sql_command_from_file(
                    file_path=table_q3c_path, db_name=self.db_name
                )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc