• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

funilrys / PyFunceble / 17046250819

18 Aug 2025 04:12PM UTC coverage: 96.648% (+1.9%) from 94.721%
17046250819

push

github

funilrys
Bump version to v4.3.0

1 of 1 new or added line in 1 file covered. (100.0%)

154 existing lines in 14 files now uncovered.

11967 of 12382 relevant lines covered (96.65%)

8.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.72
/PyFunceble/dataset/csv_base.py
1
"""
2
The tool to check the availability or syntax of domain, IP or URL.
3

4
::
5

6

7
    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
8
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
9
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
10
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
11
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
12
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
13

14
Provides the base of all CSV stored datasets.
15

16
Author:
17
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
18

19
Special thanks:
20
    https://pyfunceble.github.io/#/special-thanks
21

22
Contributors:
23
    https://pyfunceble.github.io/#/contributors
24

25
Project link:
26
    https://github.com/funilrys/PyFunceble
27

28
Project documentation:
29
    https://docs.pyfunceble.com
30

31
Project homepage:
32
    https://pyfunceble.github.io/
33

34
License:
35
::
36

37

38
    Copyright 2017, 2018, 2019, 2020, 2022, 2023, 2024, 2025 Nissar Chababy
39

40
    Licensed under the Apache License, Version 2.0 (the "License");
41
    you may not use this file except in compliance with the License.
42
    You may obtain a copy of the License at
43

44
        https://www.apache.org/licenses/LICENSE-2.0
45

46
    Unless required by applicable law or agreed to in writing, software
47
    distributed under the License is distributed on an "AS IS" BASIS,
48
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
49
    See the License for the specific language governing permissions and
50
    limitations under the License.
51
"""
52

53
import csv
9✔
54
import tempfile
9✔
55
from datetime import datetime, timedelta, timezone
9✔
56
from typing import Generator, Optional, Tuple
9✔
57

58
import PyFunceble.facility
9✔
59
from PyFunceble.dataset.db_base import DBDatasetBase
9✔
60
from PyFunceble.helpers.file import FileHelper
9✔
61

62

63
class CSVDatasetBase(DBDatasetBase):
9✔
64
    """
65
    Provides the base of all CSV datasets.
66
    """
67

68
    @DBDatasetBase.ensure_source_file_exists
    def get_csv_writer(self) -> Tuple[csv.DictWriter, open]:
        """
        Provides a CSV dict writer bound to the source file, along with the
        file handler it writes to.

        The file is opened in :code:`a+` mode and is NOT closed here: the
        caller is responsible for closing the returned file handler.

        :return:
            A tuple :code:`(writer, file_handler)`.
        """

        helper = FileHelper(self.source_file)

        # Evaluated before the file gets opened: opening in append mode
        # presumably creates the file - TODO confirm against FileHelper.open.
        header_missing = not helper.exists()

        handler = helper.open("a+", newline="", encoding="utf-8")
        csv_writer = csv.DictWriter(handler, fieldnames=self.FIELDS)

        if header_missing:
            csv_writer.writeheader()

        return csv_writer, handler
×
86

87
    def update(self, row: dict, *, ignore_if_exist: bool = False) -> "DBDatasetBase":
        """
        Adds the given dataset into the database if it does not exist.
        Replaces the stored one otherwise.

        :param row:
            The row or dataset to manipulate.

        :param ignore_if_exist:
            When :py:class:`True`, leave the dataset untouched if the row
            already exists.

        :raise TypeError:
            When the given :code:`row` is not a :py:class:`dict`.
        """

        if not isinstance(row, dict):
            raise TypeError(f"<row> should be {dict}, {type(row)} given.")

        PyFunceble.facility.Logger.info("Started to update row.")

        already_stored = self.exists(row)

        if already_stored and not ignore_if_exist:
            # An update is a removal of the stored row followed by the
            # insertion of the new one.
            self.remove(row)

        if not (already_stored and ignore_if_exist):
            self.add(row)

        PyFunceble.facility.Logger.debug("Updated row:\n%r", row)
        PyFunceble.facility.Logger.info("Finished to update row.")

        return self
×
118

119
    @DBDatasetBase.ensure_source_file_exists
    @DBDatasetBase.execute_if_authorized(None)
    def add(self, row: dict) -> "CSVDatasetBase":
        """
        Adds the given dataset into the CSV file.

        :param row:
            The row or dataset to add.

        :raise TypeError:
            When the given :code:`row` is not a :py:class:`dict`.
        """

        if not isinstance(row, dict):
            raise TypeError(f"<row> should be {dict}, {type(row)} given.")

        PyFunceble.facility.Logger.info("Started to add row.")

        if self.remove_unneeded_fields:
            row = self.get_filtered_row(row)

        writer, file_handler = self.get_csv_writer()

        try:
            writer.writerow(row)
        finally:
            # Always release the file, even when the writer rejects the row
            # (e.g. because of unexpected field names).
            file_handler.close()

        PyFunceble.facility.Logger.debug("Added row:\n%r", row)

        PyFunceble.facility.Logger.info("Finished to add row.")

        return self
×
151

152
    @DBDatasetBase.ensure_source_file_exists
    @DBDatasetBase.execute_if_authorized(None)
    def remove(self, row: dict) -> "CSVDatasetBase":
        """
        Removes the given dataset from the CSV file.

        The source file is rewritten into a temporary file without the
        matching rows; the temporary file then replaces the source file.

        :param row:
            The row or dataset to remove.

        :raise TypeError:
            When the given :code:`row` is not a :py:class:`dict`.
        """

        if not isinstance(row, dict):
            raise TypeError(f"<row> should be {dict}, {type(row)} given.")

        PyFunceble.facility.Logger.info("Started to remove row.")

        if self.remove_unneeded_fields:
            row = self.get_filtered_row(row)

        # newline="" is required by the csv module when writing; without it
        # the line terminators get translated a second time on some
        # platforms. The context manager guarantees that the temporary file
        # is closed even if reading or writing fails midway.
        with tempfile.NamedTemporaryFile(
            "w", delete=False, newline="", encoding="utf-8"
        ) as our_temp_file:
            our_temp_filename = our_temp_file.name

            writer = csv.DictWriter(our_temp_file, fieldnames=self.FIELDS)
            writer.writeheader()

            for read_row in self.get_content():
                if self.are_equal(read_row, row):
                    # This is the row to get rid of: do not copy it.
                    continue

                writer.writerow(read_row)

        FileHelper(our_temp_filename).move(self.source_file)

        PyFunceble.facility.Logger.debug("Removed row:\n%r", row)

        PyFunceble.facility.Logger.info("Finished to remove row.")

        return self
×
194

195
    @DBDatasetBase.ensure_source_file_exists
    @DBDatasetBase.execute_if_authorized(None)
    def get_content(self) -> Generator[Optional[dict], None, None]:
        """
        Provides a generator which yields the rows of the source file, one
        after the other.

        When a row has a :code:`tested_at` field, its value is converted to
        a UTC :py:class:`~datetime.datetime`. When that conversion fails,
        the current UTC time minus 365 days is used instead.

        Nothing is yielded when the source file does not exist.
        """

        file_helper = FileHelper(self.source_file)

        if not file_helper.exists():
            return

        file_handler = file_helper.open(newline="", encoding="utf-8")

        try:
            reader = csv.DictReader(file_handler)

            for row in reader:
                if "tested_at" in row:
                    try:
                        row["tested_at"] = datetime.fromisoformat(
                            row["tested_at"]
                        ).astimezone(timezone.utc)
                    except (TypeError, ValueError):
                        # Unreadable date: fall back to a date far in the
                        # past.
                        row["tested_at"] = datetime.now(timezone.utc) - timedelta(
                            days=365
                        )

                yield row
        finally:
            # Reached on exhaustion, on error, and when the consumer stops
            # iterating early (generator closed), so the file never leaks.
            file_handler.close()
×
222

223
    @DBDatasetBase.execute_if_authorized(None)
    def get_filtered_content(
        self, filter_map: dict
    ) -> Generator[Optional[dict], None, None]:
        """
        Provides a generator which yields the rows of the dataset that match
        the given filter.

        A row matches when, for every key of :code:`filter_map`, the row has
        that key and holds an equal value.

        :param filter_map:
            A dictionary representing what we need to filter.

        :raise TypeError:
            When the given :code:`filter_map` is not a :py:class:`dict`.
        """

        if not isinstance(filter_map, dict):
            raise TypeError(f"<filter_map> should be {dict}, {type(filter_map)} given.")

        for candidate in self.get_content():
            for key, expected in filter_map.items():
                if key in candidate and candidate[key] == expected:
                    continue

                # First mismatch: this row is not wanted.
                break
            else:
                yield candidate
×
243

244
    def get_filtered_comparision_row(self, row: dict):
        """
        Makes the given row ready for comparison.

        When :code:`COMPARISON_FIELDS` is set, a new dictionary is returned
        which only holds the comparison fields; the fields missing from the
        given row are set to an empty string. Otherwise, the given row is
        returned as is.

        :param row:
            The row to prepare.
        """

        comparison_fields = self.COMPARISON_FIELDS

        if not comparison_fields:
            return row

        prepared = {
            field: value for field, value in row.items() if field in comparison_fields
        }

        for field in comparison_fields:
            prepared.setdefault(field, "")

        return prepared
×
256

257
    @DBDatasetBase.ensure_source_file_exists
    @DBDatasetBase.execute_if_authorized(False)
    def exists(self, row: dict) -> bool:
        """
        Checks if the given dataset exists in our dataset.

        The search stops at the first stored row that is considered equal
        to the given one.

        :param row:
            The row or dataset to check.
        """

        wanted = self.get_filtered_row(row) if self.remove_unneeded_fields else row
        wanted = self.get_filtered_comparision_row(wanted)

        return any(self.are_equal(stored, wanted) for stored in self.get_content())
×
277

278
    @DBDatasetBase.execute_if_authorized(False)
    def are_equal(self, read_row: dict, row: dict) -> bool:
        """
        Compares the given :code:`read_row` to the :code:`row`.

        Both rows are first reduced to their comparison form. They are
        considered equal when every (field, value) pair of :code:`row` is
        also found in :code:`read_row`.

        :param read_row:
            The row read from the dataset infrastructure.
        :param row:
            The row given by the testing infrastructure.
        """

        stored, wanted = read_row, row

        if self.remove_unneeded_fields:
            stored = self.get_filtered_row(stored)
            wanted = self.get_filtered_row(wanted)

        stored = self.get_filtered_comparision_row(stored)
        wanted = self.get_filtered_comparision_row(wanted)

        # Superset test on the items views: the stored row may hold more
        # pairs than the wanted one.
        return stored.items() >= wanted.items()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc