• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

winter-telescope / winterdrp / 3699777599

pending completion
3699777599

push

github

GitHub
Add initial mypy integration (#241)

223 of 223 new or added lines in 45 files covered. (100.0%)

4575 of 6107 relevant lines covered (74.91%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.81
/winterdrp/processors/database/base_database_processor.py
1
import logging
1✔
2
import os
1✔
3
from abc import ABC
1✔
4
from typing import Optional
1✔
5

6
import numpy as np
1✔
7

8
from winterdrp.processors.base_processor import BaseProcessor
1✔
9
from winterdrp.processors.database.postgres import (
1✔
10
    PG_ADMIN_PWD_KEY,
11
    PG_ADMIN_USER_KEY,
12
    DataBaseError,
13
    check_if_db_exists,
14
    check_if_table_exists,
15
    check_if_user_exists,
16
    create_db,
17
    create_new_user,
18
    create_table,
19
    create_tables_from_schema,
20
    grant_privileges,
21
    run_sql_command_from_file,
22
)
23

24
logger = logging.getLogger(__name__)
1✔
25

26

27
class BaseDatabaseProcessor(BaseProcessor, ABC):
1✔
28
    def __init__(
1✔
29
        self,
30
        db_name: str,
31
        db_table: str,
32
        schema_path: str,
33
        db_user: str = os.environ.get("DB_USER"),
34
        db_password: str = os.environ.get("DB_PWD"),
35
        full_setup: bool = False,
36
        schema_dir: Optional[str] = None,
37
        duplicate_protocol: str = "fail",
38
        q3c: bool = False,
39
    ):
40
        super().__init__()
1✔
41
        self.db_name = db_name
1✔
42
        self.db_table = db_table
1✔
43
        self.db_user = db_user
1✔
44
        self.db_password = db_password
1✔
45
        self.full_setup = full_setup
1✔
46
        self.schema_path = schema_path
1✔
47
        self.schema_dir = schema_dir
1✔
48
        self.db_check = False
1✔
49
        self.duplicate_protocol = duplicate_protocol
1✔
50
        self.q3c = q3c
1✔
51
        assert self.duplicate_protocol in ["fail", "ignore", "replace"]
1✔
52

53
    def db_exists(self):
1✔
54
        return check_if_db_exists(db_name=self.db_name)
1✔
55

56
    def make_db(self):
1✔
57
        create_db(db_name=self.db_name)
1✔
58

59
    def user_exists(self):
1✔
60
        return check_if_user_exists(self.db_user)
1✔
61

62
    def make_user(self):
1✔
63
        return create_new_user(new_db_user=self.db_user, new_password=self.db_password)
×
64

65
    def grant_privileges(self):
1✔
66
        return grant_privileges(self.db_name, self.db_user)
1✔
67

68
    def table_exists(self):
1✔
69
        return check_if_table_exists(
1✔
70
            db_name=self.db_name,
71
            db_table=self.db_table,
72
            db_user=self.db_user,
73
            password=self.db_password,
74
        )
75

76
    def make_table(self, schema_path: str):
1✔
77
        create_table(
1✔
78
            schema_path,
79
            db_name=self.db_name,
80
            db_user=self.db_user,
81
            password=self.db_password,
82
        )
83

84
    def check_prerequisites(
1✔
85
        self,
86
    ):
87
        if not self.db_check:
1✔
88
            self.check_database_setup()
1✔
89
            self.db_check = True
1✔
90

91
    def check_database_setup(self):
1✔
92

93
        if np.logical_and(self.db_exists(), np.invert(self.user_exists())):
1✔
94
            err = "Database exists but user does not exist"
95
            logger.error(err)
96
            raise DataBaseError(err)
97

98
        if not self.db_exists():
1✔
99
            self.make_db()
1✔
100

101
            if not self.user_exists():
1✔
102
                self.make_user()
×
103

104
            self.grant_privileges()
1✔
105

106
            if self.full_setup:
1✔
107
                if self.schema_dir is None:
1✔
108
                    self.schema_dir = os.path.dirname(self.schema_path)
×
109
                    logger.warning(
×
110
                        f"Warning, full db setup requested, but no schema directory specified. \
111
                    Will search for schema files in {self.schema_dir} directory."
112
                    )
113
                logger.info(
1✔
114
                    f"Looking in {self.schema_dir} directory to search for schema files"
115
                )
116

117
                create_tables_from_schema(
1✔
118
                    self.schema_dir, self.db_name, self.db_user, self.db_password
119
                )
120

121
                if self.q3c:
1✔
122
                    admin_user = os.environ.get(PG_ADMIN_USER_KEY)
×
123
                    admin_password = os.environ.get(PG_ADMIN_PWD_KEY)
×
124
                    q3c_dir = os.path.join(self.schema_dir, "q3c")
×
125
                    q3c_indexes_file = os.path.join(q3c_dir, "q3c_indexes.sql")
×
126
                    run_sql_command_from_file(
×
127
                        file_path=q3c_indexes_file,
128
                        db_name=self.db_name,
129
                        db_user=admin_user,
130
                        password=admin_password,
131
                    )
132
                    logger.info(f"Created q3c indexes")
×
133

134
        if not self.table_exists():
1✔
135
            self.make_table(self.schema_path)
1✔
136
            if self.q3c:
1✔
137
                q3c_dir = os.path.join(self.schema_dir, "q3c")
×
138
                table_q3c_path = os.path.join(q3c_dir, f"q3c_{self.db_table}.sql")
×
139
                admin_user = os.environ.get(PG_ADMIN_USER_KEY)
×
140
                admin_password = os.environ.get(PG_ADMIN_PWD_KEY)
×
141
                if not os.path.exists(table_q3c_path):
×
142
                    err = f"q3c extension requested but no {table_q3c_path} file found. Please add it in."
143
                    logger.error(err)
144
                    raise DataBaseError(err)
145
                else:
146
                    run_sql_command_from_file(
×
147
                        file_path=table_q3c_path,
148
                        db_name=self.db_name,
149
                        db_user=admin_user,
150
                        password=admin_password,
151
                    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc