• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

funilrys / PyFunceble / 18114703986

14 Sep 2025 10:06AM UTC coverage: 96.648%. Remained the same
18114703986

push

github

funilrys
Better handling of missing or empty columns.

This patch touches #430.

Contributor: @Yuki2718

0 of 2 new or added lines in 2 files covered. (0.0%)

11967 of 12382 relevant lines covered (96.65%)

14.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.72
/PyFunceble/dataset/csv_base.py
1
"""
2
The tool to check the availability or syntax of domain, IP or URL.
3

4
::
5

6

7
    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
8
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
9
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
10
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
11
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
12
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
13

14
Provides the base of all CSV storeed datasets.
15

16
Author:
17
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
18

19
Special thanks:
20
    https://pyfunceble.github.io/#/special-thanks
21

22
Contributors:
23
    https://pyfunceble.github.io/#/contributors
24

25
Project link:
26
    https://github.com/funilrys/PyFunceble
27

28
Project documentation:
29
    https://docs.pyfunceble.com
30

31
Project homepage:
32
    https://pyfunceble.github.io/
33

34
License:
35
::
36

37

38
    Copyright 2017, 2018, 2019, 2020, 2022, 2023, 2024, 2025 Nissar Chababy
39

40
    Licensed under the Apache License, Version 2.0 (the "License");
41
    you may not use this file except in compliance with the License.
42
    You may obtain a copy of the License at
43

44
        https://www.apache.org/licenses/LICENSE-2.0
45

46
    Unless required by applicable law or agreed to in writing, software
47
    distributed under the License is distributed on an "AS IS" BASIS,
48
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
49
    See the License for the specific language governing permissions and
50
    limitations under the License.
51
"""
52

53
import csv
15✔
54
import tempfile
15✔
55
from datetime import datetime, timedelta, timezone
15✔
56
from typing import Generator, Optional, Tuple
15✔
57

58
import PyFunceble.facility
15✔
59
from PyFunceble.dataset.db_base import DBDatasetBase
15✔
60
from PyFunceble.helpers.file import FileHelper
15✔
61

62

63
class CSVDatasetBase(DBDatasetBase):
15✔
64
    """
65
    Provides the base of all CSV dataset.
66
    """
67

68
    @DBDatasetBase.ensure_source_file_exists
15✔
69
    def get_csv_writer(self) -> Tuple[csv.DictWriter, open]:
15✔
70
        """
71
        Provides the standard and initiated CSV Dict writer along with the
72
        file that was open with it.
73
        """
74

75
        file_helper = FileHelper(self.source_file)
×
76

77
        add_header = not file_helper.exists()
×
78

79
        file_handler = file_helper.open("a+", newline="", encoding="utf-8")
×
80
        writer = csv.DictWriter(file_handler, fieldnames=self.FIELDS)
×
81

82
        if add_header:
×
83
            writer.writeheader()
×
84

85
        return writer, file_handler
×
86

87
    def update(self, row: dict, *, ignore_if_exist: bool = False) -> "DBDatasetBase":
15✔
88
        """
89
        Adds the given dataset into the database if it does not exists.
90
        Update otherwise.
91

92
        :param row:
93
            The row or dataset to manipulate.
94

95
        :param ignore_if_exist:
96
            Ignore the insertion/update if the row already exists.
97

98
        :raise TypeError:
99
            When the given :code:`row` is not a :py:class`dict`.
100
        """
101

102
        if not isinstance(row, dict):
×
103
            raise TypeError(f"<row> should be {dict}, {type(row)} given.")
×
104

105
        PyFunceble.facility.Logger.info("Started to update row.")
106

107
        if self.exists(row):
×
108
            if not ignore_if_exist:
×
109
                self.remove(row)
×
110
                self.add(row)
×
111
        else:
112
            self.add(row)
×
113

114
        PyFunceble.facility.Logger.debug("Updated row:\n%r", row)
115
        PyFunceble.facility.Logger.info("Finished to update row.")
116

117
        return self
×
118

119
    @DBDatasetBase.ensure_source_file_exists
15✔
120
    @DBDatasetBase.execute_if_authorized(None)
15✔
121
    def add(self, row: dict) -> "CSVDatasetBase":
15✔
122
        """
123
        Adds the given dataset into the CSV file.
124

125
        :param row:
126
            The row or dataset to add.
127

128
        :raise TypeError:
129
            When the given :code:`row` is not a :py:class`dict`.
130
        """
131

132
        if not isinstance(row, dict):
×
133
            raise TypeError(f"<row> should be {dict}, {type(row)} given.")
×
134

135
        PyFunceble.facility.Logger.info("Started to add row.")
136

137
        if self.remove_unneeded_fields:
×
138
            row = self.get_filtered_row(row)
×
139

140
        writer, file_handler = self.get_csv_writer()
×
141

142
        writer.writerow(row)
×
143

144
        file_handler.close()
×
145

146
        PyFunceble.facility.Logger.debug("Added row:\n%r", row)
147

148
        PyFunceble.facility.Logger.info("Finished to add row.")
149

150
        return self
×
151

152
    @DBDatasetBase.ensure_source_file_exists
15✔
153
    @DBDatasetBase.execute_if_authorized(None)
15✔
154
    def remove(self, row: dict) -> "CSVDatasetBase":
15✔
155
        """
156
        Removes the given dataset from the CSV file.
157

158
        :param row:
159
            The row or dataset to add.
160

161
        :raise TypeError:
162
            When the given :code:`row` is not a :py:class`dict`.
163
        """
164

165
        if not isinstance(row, dict):
×
166
            raise TypeError(f"<row> should be {dict}, {type(row)} given.")
×
167

168
        PyFunceble.facility.Logger.info("Started to remove row.")
169

170
        if self.remove_unneeded_fields:
×
171
            row = self.get_filtered_row(row)
×
172

173
        our_temp_file = tempfile.NamedTemporaryFile("w", delete=False, encoding="utf-8")
×
174
        our_temp_filename = our_temp_file.name
×
175

176
        writer = csv.DictWriter(our_temp_file, fieldnames=self.FIELDS)
×
177
        writer.writeheader()
×
178

179
        for read_row in self.get_content():
×
180
            if self.are_equal(read_row, row):
×
181
                continue
×
182

183
            writer.writerow(read_row)
×
184

185
        our_temp_file.close()
×
186

187
        FileHelper(our_temp_filename).move(self.source_file)
×
188

189
        PyFunceble.facility.Logger.debug("Removed row:\n%r", row)
190

191
        PyFunceble.facility.Logger.info("Finished to remove row.")
192

193
        return self
×
194

195
    @DBDatasetBase.ensure_source_file_exists
15✔
196
    @DBDatasetBase.execute_if_authorized(None)
15✔
197
    def get_content(self) -> Generator[Optional[dict], None, None]:
15✔
198
        """
199
        Provides a generator which provides the next line to read.
200
        """
201

202
        file_helper = FileHelper(self.source_file)
×
203

204
        if file_helper.exists():
×
205
            file_handler = file_helper.open(newline="", encoding="utf-8")
×
206
            reader = csv.DictReader(file_handler)
×
207

208
            for row in reader:
×
209
                if "tested_at" in row:
×
210
                    try:
×
211
                        row["tested_at"] = datetime.fromisoformat(
×
212
                            row["tested_at"]
213
                        ).astimezone(timezone.utc)
NEW
214
                    except (TypeError, ValueError, KeyError):
×
215
                        row["tested_at"] = datetime.now(timezone.utc) - timedelta(
×
216
                            days=365
217
                        )
218

219
                yield row
×
220

221
            file_handler.close()
×
222

223
    @DBDatasetBase.execute_if_authorized(None)
15✔
224
    def get_filtered_content(
15✔
225
        self, filter_map: dict
226
    ) -> Generator[Optional[dict], None, None]:
227
        """
228
        Provides a generator which provides the next line to read.
229

230
        :param filter_map:
231
            A dictionary representing what we need to filter.
232

233
        :raise TypeError:
234
            When the given :code:`filter_map` is not a :py:class:`dict`.
235
        """
236

237
        if not isinstance(filter_map, dict):
×
238
            raise TypeError(f"<filter_map> should be {dict}, {type(filter_map)} given.")
×
239

240
        for row in self.get_content():
×
241
            if all(x in row and row[x] == y for x, y in filter_map.items()):
×
242
                yield row
×
243

244
    def get_filtered_comparision_row(self, row: dict):
15✔
245
        """
246
        Makes the given row ready for comparison.
247
        """
248

249
        if self.COMPARISON_FIELDS:
×
250
            row = {x: y for x, y in row.items() if x in self.COMPARISON_FIELDS}
×
251
            row.update({x: "" for x in self.COMPARISON_FIELDS if x not in row})
×
252

253
            return row
×
254

255
        return row
×
256

257
    @DBDatasetBase.ensure_source_file_exists
15✔
258
    @DBDatasetBase.execute_if_authorized(False)
15✔
259
    def exists(self, row: dict) -> bool:
15✔
260
        """
261
        Checks if the given dataset exists in our dataset.
262

263
        :param row:
264
            The row or dataset to check.
265
        """
266

267
        if self.remove_unneeded_fields:
×
268
            row = self.get_filtered_row(row)
×
269

270
        row = self.get_filtered_comparision_row(row)
×
271

272
        for read_row in self.get_content():
×
273
            if self.are_equal(read_row, row):
×
274
                return True
×
275

276
        return False
×
277

278
    @DBDatasetBase.execute_if_authorized(False)
15✔
279
    def are_equal(self, read_row: dict, row: dict) -> bool:
15✔
280
        """
281
        Compares the given :code:`read_row` to the `row`.
282

283
        :param read_row:
284
            The row read from the dataset infrastructure.
285
        :param row:
286
            The row given by the testing infrastructure.
287
        """
288

289
        if self.remove_unneeded_fields:
×
290
            read_row = self.get_filtered_row(read_row)
×
291
            row = self.get_filtered_row(row)
×
292

293
        read_row = self.get_filtered_comparision_row(read_row)
×
294
        row = self.get_filtered_comparision_row(row)
×
295

296
        return row.items() <= read_row.items()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc