• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

funilrys / PyFunceble / 3866429906

pending completion
3866429906

push

github-actions

funilrys
fixup! Introduction of the support for postgresql DBs.

11160 of 11683 relevant lines covered (95.52%)

14.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.72
/PyFunceble/dataset/csv_base.py
1
"""
2
The tool to check the availability or syntax of domain, IP or URL.
3

4
::
5

6

7
    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
8
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
9
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
10
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
11
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
12
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
13

14
Provides the base of all CSV storeed datasets.
15

16
Author:
17
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
18

19
Special thanks:
20
    https://pyfunceble.github.io/#/special-thanks
21

22
Contributors:
23
    https://pyfunceble.github.io/#/contributors
24

25
Project link:
26
    https://github.com/funilrys/PyFunceble
27

28
Project documentation:
29
    https://pyfunceble.readthedocs.io/en/dev/
30

31
Project homepage:
32
    https://pyfunceble.github.io/
33

34
License:
35
::
36

37

38
    Copyright 2017, 2018, 2019, 2020, 2022 Nissar Chababy
39

40
    Licensed under the Apache License, Version 2.0 (the "License");
41
    you may not use this file except in compliance with the License.
42
    You may obtain a copy of the License at
43

44
        http://www.apache.org/licenses/LICENSE-2.0
45

46
    Unless required by applicable law or agreed to in writing, software
47
    distributed under the License is distributed on an "AS IS" BASIS,
48
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
49
    See the License for the specific language governing permissions and
50
    limitations under the License.
51
"""
52

53
import csv
15✔
54
import tempfile
15✔
55
from datetime import datetime, timedelta
15✔
56
from typing import Generator, Optional, Tuple
15✔
57

58
import PyFunceble.facility
15✔
59
from PyFunceble.dataset.db_base import DBDatasetBase
15✔
60
from PyFunceble.helpers.file import FileHelper
15✔
61

62

63
class CSVDatasetBase(DBDatasetBase):
15✔
64
    """
65
    Provides the base of all CSV dataset.
66
    """
67

68
    @DBDatasetBase.ensure_source_file_exists
15✔
69
    def get_csv_writer(self) -> Tuple[csv.DictWriter, open]:
15✔
70
        """
71
        Provides the standard and initiated CSV Dict writer along with the
72
        file that was open with it.
73
        """
74

75
        file_helper = FileHelper(self.source_file)
×
76

77
        add_header = not file_helper.exists()
×
78

79
        file_handler = file_helper.open("a+", newline="", encoding="utf-8")
×
80
        writer = csv.DictWriter(file_handler, fieldnames=self.FIELDS)
×
81

82
        if add_header:
×
83
            writer.writeheader()
×
84

85
        return writer, file_handler
×
86

87
    def update(self, row: dict, *, ignore_if_exist: bool = False) -> "DBDatasetBase":
15✔
88
        """
89
        Adds the given dataset into the database if it does not exists.
90
        Update otherwise.
91

92
        :param row:
93
            The row or dataset to manipulate.
94

95
        :param ignore_if_exist:
96
            Ignore the insertion/update if the row already exists.
97

98
        :raise TypeError:
99
            When the given :code:`row` is not a :py:class`dict`.
100
        """
101

102
        if not isinstance(row, dict):
×
103
            raise TypeError(f"<row> should be {dict}, {type(row)} given.")
×
104

105
        PyFunceble.facility.Logger.info("Started to update row.")
106

107
        if self.exists(row):
×
108
            if not ignore_if_exist:
×
109
                self.remove(row)
×
110
                self.add(row)
×
111
        else:
112
            self.add(row)
×
113

114
        PyFunceble.facility.Logger.debug("Updated row:\n%r", row)
115
        PyFunceble.facility.Logger.info("Finished to update row.")
116

117
        return self
×
118

119
    @DBDatasetBase.ensure_source_file_exists
15✔
120
    @DBDatasetBase.execute_if_authorized(None)
15✔
121
    def add(self, row: dict) -> "CSVDatasetBase":
15✔
122
        """
123
        Adds the given dataset into the CSV file.
124

125
        :param row:
126
            The row or dataset to add.
127

128
        :raise TypeError:
129
            When the given :code:`row` is not a :py:class`dict`.
130
        """
131

132
        if not isinstance(row, dict):
×
133
            raise TypeError(f"<row> should be {dict}, {type(row)} given.")
×
134

135
        PyFunceble.facility.Logger.info("Started to add row.")
136

137
        if self.remove_unneeded_fields:
×
138
            row = self.get_filtered_row(row)
×
139

140
        writer, file_handler = self.get_csv_writer()
×
141

142
        writer.writerow(row)
×
143

144
        file_handler.close()
×
145

146
        PyFunceble.facility.Logger.debug("Added row:\n%r", row)
147

148
        PyFunceble.facility.Logger.info("Finished to add row.")
149

150
        return self
×
151

152
    @DBDatasetBase.ensure_source_file_exists
15✔
153
    @DBDatasetBase.execute_if_authorized(None)
15✔
154
    def remove(self, row: dict) -> "CSVDatasetBase":
15✔
155
        """
156
        Removes the given dataset from the CSV file.
157

158
        :param row:
159
            The row or dataset to add.
160

161
        :raise TypeError:
162
            When the given :code:`row` is not a :py:class`dict`.
163
        """
164

165
        if not isinstance(row, dict):
×
166
            raise TypeError(f"<row> should be {dict}, {type(row)} given.")
×
167

168
        PyFunceble.facility.Logger.info("Started to remove row.")
169

170
        if self.remove_unneeded_fields:
×
171
            row = self.get_filtered_row(row)
×
172

173
        our_temp_file = tempfile.NamedTemporaryFile("w", delete=False, encoding="utf-8")
×
174
        our_temp_filename = our_temp_file.name
×
175

176
        writer = csv.DictWriter(our_temp_file, fieldnames=self.FIELDS)
×
177
        writer.writeheader()
×
178

179
        for read_row in self.get_content():
×
180
            if self.are_equal(read_row, row):
×
181
                continue
×
182

183
            writer.writerow(read_row)
×
184

185
        our_temp_file.close()
×
186

187
        FileHelper(our_temp_filename).move(self.source_file)
×
188

189
        PyFunceble.facility.Logger.debug("Removed row:\n%r", row)
190

191
        PyFunceble.facility.Logger.info("Finished to remove row.")
192

193
        return self
×
194

195
    @DBDatasetBase.ensure_source_file_exists
15✔
196
    @DBDatasetBase.execute_if_authorized(None)
15✔
197
    def get_content(self) -> Generator[Optional[dict], None, None]:
15✔
198
        """
199
        Provides a generator which provides the next line to read.
200
        """
201

202
        file_helper = FileHelper(self.source_file)
×
203

204
        if file_helper.exists():
×
205
            file_handler = file_helper.open(newline="", encoding="utf-8")
×
206
            reader = csv.DictReader(file_handler)
×
207

208
            for row in reader:
×
209
                if "tested_at" in row:
×
210
                    try:
×
211
                        row["tested_at"] = datetime.fromisoformat(row["tested_at"])
×
212
                    except (TypeError, ValueError):
×
213
                        row["tested_at"] = datetime.utcnow() - timedelta(days=365)
×
214

215
                yield row
×
216

217
            file_handler.close()
×
218

219
    @DBDatasetBase.execute_if_authorized(None)
15✔
220
    def get_filtered_content(
15✔
221
        self, filter_map: dict
222
    ) -> Generator[Optional[dict], None, None]:
223
        """
224
        Provides a generator which provides the next line to read.
225

226
        :param filter_map:
227
            A dictionary representing what we need to filter.
228

229
        :raise TypeError:
230
            When the given :code:`filter_map` is not a :py:class:`dict`.
231
        """
232

233
        if not isinstance(filter_map, dict):
×
234
            raise TypeError(f"<filter_map> should be {dict}, {type(filter_map)} given.")
×
235

236
        for row in self.get_content():
×
237
            if all(x in row and row[x] == y for x, y in filter_map.items()):
×
238
                yield row
×
239

240
    def get_filtered_comparision_row(self, row: dict):
15✔
241
        """
242
        Makes the given row ready for comparison.
243
        """
244

245
        if self.COMPARISON_FIELDS:
×
246
            row = {x: y for x, y in row.items() if x in self.COMPARISON_FIELDS}
×
247
            row.update({x: "" for x in self.COMPARISON_FIELDS if x not in row})
×
248

249
            return row
×
250

251
        return row
×
252

253
    @DBDatasetBase.ensure_source_file_exists
15✔
254
    @DBDatasetBase.execute_if_authorized(False)
15✔
255
    def exists(self, row: dict) -> bool:
15✔
256
        """
257
        Checks if the given dataset exists in our dataset.
258

259
        :param row:
260
            The row or dataset to check.
261
        """
262

263
        if self.remove_unneeded_fields:
×
264
            row = self.get_filtered_row(row)
×
265

266
        row = self.get_filtered_comparision_row(row)
×
267

268
        for read_row in self.get_content():
×
269
            if self.are_equal(read_row, row):
×
270
                return True
×
271

272
        return False
×
273

274
    @DBDatasetBase.execute_if_authorized(False)
15✔
275
    def are_equal(self, read_row: dict, row: dict) -> bool:
15✔
276
        """
277
        Compares the given :code:`read_row` to the `row`.
278

279
        :param read_row:
280
            The row read from the dataset infrastructure.
281
        :param row:
282
            The row given by the testing infrastructure.
283
        """
284

285
        if self.remove_unneeded_fields:
×
286
            read_row = self.get_filtered_row(read_row)
×
287
            row = self.get_filtered_row(row)
×
288

289
        read_row = self.get_filtered_comparision_row(read_row)
×
290
        row = self.get_filtered_comparision_row(row)
×
291

292
        return row.items() <= read_row.items()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc