
OpenDataServices / flatten-tool / 10202339569

01 Aug 2024 04:44PM UTC. Coverage: 95.709% (remained the same).

Build 10202339569 · push · github

Bjwebb
errors: Use custom exceptions

https://github.com/OpenDataServices/flatten-tool/issues/450

This makes it easier to disambiguate errors deliberately raised by
flatten-tool from those coming from other sources. I've left alone a few
exceptions that flatten-tool raises but which we don't expect to happen,
since they didn't seem to fall into the same category.
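
As a sketch of what this enables (not taken from the changeset itself; the filename is hypothetical, but FlattenToolError and unflatten() both appear in the source file below), calling code can now catch flatten-tool's own failures separately from unexpected ones:

    import flattentool
    from flattentool.exceptions import FlattenToolError

    try:
        # Deliberately wrong: unflatten() requires input_format, so flatten-tool
        # raises its own FlattenToolError rather than a generic exception.
        flattentool.unflatten("flattened.xlsx")
    except FlattenToolError as err:
        print(f"flatten-tool rejected the input: {err}")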

20 of 35 new or added lines in 6 files covered (57.14%).
57 existing lines in 5 files are now uncovered.
3390 of 3542 relevant lines covered (95.71%).
11.42 hits per line.

Source file: /flattentool/__init__.py · 91.55% covered

Lines the test suite does not exercise are marked "# not covered" in the listing below; uncovered lines added in this push are marked "# not covered (new)".

import codecs
import datetime
import json
import sys
from collections import OrderedDict
from decimal import Decimal

from flattentool.exceptions import FlattenToolError
from flattentool.input import FORMATS as INPUT_FORMATS
from flattentool.json_input import JSONParser
from flattentool.lib import parse_sheet_configuration
from flattentool.output import FORMATS as OUTPUT_FORMATS
from flattentool.output import FORMATS_SUFFIX, LINE_TERMINATORS
from flattentool.schema import SchemaParser
from flattentool.xml_output import toxml


def create_template(
    schema,
    output_name=None,
    output_format="all",
    main_sheet_name="main",
    rollup=False,
    root_id=None,
    use_titles=False,
    disable_local_refs=False,
    truncation_length=3,
    no_deprecated_fields=False,
    line_terminator="CRLF",
    convert_wkt=False,
    **_,
):
    """
    Creates template file(s) from given inputs
    This function is built to deal with commandline input and arguments
    but to also be called from elsewhere in future

    """

    if line_terminator not in LINE_TERMINATORS.keys():
        raise FlattenToolError(f"{line_terminator} is not a valid line terminator")  # not covered (new)

    convert_flags = {"wkt": convert_wkt}

    parser = SchemaParser(
        schema_filename=schema,
        rollup=rollup,
        root_id=root_id,
        use_titles=use_titles,
        disable_local_refs=disable_local_refs,
        truncation_length=truncation_length,
        exclude_deprecated_fields=no_deprecated_fields,
        convert_flags=convert_flags,
    )
    parser.parse()

    def spreadsheet_output(spreadsheet_output_class, name):
        spreadsheet_output = spreadsheet_output_class(
            parser=parser,
            main_sheet_name=main_sheet_name,
            output_name=name,
            line_terminator=LINE_TERMINATORS[line_terminator],
        )
        spreadsheet_output.write_sheets()

    if output_format == "all":
        if not output_name:
            output_name = "template"  # not covered
        for format_name, spreadsheet_output_class in OUTPUT_FORMATS.items():
            spreadsheet_output(
                spreadsheet_output_class, output_name + FORMATS_SUFFIX[format_name]
            )

    elif output_format in OUTPUT_FORMATS.keys():  # in dictionary of allowed formats
        if not output_name:
            output_name = "template" + FORMATS_SUFFIX[output_format]  # not covered
        spreadsheet_output(OUTPUT_FORMATS[output_format], output_name)

    else:
        raise FlattenToolError("The requested format is not available")  # not covered (new)


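As an aside (not part of the file above), a minimal call to create_template() from Python might look like the following; the schema path and output name are made up, and "xlsx" is assumed to be one of the registered OUTPUT_FORMATS, consistent with the csv/xlsx formats the docstrings mention:

    from flattentool import create_template

    # Writes a spreadsheet template whose columns are derived from the JSON schema.
    create_template(
        schema="release-schema.json",  # hypothetical schema file
        output_format="xlsx",
        output_name="template.xlsx",
    )
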
def flatten(
    input_name,
    schema=None,
    output_name=None,
    output_format="all",
    main_sheet_name="main",
    root_list_path="main",
    root_is_list=False,
    sheet_prefix="",
    filter_field=None,
    filter_value=None,
    preserve_fields=None,
    rollup=False,
    root_id=None,
    use_titles=False,
    xml=False,
    id_name="id",
    disable_local_refs=False,
    remove_empty_schema_columns=False,
    truncation_length=3,
    line_terminator="CRLF",
    convert_wkt=False,
    **_,
):
    """
    Flatten a nested structure (JSON) to a flat structure (spreadsheet - csv or xlsx).

    """

    if (filter_field is None and filter_value is not None) or (
        filter_field is not None and filter_value is None
    ):
        raise FlattenToolError("You must use filter_field and filter_value together")  # not covered (new)

    if line_terminator not in LINE_TERMINATORS.keys():
        raise FlattenToolError(f"{line_terminator} is not a valid line terminator")  # not covered (new)

    convert_flags = {"wkt": convert_wkt}

    if schema:
        schema_parser = SchemaParser(
            schema_filename=schema,
            rollup=rollup,
            root_id=root_id,
            use_titles=use_titles,
            disable_local_refs=disable_local_refs,
            truncation_length=truncation_length,
            convert_flags=convert_flags,
        )
        schema_parser.parse()
    else:
        schema_parser = None

    # context manager to clean up ZODB database when it exits
    with JSONParser(
        json_filename=input_name,
        root_list_path=None if root_is_list else root_list_path,
        schema_parser=schema_parser,
        rollup=rollup,
        root_id=root_id,
        use_titles=use_titles,
        xml=xml,
        id_name=id_name,
        filter_field=filter_field,
        filter_value=filter_value,
        preserve_fields=preserve_fields,
        remove_empty_schema_columns=remove_empty_schema_columns,
        truncation_length=truncation_length,
        persist=True,
        convert_flags=convert_flags,
    ) as parser:

        def spreadsheet_output(spreadsheet_output_class, name):
            spreadsheet_output = spreadsheet_output_class(
                parser=parser,
                main_sheet_name=main_sheet_name,
                output_name=name,
                sheet_prefix=sheet_prefix,
                line_terminator=LINE_TERMINATORS[line_terminator],
            )
            spreadsheet_output.write_sheets()

        if output_format == "all":
            if not output_name:
                output_name = "flattened"  # not covered
            for format_name, spreadsheet_output_class in OUTPUT_FORMATS.items():
                spreadsheet_output(
                    spreadsheet_output_class, output_name + FORMATS_SUFFIX[format_name]
                )

        elif output_format in OUTPUT_FORMATS.keys():  # in dictionary of allowed formats
            if not output_name:
                output_name = "flattened" + FORMATS_SUFFIX[output_format]  # not covered
            spreadsheet_output(OUTPUT_FORMATS[output_format], output_name)

        else:
            raise FlattenToolError("The requested format is not available")  # not covered (new)


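Again as an illustration rather than part of the file, a typical flatten() call might be (the input path is hypothetical; with the default output_format="all", one output is written per registered format, named output_name plus that format's suffix):

    from flattentool import flatten

    # Flattens the list at data["main"] into one spreadsheet output per registered format.
    flatten("data.json", root_list_path="main", output_name="flattened")
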
# From http://bugs.python.org/issue16535
class NumberStr(float):
    def __init__(self, o):
        # We don't call the parent here, since we're deliberately altering it's functionality
        # pylint: disable=W0231
        self.o = o

    def __repr__(self):
        return str(self.o)  # not covered

    # This is needed for this trick to work in python 3.4
    def __float__(self):
        return self  # not covered


def decimal_datetime_default(o):
    if isinstance(o, Decimal):
        if int(o) == o:
            return int(o)
        else:
            return NumberStr(o)
    if isinstance(o, datetime.datetime):
        return str(o)
    raise TypeError(repr(o) + " is not JSON serializable")  # not covered


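A small usage note (illustrative, not part of the file): decimal_datetime_default() is the `default` hook handed to json.dump() further down, so Decimals and datetimes coming out of the unflattening step can be serialised; whole-number Decimals become ints, other Decimals are wrapped in the NumberStr float subclass above, and datetimes are emitted as strings. The values here are made up:

    import datetime
    import json
    from decimal import Decimal

    from flattentool import decimal_datetime_default

    print(json.dumps(
        {"count": Decimal("2"), "when": datetime.datetime(2024, 8, 1, 16, 44)},
        default=decimal_datetime_default,
    ))
    # {"count": 2, "when": "2024-08-01 16:44:00"}
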
def unflatten(
    input_name,
    base_json=None,
    input_format=None,
    output_name=None,
    root_list_path=None,
    root_is_list=False,
    encoding="utf8",
    timezone_name="UTC",
    root_id=None,
    schema="",
    convert_titles=False,
    cell_source_map=None,
    heading_source_map=None,
    id_name=None,
    xml=False,
    vertical_orientation=False,
    metatab_name=None,
    metatab_only=False,
    metatab_schema="",
    metatab_vertical_orientation=False,
    xml_schemas=None,
    default_configuration="",
    disable_local_refs=False,
    xml_comment=None,
    truncation_length=3,
    convert_wkt=False,
    **_,
):
    """
    Unflatten a flat structure (spreadsheet - csv or xlsx) into a nested structure (JSON).

    """

    if input_format is None:
        raise FlattenToolError(
            "You must specify an input format (may autodetect in future"
        )
    elif input_format not in INPUT_FORMATS:
        raise FlattenToolError("The requested format is not available")
    if metatab_name and base_json:
        raise FlattenToolError("Not allowed to use base_json with metatab")

    convert_flags = {"wkt": convert_wkt}

    if root_is_list:
        base = None
    elif base_json:
        with open(base_json) as fp:
            base = json.load(fp, object_pairs_hook=OrderedDict)
    else:
        base = OrderedDict()

    base_configuration = parse_sheet_configuration(
        [item.strip() for item in default_configuration.split(",")]
    )

    cell_source_map_data = OrderedDict()
    heading_source_map_data = OrderedDict()

    if metatab_name and not root_is_list:
        spreadsheet_input_class = INPUT_FORMATS[input_format]
        spreadsheet_input = spreadsheet_input_class(
            input_name=input_name,
            timezone_name=timezone_name,
            root_list_path="meta",
            include_sheets=[metatab_name],
            convert_titles=convert_titles,
            vertical_orientation=metatab_vertical_orientation,
            id_name=id_name,
            xml=xml,
            use_configuration=False,
            convert_flags=convert_flags,
        )
        if metatab_schema:
            parser = SchemaParser(
                schema_filename=metatab_schema,
                disable_local_refs=disable_local_refs,
                convert_flags=convert_flags,
            )
            parser.parse()
            spreadsheet_input.parser = parser
        spreadsheet_input.encoding = encoding
        spreadsheet_input.read_sheets()
        (
            result,
            cell_source_map_data_meta,
            heading_source_map_data_meta,
        ) = spreadsheet_input.fancy_unflatten(
            with_cell_source_map=cell_source_map,
            with_heading_source_map=heading_source_map,
        )
        for key, value in (cell_source_map_data_meta or {}).items():
            ## strip off meta/0/ from start of source map as actually data is at top level
            cell_source_map_data[key[7:]] = value
        for key, value in (heading_source_map_data_meta or {}).items():
            ## strip off meta/ from start of source map as actually data is at top level
            heading_source_map_data[key[5:]] = value

        # update individual keys from base configuration
        base_configuration.update(
            spreadsheet_input.sheet_configuration.get(metatab_name, {})
        )

        if result:
            base.update(result[0])

    if root_list_path is None:
        root_list_path = base_configuration.get("RootListPath", "main")
    if id_name is None:
        id_name = base_configuration.get("IDName", "id")

    if not metatab_only or root_is_list:
        spreadsheet_input_class = INPUT_FORMATS[input_format]
        spreadsheet_input = spreadsheet_input_class(
            input_name=input_name,
            timezone_name=timezone_name,
            root_list_path=root_list_path,
            root_is_list=root_is_list,
            root_id=root_id,
            convert_titles=convert_titles,
            exclude_sheets=[metatab_name],
            vertical_orientation=vertical_orientation,
            id_name=id_name,
            xml=xml,
            base_configuration=base_configuration,
            convert_flags=convert_flags,
        )
        if schema:
            parser = SchemaParser(
                schema_filename=schema,
                rollup=True,
                root_id=root_id,
                disable_local_refs=disable_local_refs,
                truncation_length=truncation_length,
                convert_flags=convert_flags,
            )
            parser.parse()
            spreadsheet_input.parser = parser
        spreadsheet_input.encoding = encoding
        spreadsheet_input.read_sheets()
        (
            result,
            cell_source_map_data_main,
            heading_source_map_data_main,
        ) = spreadsheet_input.fancy_unflatten(
            with_cell_source_map=cell_source_map,
            with_heading_source_map=heading_source_map,
        )
        cell_source_map_data.update(cell_source_map_data_main or {})
        heading_source_map_data.update(heading_source_map_data_main or {})
        if root_is_list:
            base = list(result)
        else:
            base[root_list_path] = list(result)

    if xml:
        xml_root_tag = base_configuration.get("XMLRootTag", "iati-activities")
        xml_output = toxml(
            base,
            xml_root_tag,
            xml_schemas=xml_schemas,
            root_list_path=root_list_path,
            xml_comment=xml_comment,
        )
        if output_name is None:
            sys.stdout.buffer.write(xml_output)
        else:
            with codecs.open(output_name, "wb") as fp:
                fp.write(xml_output)
    else:
        if output_name is None:
            print(
                json.dumps(
                    base, indent=4, default=decimal_datetime_default, ensure_ascii=False
                )
            )
        else:
            with codecs.open(output_name, "w", encoding="utf-8") as fp:
                json.dump(
                    base,
                    fp,
                    indent=4,
                    default=decimal_datetime_default,
                    ensure_ascii=False,
                )
    if cell_source_map:
        with codecs.open(cell_source_map, "w", encoding="utf-8") as fp:
            json.dump(
                cell_source_map_data,
                fp,
                indent=4,
                default=decimal_datetime_default,
                ensure_ascii=False,
            )
    if heading_source_map:
        with codecs.open(heading_source_map, "w", encoding="utf-8") as fp:
            json.dump(
                heading_source_map_data,
                fp,
                indent=4,
                default=decimal_datetime_default,
                ensure_ascii=False,
            )
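
To round out the picture (an illustrative sketch, not part of the file above: the paths are made up and "xlsx" is assumed to be a key of INPUT_FORMATS, consistent with the csv/xlsx formats the docstrings mention), a typical unflatten() call reverses the flatten() example shown earlier:

    from flattentool import unflatten

    # Reads the flattened spreadsheet back into nested JSON written to roundtrip.json.
    unflatten(
        "flattened.xlsx",      # hypothetical spreadsheet produced by flatten() above
        input_format="xlsx",
        root_list_path="main",
        output_name="roundtrip.json",
    )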