• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenDataServices / flatten-tool / 6507626273

13 Oct 2023 11:25AM UTC coverage: 42.006% (-53.7%) from 95.72%
6507626273

Pull #433

github

odscjames
New "Geo" optional dependencies

https://github.com/OpenDataServices/flatten-tool/issues/424
Pull Request #433: New "Geo" optional dependencies

38 of 38 new or added lines in 6 files covered. (100.0%)

1466 of 3490 relevant lines covered (42.01%)

4.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.47
/flattentool/input.py
1
"""
2
This file has classes describing input from spreadsheets.
3

4
"""
5

6
from __future__ import print_function, unicode_literals
10✔
7

8
import datetime
10✔
9
import os
10✔
10
from collections import OrderedDict, UserDict
10✔
11
from csv import DictReader
10✔
12
from csv import reader as csvreader
10✔
13
from decimal import Decimal, InvalidOperation
10✔
14
from warnings import warn
10✔
15

16
import openpyxl
10✔
17
import pytz
10✔
18

19
try:
10✔
20
    import geojson
10✔
21
    import shapely.wkt
10✔
22

23
    SHAPELY_AND_GEOJSON_LIBRARIES_AVAILABLE = True
10✔
24
except ImportError:
×
25
    SHAPELY_AND_GEOJSON_LIBRARIES_AVAILABLE = False
×
26

27
from openpyxl.utils.cell import _get_column_letter
10✔
28

29
from flattentool.exceptions import DataErrorWarning
10✔
30
from flattentool.i18n import _
10✔
31
from flattentool.lib import isint, parse_sheet_configuration
10✔
32
from flattentool.ODSReader import ODSReader
10✔
33

34
try:
10✔
35
    from zipfile import BadZipFile
10✔
36
except ImportError:
×
37
    from zipfile import BadZipfile as BadZipFile
×
38

39

40
class Cell:
    """
    A single spreadsheet cell value together with where it came from.

    Attributes:

    * ``cell_value`` -- the value held by the cell.
    * ``cell_location`` -- location information for the cell; elsewhere in
      this module it is built as a ``(sheet_name, column, row, heading)``
      tuple.
    * ``sub_cells`` -- other cells that were merged into this one because
      they carried the same value.

    """

    def __init__(self, cell_value, cell_location):
        self.cell_value = cell_value
        self.cell_location = cell_location
        # A fresh list per instance, so merged duplicates are never shared.
        self.sub_cells = []

    def __repr__(self):
        # Makes error messages that format a Cell show its content rather
        # than an opaque object address.
        return "{}({!r}, {!r})".format(
            type(self).__name__, self.cell_value, self.cell_location
        )
10✔
45

46

47
def convert_type(type_string, value, timezone=pytz.timezone("UTC"), convert_flags=None):
    """
    Convert a raw spreadsheet ``value`` according to ``type_string``.

    Recognised values of ``type_string`` are ``"number"``, ``"integer"``,
    ``"boolean"``, ``"array"``, ``"array_array"``, ``"string_array"``,
    ``"number_array"``, ``"string"``, ``"date"``, ``""`` (no declared type)
    and ``"geojson"``. The last is only handled when ``convert_flags`` has a
    truthy ``"wkt"`` entry and the optional shapely/geojson libraries were
    imported successfully.

    An empty string or ``None`` always converts to ``None``. When a value
    cannot be converted to the requested number, integer or boolean type, a
    ``DataErrorWarning`` is issued and the value is returned as a string. An
    unparseable number array falls back to a string array, and an invalid WKT
    string yields ``None``.

    ``timezone`` is used to localize ``datetime.datetime`` values before they
    are rendered as ISO 8601 strings.

    Raises ``ValueError`` if ``type_string`` is not recognised.

    """
    if convert_flags is None:
        convert_flags = {}
    if value == "" or value is None:
        return None
    if type_string == "number":
        try:
            return Decimal(value)
        except (TypeError, ValueError, InvalidOperation):
            warn(
                _(
                    'Non-numeric value "{}" found in number column, returning as string instead.'
                ).format(value),
                DataErrorWarning,
            )
            return str(value)
    elif type_string == "integer":
        try:
            return int(value)
        except (TypeError, ValueError):
            warn(
                _(
                    'Non-integer value "{}" found in integer column, returning as string instead.'
                ).format(value),
                DataErrorWarning,
            )
            return str(value)
    elif type_string == "boolean":
        value = str(value)
        if value.lower() in ["true", "1"]:
            return True
        elif value.lower() in ["false", "0"]:
            return False
        else:
            warn(
                _(
                    'Unrecognised value for boolean: "{}", returning as string instead'
                ).format(value),
                DataErrorWarning,
            )
            return str(value)
    elif type_string in ("array", "array_array", "string_array", "number_array"):
        # ";" separates items; "," separates the members of a nested array.
        value = str(value)
        if type_string == "number_array":
            try:
                if "," in value:
                    return [
                        [Decimal(y) for y in x.split(",")] for x in value.split(";")
                    ]
                else:
                    return [Decimal(x) for x in value.split(";")]
            except (TypeError, ValueError, InvalidOperation):
                warn(
                    _(
                        'Non-numeric value "{}" found in number array column, returning as string array instead).'
                    ).format(value),
                    DataErrorWarning,
                )
                # Deliberately fall through to the string array handling below.
        if "," in value:
            return [x.split(",") for x in value.split(";")]
        else:
            return value.split(";")
    elif type_string == "string":
        if type(value) is datetime.datetime:
            return timezone.localize(value).isoformat()
        return str(value)
    elif type_string == "date":
        if type(value) is datetime.datetime:
            return value.date().isoformat()
        return str(value)
    elif (
        convert_flags.get("wkt")
        and type_string == "geojson"
        and SHAPELY_AND_GEOJSON_LIBRARIES_AVAILABLE
    ):
        try:
            geom = shapely.wkt.loads(value)
        except shapely.errors.GEOSException as e:
            warn(
                _(
                    'An invalid WKT string was supplied "{value}", the message from the parser was: {parser_msg}'
                ).format(value=value, parser_msg=str(e)),
                DataErrorWarning,
            )
            return None
        feature = geojson.Feature(geometry=geom, properties={})
        return feature.geometry
    elif type_string == "":
        if type(value) is datetime.datetime:
            return timezone.localize(value).isoformat()
        # float.is_integer() is False for NaN and infinities, whereas
        # int(value) would raise ValueError / OverflowError for them.
        if type(value) is float and value.is_integer():
            return int(value)
        return value if type(value) is int else str(value)
    else:
        raise ValueError('Unrecognised type: "{}"'.format(type_string))
×
140

141

142
def warnings_for_ignored_columns(v, extra_message):
    """
    Warn about every column found in ``v`` that has been ignored.

    ``v`` may be a ``Cell``, a ``dict`` or a ``TemporaryDict``; containers are
    walked recursively and one warning is issued per cell, naming the heading
    stored in the cell's location and appending ``extra_message``.

    Raises ``ValueError`` if ``v`` (or anything nested in it) is of any other
    type.

    """
    if isinstance(v, Cell):
        # Index 3 of the cell location is the column heading.
        warn("Column {} has been ignored, {}".format(v.cell_location[3], extra_message))
    elif isinstance(v, dict):
        for x in v.values():
            warnings_for_ignored_columns(x, extra_message)
    elif isinstance(v, TemporaryDict):
        for x in v.to_list():
            warnings_for_ignored_columns(x, extra_message)
    else:
        raise ValueError(
            "Cannot report ignored columns for a value of type {}".format(
                type(v).__name__
            )
        )
×
153

154

155
def _merge_id_info(debug_info):
    """Describe the identifier(s) of the row being merged, for warning messages."""
    id_info = '{} "{}"'.format(
        debug_info.get("id_name"),
        debug_info.get(debug_info.get("id_name")),
    )
    if debug_info.get("root_id"):
        id_info = (
            '{} "{}", '.format(
                debug_info.get("root_id"),
                debug_info.get("root_id_or_none"),
            )
            + id_info
        )
    return id_info


def merge(base, mergee, debug_info=None):
    """
    Merge the unflattened row ``mergee`` into ``base``, modifying ``base``.

    Keys missing from ``base`` are copied across. For keys present in both:

    * ``TemporaryDict`` values are merged entry by entry (recursively for
      entries whose key already exists), and entries without a key field are
      appended;
    * ``dict`` values are merged recursively;
    * anything else is treated as a cell value: an equal value is recorded
      as a sub-cell of the existing cell, a different value triggers a
      ``DataErrorWarning`` about a possible duplicate identifier.

    When the two sides disagree about whether a key holds an array, an object
    or a plain cell, the incoming columns are ignored with a warning.

    ``debug_info`` is an optional dict used only to build warning messages
    (keys read: ``id_name``, the value of ``id_name``, ``root_id``,
    ``root_id_or_none`` and ``sheet_name``).

    """
    if not debug_info:
        debug_info = {}
    for key, v in mergee.items():
        value = v.cell_value if isinstance(v, Cell) else v

        if key not in base:
            # This happens when a parent record finds the first child record
            # of a known type
            base[key] = v
            continue

        if isinstance(value, TemporaryDict):
            if not isinstance(base[key], TemporaryDict):
                warnings_for_ignored_columns(
                    v,
                    _(
                        "because it treats {} as an array, but another column does not"
                    ).format(key),
                )
                continue
            for temporarydict_key, temporarydict_value in value.items():
                if temporarydict_key in base[key]:
                    merge(
                        base[key][temporarydict_key],
                        temporarydict_value,
                        debug_info,
                    )
                else:
                    base[key][temporarydict_key] = temporarydict_value
            for temporarydict_value in value.items_no_keyfield:
                base[key].items_no_keyfield.append(temporarydict_value)
        elif isinstance(value, dict):
            if isinstance(base[key], dict):
                merge(base[key], value, debug_info)
            else:
                warnings_for_ignored_columns(
                    v,
                    _(
                        "because it treats {} as an object, but another column does not"
                    ).format(key),
                )
        elif not isinstance(base[key], Cell):
            warnings_for_ignored_columns(
                v, _("because another column treats it as an array or object")
            )
        elif base[key].cell_value != value:
            warn(
                _(
                    'You may have a duplicate Identifier: We couldn\'t merge these rows with the {}: field "{}" in sheet "{}": one cell has the value: "{}", the other cell has the value: "{}"'
                ).format(
                    _merge_id_info(debug_info),
                    key,
                    debug_info.get("sheet_name"),
                    base[key].cell_value,
                    value,
                ),
                DataErrorWarning,
            )
        else:
            # Same value seen again: remember the extra source cell.
            base[key].sub_cells.append(v)
×
246

247

248
class SpreadsheetInput(object):
    """
    Base class describing a spreadsheet input. Has stubs which are
    implemented via inheritance for particular types of spreadsheet (e.g. xlsx
    or csv).

    """

    def convert_dict_titles(self, dicts, title_lookup=None):
        """
        Replace titles with field names in the given list of dictionaries
        (``dicts``) using the titles lookup in the schema parser.

        If a parser is set, its ``title_lookup`` is used in preference to the
        ``title_lookup`` argument. Without any lookup, the dictionaries are
        yielded unchanged.

        """
        if self.parser:
            title_lookup = self.parser.title_lookup
        for d in dicts:
            if title_lookup:
                yield OrderedDict(
                    [(title_lookup.lookup_header(k), v) for k, v in d.items()]
                )
            else:
                yield d

    def __init__(
        self,
        input_name="",
        root_list_path="main",
        root_is_list=False,
        timezone_name="UTC",
        root_id="ocid",
        convert_titles=False,
        vertical_orientation=False,
        include_sheets=None,
        exclude_sheets=None,
        id_name="id",
        xml=False,
        base_configuration=None,
        use_configuration=True,
        convert_flags=None,
    ):
        """
        Store the input options on the instance.

        The container arguments (``include_sheets``, ``exclude_sheets``,
        ``base_configuration`` and ``convert_flags``) default to ``None`` and
        are replaced by a new empty container per instance, so instances never
        share one mutable default object.

        """
        self.input_name = input_name
        self.root_list_path = root_list_path
        self.root_is_list = root_is_list
        self.sub_sheet_names = []
        self.timezone = pytz.timezone(timezone_name)
        self.root_id = root_id
        self.convert_titles = convert_titles
        self.id_name = id_name
        self.xml = xml
        self.parser = None
        self.vertical_orientation = vertical_orientation
        self.include_sheets = [] if include_sheets is None else include_sheets
        self.exclude_sheets = [] if exclude_sheets is None else exclude_sheets
        self.base_configuration = base_configuration or {}
        self.sheet_configuration = {}
        self.use_configuration = use_configuration
        self.convert_flags = {} if convert_flags is None else convert_flags

    def get_sub_sheets_lines(self):
        """
        Yield ``(sheet_name, lines)`` for every sheet in ``sub_sheet_names``.

        When ``convert_titles`` is set, the lines are passed through
        ``convert_dict_titles``, with the title lookup of the matching parser
        sub sheet if the parser has one for that sheet.

        """
        for sub_sheet_name in self.sub_sheet_names:
            if self.convert_titles:
                yield sub_sheet_name, self.convert_dict_titles(
                    self.get_sheet_lines(sub_sheet_name),
                    self.parser.sub_sheets[sub_sheet_name].title_lookup
                    if sub_sheet_name in self.parser.sub_sheets
                    else None,
                )
            else:
                yield sub_sheet_name, self.get_sheet_lines(sub_sheet_name)

    def configure_sheets(self):
        """Parse and store the configuration of every sheet in ``sub_sheet_names``."""
        for sub_sheet_name in self.sub_sheet_names:
            self.sheet_configuration[sub_sheet_name] = parse_sheet_configuration(
                self.get_sheet_configuration(sub_sheet_name)
            )

    def get_sheet_configuration(self, sheet_name):
        """Return the raw configuration entries of a sheet (none by default)."""
        return []

    def get_sheet_lines(self, sheet_name):
        """Return the data rows of a sheet; to be implemented by subclasses."""
        raise NotImplementedError

    def get_sheet_headings(self, sheet_name):
        """Return the headings of a sheet; to be implemented by subclasses."""
        raise NotImplementedError

    def read_sheets(self):
        """Discover the sheets of the input; to be implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _warn_about_duplicate_headings(sheet_name, actual_headings):
        """Issue a ``DataErrorWarning`` for each heading that occurs more than once."""
        found = OrderedDict()
        last_col = len(actual_headings)
        # We want to ignore data in earlier columns, so we look
        # through the data backwards
        for reversed_index, actual_heading in enumerate(reversed(actual_headings)):
            if actual_heading is None:
                continue
            # Record the zero-based column index of every occurrence; the
            # first entry is therefore the right-most (kept) column.
            found.setdefault(actual_heading, []).append(
                (last_col - reversed_index) - 1
            )
        for actual_heading in reversed(found):
            ignoring = found[actual_heading][1:]
            if not ignoring:
                continue
            ignoring.reverse()
            if len(ignoring) >= 2:
                warn(
                    (
                        _(
                            'Duplicate heading "{}" found, ignoring '
                            'the data in columns {} and {} (sheet: "{}").'
                        )
                    ).format(
                        actual_heading,
                        ", ".join(_get_column_letter(x + 1) for x in ignoring[:-1]),
                        _get_column_letter(ignoring[-1] + 1),
                        sheet_name,
                    ),
                    DataErrorWarning,
                )
            else:
                warn(
                    (
                        _(
                            'Duplicate heading "{}" found, ignoring '
                            'the data in column {} (sheet: "{}").'
                        )
                    ).format(
                        actual_heading,
                        _get_column_letter(ignoring[0] + 1),
                        sheet_name,
                    ),
                    DataErrorWarning,
                )

    def _unflattened_id(self, unflattened):
        """Return the identifier value of an unflattened row that has one."""
        id_entry = unflattened[self.id_name]
        if self.xml and not isinstance(id_entry, Cell):
            # For an XML tag
            return id_entry["text()"].cell_value
        # For a JSON, or an XML attribute
        return id_entry.cell_value

    def do_unflatten(self):
        """
        Unflatten every row of every sheet into a tree of ``Cell`` objects.

        Rows are grouped by their root id value; within a group, rows sharing
        the same identifier are merged. Returns one list concatenating the
        records of every group.

        """
        main_sheet_by_ocid = OrderedDict()
        sheets = list(self.get_sub_sheets_lines())
        for sheet_name, lines in sheets:
            try:
                actual_headings = self.get_sheet_headings(sheet_name)
            except NotImplementedError:
                # The ListInput type used in the tests doesn't support getting headings.
                actual_headings = None
            else:
                # If sheet is empty or too many lines have been skipped
                if not actual_headings:
                    continue
                self._warn_about_duplicate_headings(sheet_name, actual_headings)

            for j, line in enumerate(lines):
                if all(x is None or x == "" for x in line.values()):
                    continue
                root_id_or_none = line.get(self.root_id) if self.root_id else None
                cells = OrderedDict()
                for k, header in enumerate(line):
                    heading = actual_headings[k] if actual_headings else header
                    if self.vertical_orientation:
                        # This is misleading as it specifies the row number as the distance vertically
                        # and the horizontal 'letter' as a number.
                        # https://github.com/OpenDataServices/flatten-tool/issues/153
                        column = str(k + 1)
                    else:
                        column = _get_column_letter(k + 1)
                    cells[header] = Cell(
                        line[header], (sheet_name, column, j + 2, heading)
                    )
                unflattened = unflatten_main_with_parser(
                    self.parser,
                    cells,
                    self.timezone,
                    self.xml,
                    self.id_name,
                    self.convert_flags,
                )
                if root_id_or_none not in main_sheet_by_ocid:
                    main_sheet_by_ocid[root_id_or_none] = TemporaryDict(
                        self.id_name, xml=self.xml
                    )
                records = main_sheet_by_ocid[root_id_or_none]

                if self.id_name in unflattened:
                    unflattened_id = self._unflattened_id(unflattened)
                    if unflattened_id in records:
                        merge(
                            records[unflattened_id],
                            unflattened,
                            {
                                "sheet_name": sheet_name,
                                "root_id": self.root_id,
                                "root_id_or_none": root_id_or_none,
                                "id_name": self.id_name,
                                self.id_name: unflattened_id,
                            },
                        )
                        continue
                records.append(unflattened)
        temporarydicts_to_lists(main_sheet_by_ocid)
        return sum(main_sheet_by_ocid.values(), [])

    def unflatten(self):
        """Unflatten the input and return plain values instead of cells."""
        result = self.do_unflatten()
        result = extract_list_to_value(result)
        return result

    def fancy_unflatten(self, with_cell_source_map, with_heading_source_map):
        """
        Unflatten the input and optionally build source maps.

        Returns ``(result, cell_source_map, heading_source_map)``. The cell
        source map is ``None`` unless ``with_cell_source_map`` is set; the
        heading source map is ``None`` unless at least one of the two flags
        is set.

        """
        cell_tree = self.do_unflatten()
        result = extract_list_to_value(cell_tree)
        ordered_cell_source_map = None
        heading_source_map = None
        if with_cell_source_map or with_heading_source_map:
            cell_source_map = extract_list_to_error_path(
                [] if self.root_is_list else [self.root_list_path], cell_tree
            )
            ordered_items = sorted(cell_source_map.items())
            row_source_map = OrderedDict()
            heading_source_map = OrderedDict()
            for path, cells in ordered_items:
                # Prepare row_source_map key
                key = "/".join(str(x) for x in path[:-1])
                row_locations = row_source_map.setdefault(key, [])
                if with_heading_source_map:
                    # Prepare header_source_map key: the path without any
                    # part that can be read as an integer.
                    header_path_parts = []
                    for x in path:
                        try:
                            int(x)
                        except (TypeError, ValueError):
                            header_path_parts.append(x)
                    header_path = "/".join(header_path_parts)
                    if header_path not in heading_source_map:
                        heading_source_map[header_path] = []
                # Populate the row and header source maps
                for cell in cells:
                    sheet, col, row, header = cell
                    if (sheet, row) not in row_locations:
                        row_locations.append((sheet, row))
                    if with_heading_source_map:
                        if (sheet, header) not in heading_source_map[header_path]:
                            heading_source_map[header_path].append((sheet, header))
        if with_cell_source_map:
            ordered_cell_source_map = OrderedDict(
                ("/".join(str(x) for x in path), location)
                for path, location in ordered_items
            )
            for key in row_source_map:
                assert key not in ordered_cell_source_map, _(
                    "Row/cell collision: {}"
                ).format(key)
                ordered_cell_source_map[key] = row_source_map[key]
        return result, ordered_cell_source_map, heading_source_map
10✔
536

537

538
def extract_list_to_error_path(path, input):
    """
    Collect the source locations of every cell found in a list of records.

    Each record is handed to ``extract_dict_to_error_path`` with its index
    appended to ``path``; the per-record results are combined into a single
    dict. A path produced twice is treated as a programming error.

    """
    collected = {}
    for index, record in enumerate(input):
        record_paths = extract_dict_to_error_path(path + [index], record)
        for cell_path, locations in record_paths.items():
            assert cell_path not in collected, _("Already have key {}").format(
                cell_path
            )
            collected[cell_path] = locations
    return collected
×
546

547

548
def extract_dict_to_error_path(path, input):
    """
    Collect the source locations of every cell found in one record.

    Returns a dict keyed by the full path to each cell (a tuple made of
    ``path`` plus the keys and list indexes leading to it). Each value is a
    list holding the cell's own location followed by the locations of its
    sub-cells. Lists and dicts are walked recursively; any other kind of
    value raises an ``Exception``.

    """
    collected = {}
    for name in input:
        child = input[name]
        child_path = path + [name]
        if isinstance(child, (list, dict)):
            if isinstance(child, list):
                nested = extract_list_to_error_path(child_path, child)
            else:
                nested = extract_dict_to_error_path(child_path, child)
            for cell_path, locations in nested.items():
                assert cell_path not in collected, _("Already have key {}").format(
                    cell_path
                )
                collected[cell_path] = locations
        elif isinstance(child, Cell):
            cell_path = tuple(child_path)
            assert cell_path not in collected, _("Already have key {}").format(
                cell_path
            )
            locations = [child.cell_location]
            collected[cell_path] = locations
            for sub_cell in child.sub_cells:
                # Sub-cells are only ever recorded for equal values.
                assert sub_cell.cell_value == child.cell_value, _(
                    "Two sub-cells have different values: {}, {}"
                ).format(child.cell_value, sub_cell.cell_value)
                locations.append(sub_cell.cell_location)
        else:
            raise Exception(
                _("Unexpected result type in the JSON cell tree: {}").format(child)
            )
    return collected
×
575

576

577
def extract_list_to_value(input):
    """Return a list with every record of ``input`` converted to plain values."""
    return [extract_dict_to_value(record) for record in input]
10✔
582

583

584
def extract_dict_to_value(input):
    """
    Return an ``OrderedDict`` mirroring ``input`` with cells replaced by values.

    Nested lists and dicts are converted recursively and each ``Cell`` is
    replaced by its ``cell_value``. Any other kind of value raises an
    ``Exception``.

    """
    plain = OrderedDict()
    for name in input:
        node = input[name]
        if isinstance(node, list):
            plain[name] = extract_list_to_value(node)
            continue
        if isinstance(node, dict):
            plain[name] = extract_dict_to_value(node)
            continue
        if not isinstance(node, Cell):
            raise Exception(
                _("Unexpected result type in the JSON cell tree: {}").format(node)
            )
        plain[name] = node.cell_value
    return plain
10✔
598

599

600
class CSVInput(SpreadsheetInput):
    """
    Spreadsheet input read from a directory of CSV files.

    ``input_name`` is the directory; every ``<sheet name>.csv`` file in it is
    one sheet. Files are decoded using the ``encoding`` class attribute.

    """

    encoding = "utf-8"

    def _sheet_path(self, sheet_name):
        """Return the path of the CSV file holding the given sheet."""
        return os.path.join(self.input_name, sheet_name + ".csv")

    def _effective_configuration(self, sheet_name):
        """
        Return ``(configuration, configuration_line)`` for a sheet.

        ``configuration_line`` is 1 when the sheet carries its own
        configuration and 0 otherwise. The configuration is the sheet's own,
        falling back to the base configuration, or empty when configuration
        use is switched off.

        """
        sheet_configuration = self.sheet_configuration[self.sheet_names_map[sheet_name]]
        configuration_line = 1 if sheet_configuration else 0
        if not sheet_configuration:
            sheet_configuration = self.base_configuration
        if not self.use_configuration:
            sheet_configuration = {}
        return sheet_configuration, configuration_line

    def get_sheet_headings(self, sheet_name):
        """
        Return the heading row of a sheet as a list.

        An empty list is returned when the sheet is configured to be ignored
        or when the file has no row at the expected heading position.

        """
        sheet_configuration, configuration_line = self._effective_configuration(
            sheet_name
        )
        skip_rows = sheet_configuration.get("skipRows", 0)
        if sheet_configuration.get("ignore"):
            # returning empty headers is a proxy for no data in the sheet.
            return []

        heading_index = skip_rows + configuration_line
        with open(self._sheet_path(sheet_name), encoding=self.encoding) as main_sheet_file:
            for num, row in enumerate(csvreader(main_sheet_file)):
                if num == heading_index:
                    return row
        # The file ended before the heading row was reached.
        return []

    def read_sheets(self):
        """Find the CSV sheets in the input directory and configure them."""
        sheet_file_names = os.listdir(self.input_name)
        sheet_names = sorted(
            [fname[:-4] for fname in sheet_file_names if fname.endswith(".csv")]
        )
        if self.include_sheets:
            sheet_names = [
                sheet for sheet in sheet_names if sheet in self.include_sheets
            ]
        # Tolerate exclude_sheets being None as well as empty.
        excluded = self.exclude_sheets or []
        sheet_names = [sheet for sheet in sheet_names if sheet not in excluded]
        self.sub_sheet_names = sheet_names
        self.sheet_names_map = OrderedDict(
            (sheet_name, sheet_name) for sheet_name in sheet_names
        )
        self.configure_sheets()

    def generate_rows(self, dictreader, sheet_name):
        """
        Yield the data rows of a sheet as ``OrderedDict`` objects.

        The configuration line and any configured ``skipRows`` are consumed
        before the field names are read, and the extra rows of a multi-row
        header (``headerRows``) are consumed after them.

        """
        sheet_configuration, configuration_line = self._effective_configuration(
            sheet_name
        )
        skip_rows = sheet_configuration.get("skipRows", 0)
        header_rows = sheet_configuration.get("headerRows", 1)
        # Passing a default to next() avoids a StopIteration escaping this
        # generator when the file is shorter than the rows being skipped.
        for _skipped in range(configuration_line + skip_rows):
            next(dictreader.reader, None)
        fieldnames = dictreader.fieldnames
        for _skipped in range(header_rows - 1):
            next(dictreader.reader, None)
        for line in dictreader:
            yield OrderedDict((fieldname, line[fieldname]) for fieldname in fieldnames)

    def get_sheet_configuration(self, sheet_name):
        """
        Return the configuration entries from the first row of a sheet.

        The first row is a configuration row when its first cell is ``#``;
        the remaining cells are returned. Otherwise, or when the file is
        empty, an empty list is returned.

        """
        with open(self._sheet_path(sheet_name), encoding=self.encoding) as main_sheet_file:
            # An empty file has no first row; treat it as having no configuration.
            heading_row = next(csvreader(main_sheet_file), [])
        if len(heading_row) > 0 and heading_row[0] == "#":
            return heading_row[1:]
        return []

    def get_sheet_lines(self, sheet_name):
        """Yield the data rows of a sheet, keeping the file open while iterating."""
        # Pass the encoding to the open function
        with open(self._sheet_path(sheet_name), encoding=self.encoding) as main_sheet_file:
            dictreader = DictReader(main_sheet_file)
            yield from self.generate_rows(dictreader, sheet_name)
10✔
679

680

681
class BadXLSXZipFile(BadZipFile):
    """Raised when a file supplied as XLSX cannot be opened as an XLSX file."""
10✔
683

684

685
class XLSXInput(SpreadsheetInput):
    """Spreadsheet input read from an XLSX workbook loaded with ``openpyxl``."""

    def read_sheets(self):
        """Load the workbook and select the sheets that will be read.

        Sheets are filtered by ``self.include_sheets`` (when set) and
        ``self.exclude_sheets``; the survivors are stored in
        ``self.sheet_names_map`` and ``self.sub_sheet_names`` before
        ``self.configure_sheets()`` is called.

        Raises:
            BadXLSXZipFile: if ``openpyxl`` raises ``BadZipFile`` while loading.
        """
        try:
            self.workbook = openpyxl.load_workbook(self.input_name, data_only=True)
        except BadZipFile as e:
            # Chain the original error so the underlying cause stays visible.
            raise BadXLSXZipFile(
                _("The supplied file has extension .xlsx but isn't an XLSX file.")
            ) from e

        self.sheet_names_map = OrderedDict(
            (sheet_name, sheet_name) for sheet_name in self.workbook.sheetnames
        )
        if self.include_sheets:
            # Iterate over a copy because entries are removed while looping.
            for sheet in list(self.sheet_names_map):
                if sheet not in self.include_sheets:
                    self.sheet_names_map.pop(sheet)
        for sheet in self.exclude_sheets or []:
            self.sheet_names_map.pop(sheet, None)

        self.sub_sheet_names = list(self.sheet_names_map)
        self.configure_sheets()

    def _sheet_settings(self, sheet_name):
        """Return ``(sheet_configuration, configuration_line)`` for a sheet.

        ``configuration_line`` is 1 when the sheet has its own configuration
        and 0 otherwise. The returned configuration falls back to
        ``self.base_configuration`` and is empty when ``self.use_configuration``
        is false.
        """
        sheet_configuration = self.sheet_configuration[self.sheet_names_map[sheet_name]]
        configuration_line = 1 if sheet_configuration else 0
        if not sheet_configuration:
            sheet_configuration = self.base_configuration
        if not self.use_configuration:
            sheet_configuration = {}
        return sheet_configuration, configuration_line

    def get_sheet_headings(self, sheet_name):
        """Return the list of heading values for a sheet.

        An empty list is returned for ignored sheets, for sheets whose name
        starts with ``#`` when ``hashcomments`` is configured, and when the
        heading row lookup raises ``IndexError``.
        """
        worksheet = self.workbook[self.sheet_names_map[sheet_name]]
        sheet_configuration, configuration_line = self._sheet_settings(sheet_name)

        skip_rows = sheet_configuration.get("skipRows", 0)
        if sheet_configuration.get("ignore") or (
            sheet_configuration.get("hashcomments") and sheet_name.startswith("#")
        ):
            # returning empty headers is a proxy for no data in the sheet.
            return []

        if self.vertical_orientation:
            # Headings run down a column; drop the configuration cell if any.
            heading_column = worksheet[_get_column_letter(skip_rows + 1)]
            return [cell.value for cell in heading_column[configuration_line:]]

        try:
            return [
                cell.value for cell in worksheet[skip_rows + configuration_line + 1]
            ]
        except IndexError:
            # The heading line is after the data in the spreadsheet, e.g.
            # because of skipRows.
            return []

    def get_sheet_configuration(self, sheet_name):
        """Return the raw configuration cells from row 1 of a sheet.

        When cell A1 is ``"#"`` the non-empty values of the remaining cells in
        row 1 are returned; otherwise an empty list.
        """
        worksheet = self.workbook[self.sheet_names_map[sheet_name]]
        if worksheet["A1"].value != "#":
            return []
        return [
            cell.value
            for num, cell in enumerate(worksheet[1])
            if num != 0 and cell.value
        ]

    def get_sheet_lines(self, sheet_name):
        """Yield each data row of a sheet as an ``OrderedDict`` of header to value.

        Values under an empty header, or under a header starting with ``#``
        when ``hashcomments`` is configured, are replaced with ``None``.
        """
        sheet_configuration, configuration_line = self._sheet_settings(sheet_name)

        skip_rows = sheet_configuration.get("skipRows", 0)
        header_rows = sheet_configuration.get("headerRows", 1)

        worksheet = self.workbook[self.sheet_names_map[sheet_name]]
        if self.vertical_orientation:
            header_row = worksheet[_get_column_letter(skip_rows + 1)]
            first_data_column = skip_rows + header_rows + 1
            if configuration_line:
                # Row 1 holds the configuration, so drop it from both the
                # header column and the data columns.
                header_row = header_row[1:]
                remaining_rows = worksheet.iter_cols(
                    min_col=first_data_column, min_row=2
                )
            else:
                remaining_rows = worksheet.iter_cols(min_col=first_data_column)
        else:
            header_row = worksheet[skip_rows + configuration_line + 1]
            remaining_rows = worksheet.iter_rows(
                min_row=skip_rows + configuration_line + header_rows + 1
            )

        coli_to_header = {i: header.value for i, header in enumerate(header_row)}
        hashcomments = sheet_configuration.get("hashcomments")

        for row in remaining_rows:
            output_row = OrderedDict()
            for i, x in enumerate(row):
                header = coli_to_header[i]
                value = x.value
                if not header:
                    # None means that the cell will be ignored
                    value = None
                elif (
                    hashcomments
                    and isinstance(header, str)
                    and header.startswith("#")
                ):
                    # None means that the cell will be ignored
                    value = None
                output_row[header] = value
            yield output_row
799

800

801
class ODSInput(SpreadsheetInput):
    """Spreadsheet input read from an ODS file through ``ODSReader``."""

    def read_sheets(self):
        """Load the workbook and select the sheets that will be read.

        ``self.sheet_names_map`` is the reader's ``SHEETS`` mapping, filtered
        in place by ``self.include_sheets`` (when set) and
        ``self.exclude_sheets``.
        """
        self.workbook = ODSReader(self.input_name)
        self.sheet_names_map = self.workbook.SHEETS

        if self.include_sheets:
            # Iterate over a copy because entries are removed while looping.
            for sheet in list(self.sheet_names_map):
                if sheet not in self.include_sheets:
                    self.sheet_names_map.pop(sheet)

        for sheet in self.exclude_sheets or []:
            self.sheet_names_map.pop(sheet, None)

        self.sub_sheet_names = self.sheet_names_map.keys()
        self.configure_sheets()

    def _resolve_sheet_configuration(self, sheet_name):
        """Return the configuration mapping that applies to a sheet.

        * Configuration disabled: ``{"unused_config_line": True}`` when the
          sheet has its own configuration (so its line is still skipped),
          otherwise ``{}``.
        * Sheet has no configuration: a copy of ``self.base_configuration``
          tagged with ``"base_configuration": True``.
        * Otherwise: the sheet's own configuration.
        """
        sheet_configuration = self.sheet_configuration[sheet_name]
        if not self.use_configuration:
            return {"unused_config_line": True} if sheet_configuration else {}
        if not sheet_configuration:
            # Tag a copy: writing the marker into self.base_configuration
            # would leak it into the shared object.
            sheet_configuration = dict(self.base_configuration)
            sheet_configuration["base_configuration"] = True

        return sheet_configuration

    @staticmethod
    def _configuration_line(sheet_configuration):
        """Return 1 when the sheet itself holds a configuration line, else 0."""
        if sheet_configuration and "base_configuration" not in sheet_configuration:
            return 1
        return 0

    def get_sheet_headings(self, sheet_name):
        """Return the list of heading values for a sheet.

        An empty list is returned for ignored sheets, for sheets whose name
        starts with ``#`` when ``hashcomments`` is configured, and when the
        heading row index is out of range.
        """
        worksheet = self.sheet_names_map[sheet_name]

        sheet_configuration = self._resolve_sheet_configuration(sheet_name)
        configuration_line = self._configuration_line(sheet_configuration)

        skip_rows = sheet_configuration.get("skipRows", 0)
        if sheet_configuration.get("ignore") or (
            sheet_configuration.get("hashcomments") and sheet_name.startswith("#")
        ):
            # returning empty headers is a proxy for no data in the sheet.
            return []

        if self.vertical_orientation:
            return [
                row[skip_rows]
                for row in worksheet[configuration_line:]
                if len(row) > skip_rows
            ]

        try:
            return list(worksheet[skip_rows + configuration_line])
        except IndexError:
            # The heading line is after the data in the spreadsheet, e.g.
            # because of skipRows.
            return []

    def get_sheet_configuration(self, sheet_name):
        """Return the first row of the sheet when its first cell is ``"#"``.

        Otherwise, or when the sheet or its first row is empty, return ``[]``.
        """
        # See if there are config properties in the spreadsheet
        # https://flatten-tool.readthedocs.io/en/latest/unflatten/#configuration-properties-skip-and-header-rows
        worksheet = self.sheet_names_map[sheet_name]

        try:
            # cell A1
            if worksheet[0][0] == "#":
                return worksheet[0]

        except IndexError:
            pass

        return []

    def get_sheet_lines(self, sheet_name):
        """Yield each non-empty data row as an ``OrderedDict`` of header to value.

        Cells beyond the last header are dropped. Values under an empty header,
        or under a string header starting with ``#`` when ``hashcomments`` is
        configured, become ``None``. Rows whose values are all ``None`` are not
        yielded.
        """
        # This generator should yield an ordered dict in the format
        # see examples/simple/
        # yield OrderedDict([('a/b', '1'), ('a/c', '2'), ('d', '3')])
        # yield OrderedDict([('a/b', '4'), ('a/c', '5'), ('d', '6')])

        sheet_configuration = self._resolve_sheet_configuration(sheet_name)
        configuration_line = self._configuration_line(sheet_configuration)

        skip_rows = sheet_configuration.get("skipRows", 0)
        header_rows = sheet_configuration.get("headerRows", 1)

        worksheet = self.sheet_names_map[sheet_name]
        if self.vertical_orientation:
            data_rows = worksheet[configuration_line:]
            header_row = [row[skip_rows] for row in data_rows if len(row) > skip_rows]
            # default=0 keeps an empty sheet from raising ValueError.
            longest_horizontal_row = max((len(row) for row in data_rows), default=0)
            # Transpose: every column after the first becomes one output row.
            remaining_rows = [
                [row[i] if len(row) > i else None for row in data_rows if row]
                for i in range(1, longest_horizontal_row)
            ]
        else:
            header_row = worksheet[skip_rows + configuration_line]
            remaining_rows = worksheet[(skip_rows + configuration_line + header_rows) :]

        coli_to_header = dict(enumerate(header_row))
        hashcomments = sheet_configuration.get("hashcomments")

        for row in remaining_rows:
            output_row = OrderedDict()
            for i, x in enumerate(row):
                if i not in coli_to_header:
                    # No header for this column, so the cell is dropped.
                    continue
                header = coli_to_header[i]
                value = x
                if not header:
                    # None means that the cell will be ignored
                    value = None
                elif (
                    hashcomments
                    and isinstance(header, str)
                    and header.startswith("#")
                ):
                    # None means that the cell will be ignored. The isinstance
                    # guard keeps non-string headers from raising.
                    value = None
                output_row[header] = value
            if output_row and not all(value is None for value in output_row.values()):
                yield output_row
933

934

935
# Maps an input format name to the class used to read that format.
FORMATS = {"xlsx": XLSXInput, "csv": CSVInput, "ods": ODSInput}
10✔
936

937

938
class ListAsDict(dict):
    """A ``dict`` subclass used as a type marker for array data keyed by index."""
940

941

942
def list_as_dicts_to_temporary_dicts(unflattened, id_name, xml):
    """Recursively replace every ``ListAsDict`` value with a ``TemporaryDict``.

    ``unflattened`` is modified in place and also returned. ``Cell`` values are
    left untouched. Entries of a ``ListAsDict`` are appended to the new
    ``TemporaryDict`` in sorted key order.
    """
    snapshot = list(unflattened.items())
    for key, value in snapshot:
        if isinstance(value, Cell):
            continue
        if hasattr(value, "items"):
            if not value:
                unflattened.pop(key)
            list_as_dicts_to_temporary_dicts(value, id_name, xml)
        if not isinstance(value, ListAsDict):
            continue
        converted = TemporaryDict(id_name, xml=xml)
        for index in sorted(value.keys()):
            converted.append(value[index])
        unflattened[key] = converted
    return unflattened
956

957

958
def unflatten_main_with_parser(
    parser, line, timezone, xml, id_name, convert_flags=None
):
    """Build a nested structure from one flattened spreadsheet row.

    Args:
        parser: Schema parser or a falsy value. When truthy, its ``flattened``
            mapping is consulted for the declared type of each path.
        line: Mapping of column path (for example ``"a/0/b"``) to ``Cell``.
        timezone: Passed through to ``convert_type``.
        xml: When true, text values are stored under a ``"text()"`` key so that
            attributes (``@name``) can sit at the same level.
        id_name: Key field name handed to ``list_as_dicts_to_temporary_dicts``.
        convert_flags: Passed through to ``convert_type``. Defaults to a new
            empty dict on every call.

    Returns:
        An ``OrderedDict`` whose leaves are the ``Cell`` objects from ``line``.
        Each stored cell's ``cell_value`` is overwritten with its converted
        value.

    Raises:
        ValueError: if a column path uses an array or object where the schema
            declares a different type.
    """
    if convert_flags is None:
        # Avoid sharing one default dict between calls.
        convert_flags = {}
    unflattened = OrderedDict()
    for path, cell in line.items():
        # Skip blank cells
        if cell.cell_value is None or cell.cell_value == "":
            continue
        current_path = unflattened
        path_list = [item.rstrip("[]") for item in str(path).split("/")]
        for num, path_item in enumerate(path_list):
            if isint(path_item):
                # Integer items are array indexes; they are consumed as the
                # "next item" of the preceding path item.
                if num == 0:
                    warn(
                        _(
                            'Column "{}" has been ignored because it is a number.'
                        ).format(path),
                        DataErrorWarning,
                    )
                continue
            current_type = None
            path_till_now = "/".join(
                [item for item in path_list[: num + 1] if not isint(item)]
            )
            if parser:
                current_type = parser.flattened.get(path_till_now)
            try:
                next_path_item = path_list[num + 1]
            except IndexError:
                next_path_item = ""

            # Quick solution to avoid casting of date as datetime in spreadsheet > xml
            if xml:
                # Exact type match on purpose: subclasses are not treated as dates.
                if type(cell.cell_value) is datetime.datetime and not next_path_item:
                    if "datetime" not in str(path):
                        current_type = "date"

            ## Array
            list_index = -1
            if isint(next_path_item):
                if current_type and current_type != "array":
                    raise ValueError(
                        _(
                            "There is an array at '{}' when the schema says there should be a '{}'"
                        ).format(path_till_now, current_type)
                    )
                list_index = int(next_path_item)
                current_type = "array"

            if current_type == "array":
                list_as_dict = current_path.get(path_item)
                if list_as_dict is None:
                    list_as_dict = ListAsDict()
                    current_path[path_item] = list_as_dict
                elif type(list_as_dict) is not ListAsDict:
                    warn(
                        _(
                            "Column {} has been ignored, because it treats {} as an array, but another column does not."
                        ).format(path, path_till_now),
                        DataErrorWarning,
                    )
                    break
                new_path = list_as_dict.get(list_index)
                if new_path is None:
                    new_path = OrderedDict()
                    list_as_dict[list_index] = new_path
                current_path = new_path
                if not xml or num < len(path_list) - 2:
                    # In xml "arrays" can have text values, if they're the final element
                    # This corresponds to a tag with text, but also possibly attributes
                    continue

            ## Object
            if current_type == "object" or (not current_type and next_path_item):
                new_path = current_path.get(path_item)
                if new_path is None:
                    new_path = OrderedDict()
                    current_path[path_item] = new_path
                elif type(new_path) is ListAsDict or not hasattr(new_path, "items"):
                    warn(
                        _(
                            "Column {} has been ignored, because it treats {} as an object, but another column does not."
                        ).format(path, path_till_now),
                        DataErrorWarning,
                    )
                    break
                current_path = new_path
                continue
            if (
                current_type
                and current_type not in ["object", "array"]
                and next_path_item
            ):
                raise ValueError(
                    _(
                        "There is an object or list at '{}' but it should be an {}"
                    ).format(path_till_now, current_type)
                )

            ## Other Types
            current_path_value = current_path.get(path_item)
            if not xml and (
                type(current_path_value) is ListAsDict
                or hasattr(current_path_value, "items")
            ):
                #   ^
                # xml can have an object/array that also has a text value
                warn(
                    _(
                        "Column {} has been ignored, because another column treats it as an array or object"
                    ).format(path_till_now),
                    DataErrorWarning,
                )
                continue

            value = cell.cell_value
            if xml and current_type == "array":
                # In xml "arrays" can have text values, if they're the final element
                # However the type of the text value itself should not be "array",
                # as that would split the text on commas, which we don't want.
                # https://github.com/OpenDataServices/cove/issues/1030
                converted_value = convert_type("", value, timezone, convert_flags)
            else:
                converted_value = convert_type(
                    current_type or "", value, timezone, convert_flags
                )
            cell.cell_value = converted_value
            if converted_value is not None and converted_value != "":
                if xml:
                    # For XML we want to support text and attributes at the
                    # same level, e.g.
                    # <my-element a="b">some text</my-element>
                    # which we represent in a dict as:
                    # {"@a":"b", "text()": "some text"}
                    # To ensure we can attach attributes everywhere, all
                    # element text must be added as a dict with a `text()` key.
                    if path_item.startswith("@"):
                        current_path[path_item] = cell
                    else:
                        if current_type == "array":
                            current_path["text()"] = cell
                        elif path_item not in current_path:
                            current_path[path_item] = {"text()": cell}
                        else:
                            current_path[path_item]["text()"] = cell
                else:
                    current_path[path_item] = cell

    unflattened = list_as_dicts_to_temporary_dicts(unflattened, id_name, xml)
    return unflattened
1106

1107

1108
def path_search(
    nested_dict,
    path_list,
    id_fields=None,
    path=None,
    top=False,
    top_sheet=False,
    id_name="id",
    xml=False,
):
    """Walk ``path_list`` through ``nested_dict``, creating missing levels.

    Args:
        nested_dict: Mapping to descend into; modified in place.
        path_list: Remaining path items. An item ending in ``[]`` denotes an
            array level.
        id_fields: Mapping of ``"<path>/id"`` to the identifier that selects the
            entry within an array level. Defaults to an empty mapping.
        path: Slash-joined path walked so far (``None`` at the start).
        top: Treat the first item as an array level even without ``[]``. Only
            applies to this call; it is not passed on to deeper levels.
        top_sheet: Passed to every ``TemporaryDict`` that is created.
        id_name: Key field given to every ``TemporaryDict`` that is created.
        xml: ``xml`` flag given to every ``TemporaryDict`` that is created.

    Returns:
        The mapping found (or created) at the end of ``path_list``;
        ``nested_dict`` itself when ``path_list`` is empty.
    """
    if not path_list:
        return nested_dict

    id_fields = id_fields or {}
    parent_field = path_list[0]
    path = parent_field if path is None else path + "/" + parent_field

    if parent_field.endswith("[]") or top:
        if parent_field.endswith("[]"):
            parent_field = parent_field[:-2]
        if parent_field not in nested_dict:
            # id_name and xml are parameters here: the earlier code read them
            # as undefined globals, which raised NameError on this branch.
            nested_dict[parent_field] = TemporaryDict(
                keyfield=id_name, top_sheet=top_sheet, xml=xml
            )
        sub_sheet_id = id_fields.get(path + "/id")
        if sub_sheet_id not in nested_dict[parent_field]:
            nested_dict[parent_field][sub_sheet_id] = {}
        next_level = nested_dict[parent_field][sub_sheet_id]
    else:
        if parent_field not in nested_dict:
            nested_dict[parent_field] = OrderedDict()
        next_level = nested_dict[parent_field]

    return path_search(
        next_level,
        path_list[1:],
        id_fields=id_fields,
        path=path,
        top_sheet=top_sheet,
        id_name=id_name,
        xml=xml,
    )
1145

1146

1147
class TemporaryDict(UserDict):
    """Collects items keyed by the value of their ``keyfield``.

    Items carrying the key field are stored in ``data``, and merged with any
    earlier item that has the same key. Items without it are kept, in arrival
    order, in ``items_no_keyfield``.
    """

    def __init__(self, keyfield, top_sheet=False, xml=False):
        self.data = OrderedDict()
        self.items_no_keyfield = []
        self.keyfield = keyfield
        self.top_sheet = top_sheet
        self.xml = xml

    def __repr__(self):
        parts = (
            repr(self.keyfield),
            repr(self.items_no_keyfield),
            repr(self.data),
        )
        return "TemporaryDict(keyfield={}, items_no_keyfield={}, data={})".format(
            *parts
        )

    def append(self, item):
        """Add ``item``, merging it into an existing entry with the same key."""
        if self.keyfield not in item:
            self.items_no_keyfield.append(item)
            return

        key_holder = item[self.keyfield]
        if isinstance(key_holder, Cell):
            # Plain value, or an XML attribute
            key = key_holder.cell_value
        elif not self.xml:
            key = key_holder
        else:
            # For an XML tag the value sits under "text()"
            text = key_holder["text()"]
            key = text.cell_value if isinstance(text, Cell) else text

        if key in self.data:
            self.data[key].update(item)
        else:
            self.data[key] = item

    def to_list(self):
        """Return the keyed items followed by the items without a key field."""
        keyed_items = list(self.data.values())
        return keyed_items + self.items_no_keyfield
1185

1186

1187
def temporarydicts_to_lists(nested_dict):
    """Recursively transforms TemporaryDicts to lists inplace."""
    for key, value in nested_dict.items():
        if isinstance(value, Cell):
            continue
        if not hasattr(value, "to_list"):
            # Ordinary mapping: just descend into it.
            if hasattr(value, "items"):
                temporarydicts_to_lists(value)
            continue
        # Convert everything nested inside before flattening this level.
        temporarydicts_to_lists(value)
        for unkeyed_item in getattr(value, "items_no_keyfield", ()):
            temporarydicts_to_lists(unkeyed_item)
        nested_dict[key] = value.to_list()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc