• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

winter-telescope / winterdrp / 3596924942

pending completion
3596924942

Pull #224

github

GitHub
Merge 0ea201d54 into 00fbdf6f7
Pull Request #224: Code with Style

1490 of 1490 new or added lines in 93 files covered. (100.0%)

4571 of 6109 relevant lines covered (74.82%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.81
/winterdrp/processors/database/base_database_processor.py
1
import logging
1✔
2
import os
1✔
3
from abc import ABC
1✔
4

5
import numpy as np
1✔
6

7
from winterdrp.data import DataBatch
1✔
8
from winterdrp.processors.base_processor import BaseProcessor
1✔
9
from winterdrp.processors.database.postgres import (
1✔
10
    DataBaseError,
11
    check_if_db_exists,
12
    check_if_table_exists,
13
    check_if_user_exists,
14
    create_db,
15
    create_new_user,
16
    create_table,
17
    create_tables_from_schema,
18
    grant_privileges,
19
    pg_admin_pwd_key,
20
    pg_admin_user_key,
21
    run_sql_command_from_file,
22
)
23

24
logger = logging.getLogger(__name__)
1✔
25

26

27
class BaseDatabaseProcessor(BaseProcessor, ABC):
1✔
28
    def __init__(
1✔
29
        self,
30
        db_name: str,
31
        db_table: str,
32
        schema_path: str,
33
        db_user: str = os.environ.get("DB_USER"),
34
        db_password: str = os.environ.get("DB_PWD"),
35
        full_setup: bool = False,
36
        schema_dir: str = None,
37
        duplicate_protocol: str = "fail",
38
        q3c: bool = False,
39
        *args,
40
        **kwargs,
41
    ):
42
        super().__init__(*args, **kwargs)
1✔
43
        self.db_name = db_name
1✔
44
        self.db_table = db_table
1✔
45
        self.db_user = db_user
1✔
46
        self.db_password = db_password
1✔
47
        self.full_setup = full_setup
1✔
48
        self.schema_path = schema_path
1✔
49
        self.schema_dir = schema_dir
1✔
50
        self.db_check = False
1✔
51
        self.duplicate_protocol = duplicate_protocol
1✔
52
        self.q3c = q3c
1✔
53
        assert self.duplicate_protocol in ["fail", "ignore", "replace"]
1✔
54

55
    def db_exists(self):
1✔
56
        return check_if_db_exists(db_name=self.db_name)
1✔
57

58
    def make_db(self):
1✔
59
        create_db(db_name=self.db_name)
1✔
60

61
    def user_exists(self):
1✔
62
        return check_if_user_exists(self.db_user)
1✔
63

64
    def make_user(self):
1✔
65
        return create_new_user(new_db_user=self.db_user, new_password=self.db_password)
×
66

67
    def grant_privileges(self):
1✔
68
        return grant_privileges(self.db_name, self.db_user)
1✔
69

70
    def table_exists(self):
1✔
71
        return check_if_table_exists(
1✔
72
            db_name=self.db_name,
73
            db_table=self.db_table,
74
            db_user=self.db_user,
75
            password=self.db_password,
76
        )
77

78
    def make_table(self, schema_path: str):
1✔
79
        create_table(
1✔
80
            schema_path,
81
            db_name=self.db_name,
82
            db_user=self.db_user,
83
            password=self.db_password,
84
        )
85

86
    def check_prerequisites(
1✔
87
        self,
88
    ):
89
        if not self.db_check:
1✔
90
            self.check_database_setup()
1✔
91
            self.db_check = True
1✔
92

93
    def check_database_setup(self):
1✔
94

95
        if np.logical_and(self.db_exists(), np.invert(self.user_exists())):
1✔
96
            err = "Database exists but user does not exist"
97
            logger.error(err)
98
            raise DataBaseError(err)
99

100
        if not self.db_exists():
1✔
101
            self.make_db()
1✔
102

103
            if not self.user_exists():
1✔
104
                self.make_user()
×
105

106
            self.grant_privileges()
1✔
107

108
            if self.full_setup:
1✔
109
                if self.schema_dir is None:
1✔
110
                    self.schema_dir = os.path.dirname(self.schema_path)
×
111
                    logger.warning(
×
112
                        f"Warning, full db setup requested, but no schema directory specified. \
113
                    Will search for schema files in {self.schema_dir} directory."
114
                    )
115
                logger.info(
1✔
116
                    f"Looking in {self.schema_dir} directory to search for schema files"
117
                )
118

119
                create_tables_from_schema(
1✔
120
                    self.schema_dir, self.db_name, self.db_user, self.db_password
121
                )
122

123
                if self.q3c:
1✔
124
                    admin_user = os.environ.get(pg_admin_user_key)
×
125
                    admin_password = os.environ.get(pg_admin_pwd_key)
×
126
                    q3c_dir = os.path.join(self.schema_dir, "q3c")
×
127
                    q3c_indexes_file = os.path.join(q3c_dir, "q3c_indexes.sql")
×
128
                    run_sql_command_from_file(
×
129
                        file_path=q3c_indexes_file,
130
                        db_name=self.db_name,
131
                        db_user=admin_user,
132
                        password=admin_password,
133
                    )
134
                    logger.info(f"Created q3c indexes")
×
135

136
        if not self.table_exists():
1✔
137
            self.make_table(self.schema_path)
1✔
138
            if self.q3c:
1✔
139
                q3c_dir = os.path.join(self.schema_dir, "q3c")
×
140
                table_q3c_path = os.path.join(q3c_dir, f"q3c_{self.db_table}.sql")
×
141
                admin_user = os.environ.get(pg_admin_user_key)
×
142
                admin_password = os.environ.get(pg_admin_pwd_key)
×
143
                if not os.path.exists(table_q3c_path):
×
144
                    err = f"q3c extension requested but no {table_q3c_path} file found. Please add it in."
145
                    logger.error(err)
146
                    raise DataBaseError(err)
147
                else:
148
                    run_sql_command_from_file(
×
149
                        file_path=table_q3c_path,
150
                        db_name=self.db_name,
151
                        db_user=admin_user,
152
                        password=admin_password,
153
                    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc