OpenDataServices / flatten-tool / build 6507626273

13 Oct 2023 11:25AM UTC. Coverage: 42.006%, down 53.7 percentage points from 95.72%.

Pull Request #433 (github, odscjames): New "Geo" optional dependencies
https://github.com/OpenDataServices/flatten-tool/issues/424

38 of 38 new or added lines in 6 files covered (100.0%)

1466 of 3490 relevant lines covered (42.01%)

4.16 hits per line

Source file: /flattentool/json_input.py (49.16% covered)

"""

This file contains code that takes an instance of a JSON file as input (not a
JSON schema, for that see schema.py).

"""

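# Illustrative usage sketch, not part of the original file; the argument values
# below are hypothetical. JSONParser does its work during __init__ (which calls
# self.parse() itself) and can be used as a context manager:
#
#     with JSONParser(
#         json_filename="releases.json",   # hypothetical input file
#         root_list_path="releases",       # hypothetical root list path
#         root_id="ocid",
#     ) as parser:
#         main_sheet = parser.main_sheet   # the main (top-level) sheet
#         sub_sheets = parser.sub_sheets   # sub-sheets keyed by sheet name
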
import codecs
import copy
import os
import tempfile
import uuid
from collections import OrderedDict
from decimal import Decimal
from warnings import warn

import BTrees.OOBTree
import ijson

try:
    import shapely.wkt

    SHAPELY_LIBRARY_AVAILABLE = True
except ImportError:
    SHAPELY_LIBRARY_AVAILABLE = False
import transaction
import xmltodict
import zc.zlibstorage
import ZODB.FileStorage

from flattentool.i18n import _
from flattentool.input import path_search
from flattentool.schema import make_sub_sheet_name
from flattentool.sheet import PersistentSheet

BASIC_TYPES = [str, bool, int, Decimal, type(None)]


class BadlyFormedJSONError(ValueError):
    pass


class BadlyFormedJSONErrorUTF8(BadlyFormedJSONError):
    pass


def sheet_key_field(sheet, key):
    if key not in sheet:
        sheet.append(key)
    return key


def sheet_key_title(sheet, key):
    """
    If the key has a corresponding title, return that. If it doesn't, create it in the sheet and return it.

    """
    if key in sheet.titles:
        title = sheet.titles[key]
        if title not in sheet:
            sheet.append(title)
        return title
    else:
        if key not in sheet:
            sheet.append(key)
        return key


def lists_of_dicts_paths(xml_dict):
    for key, value in xml_dict.items():
        if isinstance(value, list) and value and isinstance(value[0], dict):
            yield (key,)
            for x in value:
                if isinstance(x, dict):
                    for path in lists_of_dicts_paths(x):
                        yield (key,) + path
        elif isinstance(value, dict):
            for path in lists_of_dicts_paths(value):
                yield (key,) + path


def dicts_to_list_of_dicts(lists_of_dicts_paths_set, xml_dict, path=()):
    for key, value in xml_dict.items():
        if isinstance(value, list):
            for x in value:
                if isinstance(x, dict):
                    dicts_to_list_of_dicts(lists_of_dicts_paths_set, x, path + (key,))
        elif isinstance(value, dict):
            child_path = path + (key,)
            dicts_to_list_of_dicts(lists_of_dicts_paths_set, value, child_path)
            if child_path in lists_of_dicts_paths_set:
                xml_dict[key] = [value]


def list_dict_consistency(xml_dict):
    """
    For use with XML files opened with xmltodict.

    If there is only one tag, xmltodict produces a dict. If there are
    multiple, xmltodict produces a list of dicts. This function replaces
    dicts with lists of dicts, if there exists a list of dicts for the same
    path elsewhere in the file.
    """
    lists_of_dicts_paths_set = set(lists_of_dicts_paths(xml_dict))
    dicts_to_list_of_dicts(lists_of_dicts_paths_set, xml_dict)
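    # Illustrative of the xmltodict behaviour described above (hypothetical
    # input, xmltodict defaults, results shown as plain dicts):
    #   xmltodict.parse("<r><item><a>1</a></item><item><a>2</a></item></r>")
    #     -> {"r": {"item": [{"a": "1"}, {"a": "2"}]}}   (repeated tag: list of dicts)
    #   xmltodict.parse("<r><item><a>1</a></item></r>")
    #     -> {"r": {"item": {"a": "1"}}}                 (single tag: plain dict)
    # list_dict_consistency rewrites the second shape to {"r": {"item": [{"a": "1"}]}}
    # whenever the same path appears as a list of dicts elsewhere in the file.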


class JSONParser(object):
    # Named for consistency with schema.SchemaParser, but not sure it's the most appropriate name.
    # Similarly with methods like parse_json_dict

    def __init__(
        self,
        json_filename=None,
        root_json_dict=None,
        schema_parser=None,
        root_list_path=None,
        root_id="ocid",
        use_titles=False,
        xml=False,
        id_name="id",
        filter_field=None,
        filter_value=None,
        preserve_fields=None,
        remove_empty_schema_columns=False,
        rollup=False,
        truncation_length=3,
        persist=False,
        convert_flags={},
    ):
        if persist:
            # Use temp directories in an OS agnostic way
            self.zodb_db_location = (
                tempfile.gettempdir() + "/flattentool-" + str(uuid.uuid4())
            )
            # zlibstorage lowers disk usage by a lot at very small performance cost
            zodb_storage = zc.zlibstorage.ZlibStorage(
                ZODB.FileStorage.FileStorage(self.zodb_db_location)
            )
            self.db = ZODB.DB(zodb_storage)
        else:
            # If None, in memory storage is used.
            self.db = ZODB.DB(None)

        self.connection = self.db.open()

        # ZODB root, only objects attached here will be persisted
        root = self.connection.root
        # OOBTree means a btree whose keys and values are objects (including strings)
        root.sheet_store = BTrees.OOBTree.BTree()

        self.sub_sheets = {}
        self.main_sheet = PersistentSheet(connection=self.connection, name="")
        self.root_list_path = root_list_path
        self.root_id = root_id
        self.use_titles = use_titles
        self.truncation_length = truncation_length
        self.id_name = id_name
        self.xml = xml
        self.filter_field = filter_field
        self.filter_value = filter_value
        self.remove_empty_schema_columns = remove_empty_schema_columns
        self.seen_paths = set()
        self.persist = persist
        self.convert_flags = convert_flags

        if schema_parser:
            # schema parser does not make sheets that are persistent,
            # so use from_sheet which deep copies everything in it.
            self.main_sheet = PersistentSheet.from_sheet(
                schema_parser.main_sheet, self.connection
            )
            for sheet_name, sheet in list(self.sub_sheets.items()):
                self.sub_sheets[sheet_name] = PersistentSheet.from_sheet(
                    sheet, self.connection
                )

            self.sub_sheets = copy.deepcopy(schema_parser.sub_sheets)
            if remove_empty_schema_columns:
                # Don't use columns from the schema parser
                # (avoids empty columns)
                self.main_sheet.columns = []
                for sheet_name, sheet in list(self.sub_sheets.items()):
                    sheet.columns = []
            self.schema_parser = schema_parser
        else:
            self.schema_parser = None

        self.rollup = False
        if rollup:
            if schema_parser and len(schema_parser.rollup) > 0:
                # If rollUp is present in the schema this takes precedence over direct input.
                self.rollup = schema_parser.rollup
                if isinstance(rollup, (list,)) and (
                    len(rollup) > 1 or (len(rollup) == 1 and rollup[0] is not True)
                ):
                    warn(_("Using rollUp values from schema, ignoring direct input."))
            elif isinstance(rollup, (list,)):
                if len(rollup) == 1 and os.path.isfile(rollup[0]):
                    # Parse file, one json path per line.
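                    # For example (hypothetical contents, not from the original
                    # file), the rollup file might contain:
                    #   awards/items/description
                    #   planning/budget/amount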
                    rollup_from_file = set()
                    with open(rollup[0]) as rollup_file:
                        for line in rollup_file:
                            line = line.strip()
                            rollup_from_file.add(line)
                    self.rollup = rollup_from_file
                    # Rollup args passed directly at the commandline
                elif len(rollup) == 1 and rollup[0] is True:
                    warn(
                        _(
                            "No fields to rollup found (pass json path directly, as a list in a file, or via a schema)"
                        )
                    )
                else:
                    self.rollup = set(rollup)
            else:
                warn(
                    _(
                        "Invalid value passed for rollup (pass json path directly, as a list in a file, or via a schema)"
                    )
                )

        if self.xml:
            with codecs.open(json_filename, "rb") as xml_file:
                top_dict = xmltodict.parse(
                    xml_file,
                    force_list=(root_list_path,),
                    force_cdata=True,
                )
                # AFAICT, this should be true for *all* XML files
                assert len(top_dict) == 1
                root_json_dict = list(top_dict.values())[0]
                list_dict_consistency(root_json_dict)
            json_filename = None

        if json_filename is None and root_json_dict is None:
            raise ValueError(
                _("Either json_filename or root_json_dict must be supplied")
            )

        if json_filename is not None and root_json_dict is not None:
            raise ValueError(
                _("Only one of json_file or root_json_dict should be supplied")
            )

        if not json_filename:
            if self.root_list_path is None:
                self.root_json_list = root_json_dict
            else:
                self.root_json_list = path_search(
                    root_json_dict, self.root_list_path.split("/")
                )

        if preserve_fields:
            # Extract fields to be preserved from input file (one path per line)
            preserve_fields_all = []
            preserve_fields_input = []
            with open(preserve_fields) as preserve_fields_file:
                for line in preserve_fields_file:
                    line = line.strip()
                    path_fields = line.rsplit("/", 1)
                    preserve_fields_all = (
                        preserve_fields_all + path_fields + [line.rstrip("/")]
                    )
                    preserve_fields_input = preserve_fields_input + [line.rstrip("/")]

            self.preserve_fields = set(preserve_fields_all)
            self.preserve_fields_input = set(preserve_fields_input)

            try:
                input_not_in_schema = set()
                for field in self.preserve_fields_input:
                    if field not in self.schema_parser.flattened.keys():
                        input_not_in_schema.add(field)
                warn(
                    _(
                        "You wanted to preserve the following fields which are not present in the supplied schema: {}"
                    ).format(list(input_not_in_schema))
                )
            except AttributeError:
                # no schema
                pass
        else:
            self.preserve_fields = None
            self.preserve_fields_input = None

        if json_filename:
            if self.root_list_path is None:
                path = "item"
            else:
                path = root_list_path.replace("/", ".") + ".item"

            json_file = codecs.open(json_filename, encoding="utf-8")

            self.root_json_list = ijson.items(json_file, path, map_type=OrderedDict)

        try:
            self.parse()
        except ijson.common.IncompleteJSONError as err:
            raise BadlyFormedJSONError(*err.args)
        except UnicodeDecodeError as err:
            raise BadlyFormedJSONErrorUTF8(*err.args)
        finally:
            if json_filename:
                json_file.close()

    def parse(self):
        for num, json_dict in enumerate(self.root_json_list):
            if json_dict is None:
                # This is particularly useful for IATI XML, in order to not
                # fall over on empty activity, e.g. <iati-activity/>
                continue
            self.parse_json_dict(json_dict, sheet=self.main_sheet)
            # only persist every 2000 objects. persisting more often slows down storing.
            # 2000 top level objects normally not too much to store in memory.
            if num % 2000 == 0 and num != 0:
                transaction.commit()

        # This commit could be removed which would mean that up to 2000 objects
        # could be stored in memory without anything being persisted.
        transaction.commit()

        if self.remove_empty_schema_columns:
            # Remove sheets with no lines of data
            for sheet_name, sheet in list(self.sub_sheets.items()):
                if not sheet.lines:
                    del self.sub_sheets[sheet_name]

        if self.preserve_fields_input:
            nonexistent_input_paths = []
            for field in self.preserve_fields_input:
                if field not in self.seen_paths:
                    nonexistent_input_paths.append(field)
            if len(nonexistent_input_paths) > 0:
                warn(
                    _(
                        "You wanted to preserve the following fields which are not present in the input data: {}"
                    ).format(nonexistent_input_paths)
                )

    def parse_json_dict(
        self,
        json_dict,
        sheet,
        json_key=None,
        parent_name="",
        flattened_dict=None,
        parent_id_fields=None,
        top_level_of_sub_sheet=False,
    ):
        """
        Parse a json dictionary.

        json_dict - the json dictionary
        sheet - a sheet.Sheet object representing the resulting spreadsheet
        json_key - the key that maps to this JSON dict, either directly to the dict, or to a list that contains this dict.  Is None if this dict is contained in root_json_list directly.
        """
        # Possibly main_sheet should be main_sheet_columns, but this is
        # currently named for consistency with schema.py

        if self.use_titles:
            sheet_key = sheet_key_title
        else:
            sheet_key = sheet_key_field

        skip_type_and_coordinates = False
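        # Illustrative of the optional WKT conversion below (requires shapely;
        # the example value is hypothetical): a GeoJSON-style dict such as
        # {"type": "Point", "coordinates": [0.0, 1.0]} is written to the sheet
        # as its WKT form, e.g.
        #   shapely.geometry.shape({"type": "Point", "coordinates": [0.0, 1.0]}).wkt
        #   -> 'POINT (0 1)'
        # and its "type" and "coordinates" keys are then skipped below.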
        if (
            self.convert_flags.get("wkt")
            and "type" in json_dict
            and "coordinates" in json_dict
            and SHAPELY_LIBRARY_AVAILABLE
        ):
            _sheet_key = sheet_key(sheet, parent_name.strip("/"))
            try:
                geom = shapely.geometry.shape(json_dict)
            except (shapely.errors.GeometryTypeError, TypeError, ValueError) as e:
                warn(_("Invalid GeoJSON: {parser_msg}").format(parser_msg=repr(e)))
                return
            flattened_dict[_sheet_key] = geom.wkt
            skip_type_and_coordinates = True

        parent_id_fields = copy.copy(parent_id_fields) or OrderedDict()
        if flattened_dict is None:
            flattened_dict = {}
            top = True
        else:
            top = False

        if parent_name == "" and self.filter_field and self.filter_value:
            if self.filter_field not in json_dict:
                return
            if json_dict[self.filter_field] != self.filter_value:
                return

        if top_level_of_sub_sheet:
            # Add the IDs for the top level object in an array
            for k, v in parent_id_fields.items():
                if self.xml:
                    flattened_dict[sheet_key(sheet, k)] = v["#text"]
                else:
                    flattened_dict[sheet_key(sheet, k)] = v

        if self.root_id and self.root_id in json_dict:
            parent_id_fields[sheet_key(sheet, self.root_id)] = json_dict[self.root_id]

        if self.id_name in json_dict:
            parent_id_fields[sheet_key(sheet, parent_name + self.id_name)] = json_dict[
                self.id_name
            ]

        for key, value in json_dict.items():

            if skip_type_and_coordinates and key in ["type", "coordinates"]:
                continue

            # Keep a unique list of all the JSON paths in the data that have been seen.
            parent_path = parent_name.replace("/0", "")
            full_path = parent_path + key
            self.seen_paths.add(full_path)

            if self.preserve_fields:

                siblings = False
                for field in self.preserve_fields:
                    if parent_path in field:
                        siblings = True
                if siblings and full_path not in self.preserve_fields:
                    continue

            if type(value) in BASIC_TYPES:
                if self.xml and key == "#text":
                    # Handle the text output from xmltodict
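                    # (Illustrative, hypothetical input: with force_cdata=True,
                    # as used in __init__ above, xmltodict stores element text
                    # under "#text", e.g.
                    #   xmltodict.parse("<a>hi</a>", force_cdata=True)
                    #     -> {"a": {"#text": "hi"}}
                    # so the value is written under the parent's column name.)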
                    key = ""
                    parent_name = parent_name.strip("/")
                flattened_dict[sheet_key(sheet, parent_name + key)] = value
            elif hasattr(value, "items"):
                self.parse_json_dict(
                    value,
                    sheet=sheet,
                    json_key=key,
                    parent_name=parent_name + key + "/",
                    flattened_dict=flattened_dict,
                    parent_id_fields=parent_id_fields,
                )
            elif hasattr(value, "__iter__"):
                if all(type(x) in BASIC_TYPES for x in value):
                    # Check for an array of BASIC types
                    # TODO Make this check the schema
                    # TODO Error if any of the values contain the separator
                    flattened_dict[sheet_key(sheet, parent_name + key)] = ";".join(
                        map(str, value)
                    )
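                    # Illustrative (hypothetical value): {"tags": ["a", "b", "c"]}
                    # flattens to a single "tags" cell containing "a;b;c".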
                # Arrays of arrays
                elif all(
                    l not in BASIC_TYPES
                    and not hasattr(l, "items")
                    and hasattr(l, "__iter__")
                    and all(type(x) in BASIC_TYPES for x in l)
                    for l in value
                ):
                    flattened_dict[sheet_key(sheet, parent_name + key)] = ";".join(
                        map(lambda l: ",".join(map(str, l)), value)
                    )
                else:
                    if (
                        self.rollup and parent_name == ""
                    ):  # Rollup only currently possible to main sheet

                        if self.use_titles and not self.schema_parser:
                            warn(
                                _(
                                    "Warning: No schema was provided so column headings are JSON keys, not titles."
                                )
                            )

                        if len(value) == 1:
                            for k, v in value[0].items():

                                if (
                                    self.preserve_fields
                                    and parent_name + key + "/" + k
                                    not in self.preserve_fields
                                ):
                                    continue

                                if type(v) not in BASIC_TYPES:
                                    raise ValueError(
                                        _("Rolled up values must be basic types")
                                    )
                                else:
                                    if self.schema_parser:
                                        # We want titles and there's a schema and rollUp is in it
                                        if (
                                            self.use_titles
                                            and parent_name + key + "/0/" + k
                                            in self.schema_parser.main_sheet.titles
                                        ):
                                            flattened_dict[
                                                sheet_key_title(
                                                    sheet, parent_name + key + "/0/" + k
                                                )
                                            ] = v

                                        # We want titles and there's a schema but rollUp isn't in it
                                        # so the titles for rollup properties aren't in the main sheet
                                        # so we need to try to get the titles from a subsheet
                                        elif (
                                            self.use_titles
                                            and parent_name + key in self.rollup
                                            and self.schema_parser.sub_sheet_titles.get(
                                                (
                                                    parent_name,
                                                    key,
                                                )
                                            )
                                            in self.schema_parser.sub_sheets
                                        ):
                                            relevant_subsheet = self.schema_parser.sub_sheets.get(
                                                self.schema_parser.sub_sheet_titles.get(
                                                    (
                                                        parent_name,
                                                        key,
                                                    )
                                                )
                                            )
                                            if relevant_subsheet is not None:
                                                rollup_field_title = sheet_key_title(
                                                    relevant_subsheet,
                                                    parent_name + key + "/0/" + k,
                                                )
                                                flattened_dict[
                                                    sheet_key(sheet, rollup_field_title)
                                                ] = v

                                        # We don't want titles even though there's a schema
                                        elif not self.use_titles and (
                                            parent_name + key + "/0/" + k
                                            in self.schema_parser.main_sheet
                                            or parent_name + key in self.rollup
                                        ):
                                            flattened_dict[
                                                sheet_key(
                                                    sheet, parent_name + key + "/0/" + k
                                                )
                                            ] = v

                                    # No schema, so no titles
                                    elif parent_name + key in self.rollup:
                                        flattened_dict[
                                            sheet_key(
                                                sheet, parent_name + key + "/0/" + k
                                            )
                                        ] = v

                        elif len(value) > 1:
                            for k in set(sum((list(x.keys()) for x in value), [])):

                                if (
                                    self.preserve_fields
                                    and parent_name + key + "/" + k
                                    not in self.preserve_fields
                                ):
                                    continue

                                if (
                                    self.schema_parser
                                    and parent_name + key + "/0/" + k
                                    in self.schema_parser.main_sheet
                                ):
                                    warn(
                                        _(
                                            'More than one value supplied for "{}". Could not provide rollup, so adding a warning to the relevant cell(s) in the spreadsheet.'
                                        ).format(parent_name + key)
                                    )
                                    flattened_dict[
                                        sheet_key(sheet, parent_name + key + "/0/" + k)
                                    ] = _(
                                        "WARNING: More than one value supplied, consult the relevant sub-sheet for the data."
                                    )
                                elif parent_name + key in self.rollup:
                                    warn(
                                        _(
                                            'More than one value supplied for "{}". Could not provide rollup, so adding a warning to the relevant cell(s) in the spreadsheet.'
                                        ).format(parent_name + key)
                                    )
                                    flattened_dict[
                                        sheet_key(sheet, parent_name + key + "/0/" + k)
                                    ] = _(
                                        "WARNING: More than one value supplied, consult the relevant sub-sheet for the data."
                                    )

                    if (
                        self.use_titles
                        and self.schema_parser
                        and (
                            parent_name,
                            key,
                        )
                        in self.schema_parser.sub_sheet_titles
                    ):
                        sub_sheet_name = self.schema_parser.sub_sheet_titles[
                            (
                                parent_name,
                                key,
                            )
                        ]
                    else:
                        sub_sheet_name = make_sub_sheet_name(
                            parent_name, key, truncation_length=self.truncation_length
                        )
                    if sub_sheet_name not in self.sub_sheets:
                        self.sub_sheets[sub_sheet_name] = PersistentSheet(
                            name=sub_sheet_name, connection=self.connection
                        )

                    for json_dict in value:
                        if json_dict is None:
                            continue
                        self.parse_json_dict(
                            json_dict,
                            sheet=self.sub_sheets[sub_sheet_name],
                            json_key=key,
                            parent_id_fields=parent_id_fields,
                            parent_name=parent_name + key + "/0/",
                            top_level_of_sub_sheet=True,
                        )
            else:
                raise ValueError(_("Unsupported type {}").format(type(value)))

        if top:
            sheet.append_line(flattened_dict)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        if self.persist:
            self.connection.close()
            self.db.close()
            os.remove(self.zodb_db_location)
            os.remove(self.zodb_db_location + ".lock")
            os.remove(self.zodb_db_location + ".index")
            os.remove(self.zodb_db_location + ".tmp")