OpenDataServices / flatten-tool / 6507626273

13 Oct 2023 11:25AM UTC. Coverage: 42.006% (-53.7%) from 95.72%.

Pull Request #433 (github, odscjames): New "Geo" optional dependencies
https://github.com/OpenDataServices/flatten-tool/issues/424

38 of 38 new or added lines in 6 files covered (100.0%)
1466 of 3490 relevant lines covered (42.01%)
4.16 hits per line

Source File: /flattentool/__init__.py (59.42% covered)

import codecs
import json
import sys
from collections import OrderedDict
from decimal import Decimal

from flattentool.input import FORMATS as INPUT_FORMATS
from flattentool.json_input import JSONParser
from flattentool.lib import parse_sheet_configuration
from flattentool.output import FORMATS as OUTPUT_FORMATS
from flattentool.output import FORMATS_SUFFIX, LINE_TERMINATORS
from flattentool.schema import SchemaParser
from flattentool.xml_output import toxml


def create_template(
    schema,
    output_name=None,
    output_format="all",
    main_sheet_name="main",
    rollup=False,
    root_id=None,
    use_titles=False,
    disable_local_refs=False,
    truncation_length=3,
    no_deprecated_fields=False,
    line_terminator="CRLF",
    convert_wkt=False,
    **_,
):
    """
    Creates template file(s) from the given inputs.
    This function is built to deal with command-line input and arguments,
    but also to be called from elsewhere in future.
    """

    if line_terminator not in LINE_TERMINATORS.keys():
        raise Exception(f"{line_terminator} is not a valid line terminator")

    convert_flags = {"wkt": convert_wkt}

    parser = SchemaParser(
        schema_filename=schema,
        rollup=rollup,
        root_id=root_id,
        use_titles=use_titles,
        disable_local_refs=disable_local_refs,
        truncation_length=truncation_length,
        exclude_deprecated_fields=no_deprecated_fields,
        convert_flags=convert_flags,
    )
    parser.parse()

    def spreadsheet_output(spreadsheet_output_class, name):
        spreadsheet_output = spreadsheet_output_class(
            parser=parser,
            main_sheet_name=main_sheet_name,
            output_name=name,
            line_terminator=LINE_TERMINATORS[line_terminator],
        )
        spreadsheet_output.write_sheets()

    if output_format == "all":
        if not output_name:
            output_name = "template"
        for format_name, spreadsheet_output_class in OUTPUT_FORMATS.items():
            spreadsheet_output(
                spreadsheet_output_class, output_name + FORMATS_SUFFIX[format_name]
            )

    elif output_format in OUTPUT_FORMATS.keys():  # in dictionary of allowed formats
        if not output_name:
            output_name = "template" + FORMATS_SUFFIX[output_format]
        spreadsheet_output(OUTPUT_FORMATS[output_format], output_name)

    else:
        raise Exception("The requested format is not available")
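
A minimal sketch of calling create_template() from Python rather than the command line, which the docstring above says is intended. The schema path, output name, and the "xlsx" format key are illustrative assumptions, not values taken from this repository.

import flattentool

flattentool.create_template(
    schema="my_schema.json",         # hypothetical JSON schema file
    output_format="xlsx",            # assumed to be one of the OUTPUT_FORMATS keys
    output_name="my_template.xlsx",  # passed straight through to the output class
)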


def flatten(
    input_name,
    schema=None,
    output_name=None,
    output_format="all",
    main_sheet_name="main",
    root_list_path="main",
    root_is_list=False,
    sheet_prefix="",
    filter_field=None,
    filter_value=None,
    preserve_fields=None,
    rollup=False,
    root_id=None,
    use_titles=False,
    xml=False,
    id_name="id",
    disable_local_refs=False,
    remove_empty_schema_columns=False,
    truncation_length=3,
    line_terminator="CRLF",
    convert_wkt=False,
    **_,
):
    """
    Flatten a nested structure (JSON) to a flat structure (spreadsheet - csv or xlsx).
    """

    if (filter_field is None and filter_value is not None) or (
        filter_field is not None and filter_value is None
    ):
        raise Exception("You must use filter_field and filter_value together")

    if line_terminator not in LINE_TERMINATORS.keys():
        raise Exception(f"{line_terminator} is not a valid line terminator")

    convert_flags = {"wkt": convert_wkt}

    if schema:
        schema_parser = SchemaParser(
            schema_filename=schema,
            rollup=rollup,
            root_id=root_id,
            use_titles=use_titles,
            disable_local_refs=disable_local_refs,
            truncation_length=truncation_length,
            convert_flags=convert_flags,
        )
        schema_parser.parse()
    else:
        schema_parser = None

    # context manager to clean up ZODB database when it exits
    with JSONParser(
        json_filename=input_name,
        root_list_path=None if root_is_list else root_list_path,
        schema_parser=schema_parser,
        rollup=rollup,
        root_id=root_id,
        use_titles=use_titles,
        xml=xml,
        id_name=id_name,
        filter_field=filter_field,
        filter_value=filter_value,
        preserve_fields=preserve_fields,
        remove_empty_schema_columns=remove_empty_schema_columns,
        truncation_length=truncation_length,
        persist=True,
        convert_flags=convert_flags,
    ) as parser:

        def spreadsheet_output(spreadsheet_output_class, name):
            spreadsheet_output = spreadsheet_output_class(
                parser=parser,
                main_sheet_name=main_sheet_name,
                output_name=name,
                sheet_prefix=sheet_prefix,
                line_terminator=LINE_TERMINATORS[line_terminator],
            )
            spreadsheet_output.write_sheets()

        if output_format == "all":
            if not output_name:
                output_name = "flattened"
            for format_name, spreadsheet_output_class in OUTPUT_FORMATS.items():
                spreadsheet_output(
                    spreadsheet_output_class, output_name + FORMATS_SUFFIX[format_name]
                )

        elif output_format in OUTPUT_FORMATS.keys():  # in dictionary of allowed formats
            if not output_name:
                output_name = "flattened" + FORMATS_SUFFIX[output_format]
            spreadsheet_output(OUTPUT_FORMATS[output_format], output_name)

        else:
            raise Exception("The requested format is not available")
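
For reference, a hedged sketch of the equivalent flatten() call from Python. The input file, output name, and the "csv" format key are assumptions for illustration; per the code above, a supplied output_name is used as-is when a single output_format is requested.

import flattentool

flattentool.flatten(
    "input.json",             # hypothetical nested JSON document
    output_format="csv",      # assumed to be one of the OUTPUT_FORMATS keys
    output_name="flattened",  # name handed to the chosen spreadsheet output class
    root_list_path="main",    # path of the root list within the JSON (the default)
)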


# From http://bugs.python.org/issue16535
class NumberStr(float):
    def __init__(self, o):
        # We don't call the parent here, since we're deliberately altering its functionality
        # pylint: disable=W0231
        self.o = o

    def __repr__(self):
        return str(self.o)

    # This is needed for this trick to work in python 3.4
    def __float__(self):
        return self


def decimal_default(o):
    if isinstance(o, Decimal):
        if int(o) == o:
            return int(o)
        else:
            return NumberStr(o)
    raise TypeError(repr(o) + " is not JSON serializable")
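
decimal_default() is passed as the default= hook to json.dump()/json.dumps() in unflatten() below, so that Decimal values survive JSON serialisation: whole-number Decimals are emitted as plain ints, and anything else is wrapped in NumberStr so that, per the linked bug report, its original string form can be used as the JSON number. A small sketch with illustrative values:

import json
from decimal import Decimal

from flattentool import decimal_default

data = {"count": Decimal("3"), "rate": Decimal("1.10")}

# Decimal("3") comes back from decimal_default() as the int 3;
# Decimal("1.10") is wrapped in NumberStr, aiming to keep "1.10" intact
# rather than going through a lossy binary float conversion.
print(json.dumps(data, indent=4, default=decimal_default))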


def unflatten(
    input_name,
    base_json=None,
    input_format=None,
    output_name=None,
    root_list_path=None,
    root_is_list=False,
    encoding="utf8",
    timezone_name="UTC",
    root_id=None,
    schema="",
    convert_titles=False,
    cell_source_map=None,
    heading_source_map=None,
    id_name=None,
    xml=False,
    vertical_orientation=False,
    metatab_name=None,
    metatab_only=False,
    metatab_schema="",
    metatab_vertical_orientation=False,
    xml_schemas=None,
    default_configuration="",
    disable_local_refs=False,
    xml_comment=None,
    truncation_length=3,
    convert_wkt=False,
    **_,
):
    """
    Unflatten a flat structure (spreadsheet - csv or xlsx) into a nested structure (JSON).
    """

    if input_format is None:
        raise Exception("You must specify an input format (may autodetect in future)")
    elif input_format not in INPUT_FORMATS:
        raise Exception("The requested format is not available")
    if metatab_name and base_json:
        raise Exception("Not allowed to use base_json with metatab")

    convert_flags = {"wkt": convert_wkt}

    if root_is_list:
        base = None
    elif base_json:
        with open(base_json) as fp:
            base = json.load(fp, object_pairs_hook=OrderedDict)
    else:
        base = OrderedDict()

    base_configuration = parse_sheet_configuration(
        [item.strip() for item in default_configuration.split(",")]
    )

    cell_source_map_data = OrderedDict()
    heading_source_map_data = OrderedDict()

    if metatab_name and not root_is_list:
        spreadsheet_input_class = INPUT_FORMATS[input_format]
        spreadsheet_input = spreadsheet_input_class(
            input_name=input_name,
            timezone_name=timezone_name,
            root_list_path="meta",
            include_sheets=[metatab_name],
            convert_titles=convert_titles,
            vertical_orientation=metatab_vertical_orientation,
            id_name=id_name,
            xml=xml,
            use_configuration=False,
            convert_flags=convert_flags,
        )
        if metatab_schema:
            parser = SchemaParser(
                schema_filename=metatab_schema,
                disable_local_refs=disable_local_refs,
                convert_flags=convert_flags,
            )
            parser.parse()
            spreadsheet_input.parser = parser
        spreadsheet_input.encoding = encoding
        spreadsheet_input.read_sheets()
        (
            result,
            cell_source_map_data_meta,
            heading_source_map_data_meta,
        ) = spreadsheet_input.fancy_unflatten(
            with_cell_source_map=cell_source_map,
            with_heading_source_map=heading_source_map,
        )
        for key, value in (cell_source_map_data_meta or {}).items():
            ## strip off meta/0/ from the start of the source map, as the data is actually at the top level
            cell_source_map_data[key[7:]] = value
        for key, value in (heading_source_map_data_meta or {}).items():
            ## strip off meta/ from the start of the source map, as the data is actually at the top level
            heading_source_map_data[key[5:]] = value

        # update individual keys from base configuration
        base_configuration.update(
            spreadsheet_input.sheet_configuration.get(metatab_name, {})
        )

        if result:
            base.update(result[0])

    if root_list_path is None:
        root_list_path = base_configuration.get("RootListPath", "main")
    if id_name is None:
        id_name = base_configuration.get("IDName", "id")

    if not metatab_only or root_is_list:
        spreadsheet_input_class = INPUT_FORMATS[input_format]
        spreadsheet_input = spreadsheet_input_class(
            input_name=input_name,
            timezone_name=timezone_name,
            root_list_path=root_list_path,
            root_is_list=root_is_list,
            root_id=root_id,
            convert_titles=convert_titles,
            exclude_sheets=[metatab_name],
            vertical_orientation=vertical_orientation,
            id_name=id_name,
            xml=xml,
            base_configuration=base_configuration,
            convert_flags=convert_flags,
        )
        if schema:
            parser = SchemaParser(
                schema_filename=schema,
                rollup=True,
                root_id=root_id,
                disable_local_refs=disable_local_refs,
                truncation_length=truncation_length,
                convert_flags=convert_flags,
            )
            parser.parse()
            spreadsheet_input.parser = parser
        spreadsheet_input.encoding = encoding
        spreadsheet_input.read_sheets()
        (
            result,
            cell_source_map_data_main,
            heading_source_map_data_main,
        ) = spreadsheet_input.fancy_unflatten(
            with_cell_source_map=cell_source_map,
            with_heading_source_map=heading_source_map,
        )
        cell_source_map_data.update(cell_source_map_data_main or {})
        heading_source_map_data.update(heading_source_map_data_main or {})
        if root_is_list:
            base = list(result)
        else:
            base[root_list_path] = list(result)

    if xml:
        xml_root_tag = base_configuration.get("XMLRootTag", "iati-activities")
        xml_output = toxml(
            base,
            xml_root_tag,
            xml_schemas=xml_schemas,
            root_list_path=root_list_path,
            xml_comment=xml_comment,
        )
        if output_name is None:
            sys.stdout.buffer.write(xml_output)
        else:
            with codecs.open(output_name, "wb") as fp:
                fp.write(xml_output)
    else:
        if output_name is None:
            print(
                json.dumps(base, indent=4, default=decimal_default, ensure_ascii=False)
            )
        else:
            with codecs.open(output_name, "w", encoding="utf-8") as fp:
                json.dump(
                    base, fp, indent=4, default=decimal_default, ensure_ascii=False
                )
    if cell_source_map:
        with codecs.open(cell_source_map, "w", encoding="utf-8") as fp:
            json.dump(
                cell_source_map_data,
                fp,
                indent=4,
                default=decimal_default,
                ensure_ascii=False,
            )
    if heading_source_map:
        with codecs.open(heading_source_map, "w", encoding="utf-8") as fp:
            json.dump(
                heading_source_map_data,
                fp,
                indent=4,
                default=decimal_default,
                ensure_ascii=False,
            )
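
Finally, a hedged sketch of driving unflatten() from Python to reverse the process. The input name, the "csv" format key, and the output file are assumptions for illustration; input_format itself is required, as the check at the top of the function shows.

import flattentool

flattentool.unflatten(
    "flattened",                      # hypothetical spreadsheet input for the chosen format
    input_format="csv",               # must be one of the INPUT_FORMATS keys
    output_name="roundtripped.json",  # JSON is written here instead of stdout
    root_list_path="main",            # key under which the unflattened items are placed
)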