• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

funilrys / PyFunceble / 3866429906

pending completion
3866429906

push

github-actions

funilrys
fixup! Introduction of the support for postgresql DBs.

11160 of 11683 relevant lines covered (95.52%)

14.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.29
/PyFunceble/dataset/sql_base.py
1
"""
2
The tool to check the availability or syntax of domain, IP or URL.
3

4
::
5

6

7
    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
8
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
9
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
10
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
11
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
12
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
13

14
Provides the base of all SQL database stored datasets.
15

16
Author:
17
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
18

19
Special thanks:
20
    https://pyfunceble.github.io/#/special-thanks
21

22
Contributors:
23
    https://pyfunceble.github.io/#/contributors
24

25
Project link:
26
    https://github.com/funilrys/PyFunceble
27

28
Project documentation:
29
    https://pyfunceble.readthedocs.io/en/dev/
30

31
Project homepage:
32
    https://pyfunceble.github.io/
33

34
License:
35
::
36

37

38
    Copyright 2017, 2018, 2019, 2020, 2022 Nissar Chababy
39

40
    Licensed under the Apache License, Version 2.0 (the "License");
41
    you may not use this file except in compliance with the License.
42
    You may obtain a copy of the License at
43

44
        http://www.apache.org/licenses/LICENSE-2.0
45

46
    Unless required by applicable law or agreed to in writing, software
47
    distributed under the License is distributed on an "AS IS" BASIS,
48
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
49
    See the License for the specific language governing permissions and
50
    limitations under the License.
51
"""
52

53
import functools
15✔
54
from datetime import datetime
15✔
55
from typing import Any, Generator, Optional
15✔
56

57
import sqlalchemy.exc
15✔
58
from sqlalchemy.orm import Session
15✔
59

60
import PyFunceble.cli.factory
15✔
61
import PyFunceble.sessions
15✔
62
from PyFunceble.dataset.db_base import DBDatasetBase
15✔
63

64

65
class SQLDBDatasetBase(DBDatasetBase):
15✔
66
    """
67
    Provides the base of all SQLDB stored datasets.
68
    """
69

70
    STD_KEEP_SESSION_OPEN: bool = False
15✔
71
    ORM_OBJ = None
15✔
72

73
    db_session: Optional[Session] = None
15✔
74

75
    def __init__(
76
        self,
77
        *,
78
        authorized: Optional[bool] = None,
79
        remove_unneeded_fields: Optional[bool] = None,
80
        db_session: Optional[Session] = None,
81
    ) -> None:
82
        self.db_session = db_session
15✔
83

84
        super().__init__(
15✔
85
            authorized=authorized, remove_unneeded_fields=remove_unneeded_fields
86
        )
87

88
    def __contains__(self, value: str) -> bool:
15✔
89
        raise NotImplementedError()
90

91
    def __getitem__(self, value: Any) -> Any:
15✔
92
        raise KeyError(value)
×
93

94
    def ensure_orm_obj_is_given(func):  # pylint: disable=no-self-argument
15✔
95
        """
96
        Ensures that the ORM object is given before launching the decorated
97
        method.
98

99
        :raise RuntimeError:
100
            When :code:`ORM_OBJ` is not declared.
101
        """
102

103
        @functools.wraps(func)
15✔
104
        def wrapper(self, *args, **kwargs):
12✔
105
            if self.ORM_OBJ is None:
×
106
                raise RuntimeError("<self.ORM_OBJ> not given.")
×
107

108
            return func(self, *args, **kwargs)  # pylint: disable=not-callable
×
109

110
        return wrapper
15✔
111

112
    @DBDatasetBase.execute_if_authorized(None)
15✔
113
    def get_content(self) -> Generator[dict, None, None]:
15✔
114
        """
115
        Provides a generator which provides the next dataset to read.
116
        """
117

118
        for row in self.db_session.query(self.ORM_OBJ):
×
119
            row = row.to_dict()
×
120

121
            yield row
×
122

123
    @DBDatasetBase.execute_if_authorized(None)
15✔
124
    def update(self, row, *, ignore_if_exist: bool = False) -> "MariaDBDatasetBase":
15✔
125
        """
126
        Adds the given dataset into the database if it does not exist.
127
        Updates the existing row otherwise.
128

129
        .. note::
130
            This should be the preferred method for introducing a new dataset.
131

132
        :raise TypeError:
133
            When the given :code:`row` is not a :py:class:`dict` or sqlalchemy
134
            schema.
135
        """
136

137
        if not isinstance(row, (dict, type(self.ORM_OBJ))):
×
138
            raise TypeError(
×
139
                f"<row> should be {dict} or {self.ORM_OBJ}, {type(row)} given."
140
            )
141

142
        PyFunceble.facility.Logger.info("Started to update row.")
143

144
        if isinstance(row, type(self.ORM_OBJ)):
×
145
            row = row.to_dict()
×
146

147
        if "id" in row:
×
148
            self.db_session.execute(
×
149
                self.ORM_OBJ.__table__.update().where(self.ORM_OBJ.id == row["id"]),
150
                row,
151
            )
152

153
            self.db_session.commit()
×
154
        else:
155
            existing_row_id = self.get_existing_row_id(row)
×
156

157
            if existing_row_id is not None:
×
158
                if not ignore_if_exist:
×
159
                    row["id"] = existing_row_id
×
160
                    self.update(row, ignore_if_exist=ignore_if_exist)
×
161
            else:
162
                self.add(row)
×
163

164
        PyFunceble.facility.Logger.debug("Updated row:\n%r", row)
165
        PyFunceble.facility.Logger.info("Finished to update row.")
166

167
        return self
×
168

169
    @DBDatasetBase.execute_if_authorized(None)
15✔
170
    @ensure_orm_obj_is_given
15✔
171
    def remove(self, row) -> "MariaDBDatasetBase":
15✔
172
        """
173
        Removes the given dataset from the database.
174

175
        :param row:
176
            The row or dataset to remove.
177

178
        :raise TypeError:
179
            When the given :code:`row` is not a :py:class:`dict` or sqlalchemy
180
            schema.
181
        """
182

183
        if not isinstance(row, (dict, type(self.ORM_OBJ))):
×
184
            raise TypeError(
×
185
                f"<row> should be {dict} or {self.ORM_OBJ}, {type(row)} given."
186
            )
187

188
        PyFunceble.facility.Logger.info("Started to remove row.")
189

190
        if not isinstance(row, type(self.ORM_OBJ)):
×
191
            row_id = self.get_existing_row_id(row)
×
192

193
            if row_id is not None:
×
194
                self.db_session.query(self.ORM_OBJ).filter(
×
195
                    self.ORM_OBJ == row_id
196
                ).delete()
197
                self.db_session.commit()
×
198
        else:
199
            self.db_session.delete(row)
×
200
            self.db_session.commit()
×
201

202
        PyFunceble.facility.Logger.debug("Removed row:\n%r", row)
203
        PyFunceble.facility.Logger.info("Finished to remove row.")
204

205
        return self
×
206

207
    @DBDatasetBase.execute_if_authorized(False)
15✔
208
    @ensure_orm_obj_is_given
15✔
209
    def exists(self, row) -> bool:
15✔
210
        """
211
        Checks if the given dataset exists in our dataset.
212

213
        :param row:
214
            The row or dataset to check.
215

216
        :raise TypeError:
217
            When the given :code:`row` is not a :py:class:`dict` or sqlalchemy
218
            schema.
219
        """
220

221
        return self.get_existing_row_id(row) is not None
×
222

223
    @DBDatasetBase.execute_if_authorized(None)
15✔
224
    @ensure_orm_obj_is_given
15✔
225
    def get_existing_row_id(self, row):
12✔
226
        """
227
        Returns the ID of the existing row.
228

229
        :param row:
230
            The row or dataset to check.
231
        """
232

233
        if not isinstance(row, (dict, type(self.ORM_OBJ))):
×
234
            raise TypeError(
×
235
                f"<row> should be {dict} or {self.ORM_OBJ}, {type(row)} given."
236
            )
237

238
        if isinstance(row, type(self.ORM_OBJ)):
×
239
            row = row.to_dict()
×
240

241
        result = self.db_session.query(self.ORM_OBJ.id)
×
242

243
        for field in self.COMPARISON_FIELDS:
×
244
            result = result.filter(getattr(self.ORM_OBJ, field) == row[field])
×
245

246
        result = result.first()
×
247

248
        if result is not None:
×
249
            return result.id
×
250
        return None
×
251

252
    @DBDatasetBase.execute_if_authorized(None)
15✔
253
    @ensure_orm_obj_is_given
15✔
254
    def get_existing_row(self, row):
12✔
255
        """
256
        Returns the matching row.
257

258
        :param row:
259
            The row or dataset to check.
260
        """
261

262
        if not isinstance(row, (dict, type(self.ORM_OBJ))):
×
263
            raise TypeError(
×
264
                f"<row> should be {dict} or {self.ORM_OBJ}, {type(row)} given."
265
            )
266

267
        if isinstance(row, type(self.ORM_OBJ)):
×
268
            row = row.to_dict()
×
269

270
        result = self.db_session.query(self.ORM_OBJ)
×
271

272
        for field in self.COMPARISON_FIELDS:
×
273
            result = result.filter(getattr(self.ORM_OBJ, field) == row[field])
×
274

275
        return result.first()
×
276

277
    @DBDatasetBase.execute_if_authorized(None)
15✔
278
    @ensure_orm_obj_is_given
15✔
279
    def add(self, row) -> "MariaDBDatasetBase":
15✔
280
        """
281
        Adds the given dataset into the database.
282

283
        :param row:
284
            The row or dataset to add.
285

286
        :raise TypeError:
287
            When the given :code:`row` is not a :py:class:`dict` or sqlalchemy
288
            schema.
289
        """
290

291
        if not isinstance(row, (dict, type(self.ORM_OBJ))):
×
292
            raise TypeError(
×
293
                f"<row> should be {dict} or {self.ORM_OBJ}, {type(row)} given."
294
            )
295

296
        PyFunceble.facility.Logger.info("Started to add row.")
297

298
        if isinstance(row, type(self.ORM_OBJ)):
×
299
            row = row.to_dict()
×
300

301
        try:
×
302
            self.db_session.execute(self.ORM_OBJ.__table__.insert(), row)
×
303
            self.db_session.commit()
×
304
        except sqlalchemy.exc.DataError as exception:
×
305
            if "expiration_date" not in row and "epoch" not in row:
×
306
                raise exception
×
307

308
            y2k38_limit = datetime(2037, 12, 31, 0, 0)
×
309
            new_date = datetime.fromtimestamp(float(row["epoch"]))
×
310
            new_date -= new_date - y2k38_limit
×
311

312
            row["epoch"] = str(new_date.timestamp())
×
313
            row["expiration_date"] = new_date.strftime("%d-%b-%Y")
×
314

315
            self.db_session.execute(self.ORM_OBJ.__table__.insert(), row)
×
316
            self.db_session.commit()
×
317

318
        PyFunceble.facility.Logger.debug("Added row:\n%r", row)
319
        PyFunceble.facility.Logger.info("Finished to add row.")
320

321
        return self
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc