• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenDataServices / flatten-tool / 10202339569

01 Aug 2024 04:44PM UTC coverage: 95.709%. Remained the same
10202339569

push

github

Bjwebb
errors: Use custom exceptions

https://github.com/OpenDataServices/flatten-tool/issues/450

This makes it easier to disambiguate errors deliberately raised by
flatten-tool versus those from other sources. I've left alone a few
exceptions that flatten-tool raises, but which we don't expect to
happen, so didn't seem to be in the same category.

20 of 35 new or added lines in 6 files covered. (57.14%)

57 existing lines in 5 files now uncovered.

3390 of 3542 relevant lines covered (95.71%)

11.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.57
/flattentool/schema.py
1
"""Classes for reading from a JSON schema"""
2

3
from __future__ import print_function, unicode_literals
12✔
4

5
import codecs
12✔
6
import os
12✔
7
import sys
12✔
8
from collections import OrderedDict, UserDict
12✔
9
from warnings import warn
12✔
10

11
import jsonref
12✔
12

13
from flattentool.exceptions import (
12✔
14
    FlattenToolError,
15
    FlattenToolValueError,
16
    FlattenToolWarning,
17
)
18
from flattentool.i18n import _
12✔
19
from flattentool.sheet import Sheet
12✔
20

21
if sys.version_info[:2] > (3, 0):
12✔
22
    import pathlib
12✔
23
else:
UNCOV
24
    import urllib
×
25

UNCOV
26
    import urlparse
×
27

28

29
def get_property_type_set(property_schema_dict):
    """Return the ``"type"`` entry of a property schema as a set.

    A list value is converted to a set of its members; any other value is
    wrapped in a one-element set. A missing ``"type"`` key gives an empty set.
    """
    declared = property_schema_dict.get("type", [])
    if isinstance(declared, list):
        return set(declared)
    return {declared}
35

36

37
def make_sub_sheet_name(
    parent_path, property_name, truncation_length=3, path_separator="/"
):
    """Build a sub-sheet name from a parent path and a property name.

    ``parent_path`` is split on ``path_separator``; segments equal to ``"0"``
    are dropped, the remaining segments are cut to ``truncation_length``
    characters and joined with ``"_"``. ``property_name`` is appended
    untruncated.
    """
    shortened = []
    for segment in parent_path.split(path_separator):
        if segment == "0":
            # "0" segments are left out of the sheet name.
            continue
        shortened.append(segment[:truncation_length])
    prefix = "_".join(shortened)
    return prefix + property_name
46

47

48
class TitleLookup(UserDict):
    """Mapping of titles to nested lookups, insensitive to case and spaces.

    Keys are normalised on every access by removing spaces and lower-casing.
    Values are used as ``TitleLookup`` nodes: ``lookup_header_list`` reads
    their ``property_name`` attribute and recurses into them.
    """

    # Property name attached to this node; None until a caller assigns it.
    property_name = None

    @staticmethod
    def _normalise_key(key):
        """Return ``key`` with spaces removed and lower-cased."""
        return key.replace(" ", "").lower()

    def lookup_header(self, title_header):
        """Translate a colon-separated title header into a slash-separated path.

        Strings (including ``str`` subclasses) are split on ``":"`` and
        resolved with ``lookup_header_list``. Any other value is returned
        unchanged.
        """
        if isinstance(title_header, str):
            return self.lookup_header_list(title_header.split(":"))
        return title_header

    def lookup_header_list(self, title_header_list):
        """Resolve a list of title components into a slash-separated path.

        * A first component that parses as an integer is kept verbatim and
          the rest is resolved against this same lookup.
        * A first component known to this lookup is replaced by the matching
          node's ``property_name`` and the rest is resolved against that node.
        * Otherwise all components are treated as field names, stripped of
          surrounding spaces and joined with ``"/"``.
        """
        first_title = title_header_list[0]
        remaining_titles = title_header_list[1:]

        # Only the int() probe is guarded, so a ValueError can only mean
        # "first_title is not an integer".
        try:
            int(first_title)
        except ValueError:
            pass
        else:
            if remaining_titles:
                return first_title + "/" + self.lookup_header_list(remaining_titles)
            return first_title

        if first_title in self:
            node = self[first_title]
            if remaining_titles:
                return (
                    node.property_name
                    + "/"
                    + node.lookup_header_list(remaining_titles)
                )
            return node.property_name

        # If we can't look up the title, treat it and any children as
        # field names directly.
        # Strip spaces off these.
        return "/".join(x.strip(" ") for x in title_header_list)

    def __setitem__(self, key, value):
        """Store ``value`` under the normalised form of ``key``."""
        self.data[self._normalise_key(key)] = value

    def __getitem__(self, key):
        """Return the value for the normalised ``key``.

        Raises:
            KeyError: if ``key`` is None or not present.
        """
        if key is None:
            raise KeyError(key)
        return self.data[self._normalise_key(key)]

    def __contains__(self, key):
        """Return whether the normalised ``key`` is present; None never is."""
        if key is None:
            return False
        return self._normalise_key(key) in self.data
99

100

101
class JsonLoaderLocalRefUsedWhenLocalRefsDisabled(FlattenToolError):
    """Raised when a local URI is requested while local refs are disabled."""
103

104

105
def jsonloader_local_refs_disabled(uri, **kwargs):
    """Load ``uri`` with ``jsonref.jsonloader``, refusing local URIs.

    Raises:
        JsonLoaderLocalRefUsedWhenLocalRefsDisabled: if ``is_ref_local(uri)``
            is true.
    """
    if not is_ref_local(uri):
        return jsonref.jsonloader(uri, **kwargs)
    raise JsonLoaderLocalRefUsedWhenLocalRefsDisabled(
        "Local Ref Used When Local Refs Disabled: " + uri
    )
111

112

113
def is_ref_local(uri):
    """Return True unless ``uri`` begins with ``http://`` or ``https://``.

    The scheme prefix is compared case-insensitively.
    """
    for scheme in ("http://", "https://"):
        if uri[: len(scheme)].lower() == scheme:
            return False
    return True
115

116

117
class SchemaParser(object):
12✔
118
    """Parse the fields of a JSON schema into a flattened structure."""
119

120
    def __init__(
        self,
        schema_filename=None,
        root_schema_dict=None,
        rollup=False,
        root_id=None,
        use_titles=False,
        disable_local_refs=False,
        truncation_length=3,
        exclude_deprecated_fields=False,
        convert_flags=None,
    ):
        """Set up parser state and load the root schema.

        Exactly one of ``schema_filename`` and ``root_schema_dict`` must be
        given.

        Args:
            schema_filename: ``http://``/``https://`` URL or local path of a
                JSON schema, loaded with ``jsonref``.
            root_schema_dict: an already loaded schema, stored as-is.
            rollup: stored as ``self.do_rollup``; enables ``rollUp`` handling
                in ``parse_schema_dict``.
            root_id: passed to each sub-sheet ``Sheet`` that is created.
            use_titles: use schema titles rather than field paths as headings.
            disable_local_refs: load a local schema file with
                ``jsonloader_local_refs_disabled`` so that local refs raise.
            truncation_length: passed to ``make_sub_sheet_name``.
            exclude_deprecated_fields: skip properties flagged ``deprecated``.
            convert_flags: mapping of conversion flags (``"wkt"`` is read by
                ``parse_schema_dict``). Defaults to a new empty dict.

        Raises:
            FlattenToolValueError: if neither or both of ``schema_filename``
                and ``root_schema_dict`` are supplied.
        """
        self.sub_sheets = {}
        self.main_sheet = Sheet()
        self.sub_sheet_mapping = {}
        self.do_rollup = rollup
        self.rollup = set()
        self.root_id = root_id
        self.use_titles = use_titles
        self.sub_sheet_titles = {}
        self.truncation_length = truncation_length
        self.title_lookup = TitleLookup()
        self.flattened = {}
        self.exclude_deprecated_fields = exclude_deprecated_fields
        # A fresh dict per instance: a `{}` default in the signature would be
        # one object shared by every parser created without this argument.
        self.convert_flags = {} if convert_flags is None else convert_flags

        if root_schema_dict is None and schema_filename is None:
            raise FlattenToolValueError(
                _("One of schema_filename or root_schema_dict must be supplied")
            )
        if root_schema_dict is not None and schema_filename is not None:
            raise FlattenToolValueError(
                _("Only one of schema_filename or root_schema_dict should be supplied")
            )
        if schema_filename:
            # Use the same http(s) test as the ref loader, so that a local
            # path that merely starts with "http" is not fetched remotely.
            if not is_ref_local(schema_filename):
                import requests

                # NOTE(review): no timeout or HTTP status check here; an error
                # page would surface as a JSON decoding failure.
                r = requests.get(schema_filename)
                self.root_schema_dict = jsonref.loads(
                    r.text, object_pairs_hook=OrderedDict
                )
            else:
                if disable_local_refs:
                    with codecs.open(schema_filename, encoding="utf-8") as schema_file:
                        self.root_schema_dict = jsonref.load(
                            schema_file,
                            object_pairs_hook=OrderedDict,
                            loader=jsonloader_local_refs_disabled,
                        )
                else:
                    # Give jsonref a file: base URI built from the schema path.
                    if sys.version_info[:2] > (3, 0):
                        base_uri = pathlib.Path(
                            os.path.realpath(schema_filename)
                        ).as_uri()
                    else:
                        base_uri = urlparse.urljoin(
                            "file:",
                            urllib.pathname2url(os.path.abspath(schema_filename)),
                        )
                    with codecs.open(schema_filename, encoding="utf-8") as schema_file:
                        self.root_schema_dict = jsonref.load(
                            schema_file,
                            object_pairs_hook=OrderedDict,
                            base_uri=base_uri,
                        )

        else:
            self.root_schema_dict = root_schema_dict
189

190
    def parse(self):
        """Fill ``self.main_sheet`` from the root schema.

        Each ``(field, title)`` pair yielded by ``parse_schema_dict`` for the
        root schema is appended to the main sheet: as the field path when
        ``use_titles`` is off, otherwise as the title (also recorded in
        ``main_sheet.titles``). With ``use_titles`` on, fields without a title
        are skipped with a ``FlattenToolWarning``.
        """
        for path, heading in self.parse_schema_dict("", self.root_schema_dict):
            if not self.use_titles:
                self.main_sheet.append(path)
                continue
            if heading:
                self.main_sheet.append(heading)
                self.main_sheet.titles[path] = heading
                continue
            warn(
                _("Field {} does not have a title, skipping.").format(path),
                FlattenToolWarning,
            )
204

205
    def parse_schema_dict(
        self,
        parent_path,
        schema_dict,
        parent_id_fields=None,
        title_lookup=None,
        parent_title="",
    ):
        """Recursively walk ``schema_dict``, yielding ``(field, title)`` pairs.

        This is a generator. While iterating it also records the detected
        type of each property in ``self.flattened``, registers titles in
        ``title_lookup``, and creates and fills sub-sheets in
        ``self.sub_sheets`` for arrays of objects.

        Args:
            parent_path: slash-separated path of the enclosing node without a
                trailing slash (``""`` at the root).
            schema_dict: the schema node to walk.
            parent_id_fields: id fields inherited from enclosing nodes;
                defaults to an empty list.
            title_lookup: lookup in which titles found at this level are
                registered; defaults to ``self.title_lookup``.
            parent_title: colon-terminated title prefix of the enclosing
                nodes, or None when an enclosing node had no title.

        Yields:
            ``(field, title)`` tuples. ``field`` is relative to
            ``parent_path``; ``title`` may be None. Fields of arrays of
            objects go to a sub-sheet and are only yielded when rolled up.

        Raises:
            FlattenToolValueError: for an array whose item types are not
                supported.
        """
        if parent_path:
            parent_path = parent_path + "/"
        parent_id_fields = parent_id_fields or []
        title_lookup = self.title_lookup if title_lookup is None else title_lookup

        if (
            "type" in schema_dict
            and schema_dict["type"] == "array"
            and "items" in schema_dict
            and "oneOf" in schema_dict["items"]
        ):
            # Array whose items are a oneOf: walk every object alternative.
            for oneOf in schema_dict["items"]["oneOf"]:
                if "type" in oneOf and oneOf["type"] == "object":
                    for field, child_title in self.parse_schema_dict(
                        parent_path,
                        oneOf,
                        parent_id_fields=parent_id_fields,
                        title_lookup=title_lookup,
                        parent_title=parent_title,
                    ):
                        yield (field, child_title)

        elif "properties" in schema_dict:
            # An "id" property at this level extends the inherited id fields,
            # using its title or its path depending on use_titles.
            if "id" in schema_dict["properties"]:
                if self.use_titles:
                    id_fields = parent_id_fields + [
                        (parent_title if parent_title is not None else parent_path)
                        + (schema_dict["properties"]["id"].get("title") or "id")
                    ]
                else:
                    id_fields = parent_id_fields + [parent_path + "id"]
            else:
                id_fields = parent_id_fields

            for property_name, property_schema_dict in schema_dict[
                "properties"
            ].items():
                if self.exclude_deprecated_fields and property_schema_dict.get(
                    "deprecated"
                ):
                    continue

                # The deprecated flag may also sit on the __reference__
                # attribute, when the property object has one.
                if (
                    self.exclude_deprecated_fields
                    and hasattr(property_schema_dict, "__reference__")
                    and property_schema_dict.__reference__.get("deprecated")
                ):
                    continue

                property_type_set = get_property_type_set(property_schema_dict)

                # A title on __reference__ wins over the property's own title.
                if (
                    hasattr(property_schema_dict, "__reference__")
                    and "title" in property_schema_dict.__reference__
                ):
                    title = property_schema_dict.__reference__["title"]
                else:
                    title = property_schema_dict.get("title")
                if title:
                    title_lookup[title] = TitleLookup()
                    title_lookup[title].property_name = property_name

                if "object" in property_type_set:
                    # With the "wkt" flag set, an object that has both "type"
                    # and "coordinates" properties is recorded as geojson and
                    # not descended into.
                    if (
                        self.convert_flags.get("wkt")
                        and "type" in property_schema_dict.get("properties", {})
                        and "coordinates" in property_schema_dict.get("properties", {})
                    ):
                        self.flattened[
                            parent_path.replace("/0/", "/") + property_name
                        ] = "geojson"
                        yield (property_name, title)
                        continue
                    self.flattened[parent_path + property_name] = "object"
                    for field, child_title in self.parse_schema_dict(
                        parent_path + property_name,
                        property_schema_dict,
                        parent_id_fields=id_fields,
                        title_lookup=title_lookup.get(title),
                        parent_title=parent_title + title + ":"
                        if parent_title is not None and title
                        else None,
                    ):
                        yield (
                            property_name + "/" + field,
                            # TODO ambiguous use of "title"
                            (
                                title + ":" + child_title
                                if title and child_title
                                else None
                            ),
                        )

                elif "array" in property_type_set:
                    flattened_key = parent_path.replace("/0/", "/") + property_name
                    self.flattened[flattened_key] = "array"
                    type_set = get_property_type_set(property_schema_dict["items"])
                    if "string" in type_set or not type_set:
                        self.flattened[flattened_key] = "string_array"
                        yield property_name, title
                    elif "number" in type_set:
                        self.flattened[flattened_key] = "number_array"
                        yield property_name, title
                    elif "array" in type_set:
                        self.flattened[flattened_key] = "array_array"
                        nested_type_set = get_property_type_set(
                            property_schema_dict["items"]["items"]
                        )
                        if "string" in nested_type_set or "number" in nested_type_set:
                            yield property_name, title
                        else:
                            # Same exception type as before, now with a
                            # message naming the offending property.
                            raise FlattenToolValueError(
                                _(
                                    'Unsupported nested array item types {} for property "{}{}", '
                                    'only "string" and "number" are supported.'
                                ).format(
                                    repr(nested_type_set), parent_path, property_name
                                )
                            )
                    elif "object" in type_set:
                        if title:
                            title_lookup[title].property_name = property_name

                        if self.use_titles and parent_title is not None:
                            sub_sheet_name = make_sub_sheet_name(
                                parent_title,
                                title or property_name,
                                truncation_length=self.truncation_length,
                                path_separator=":",
                            )
                            self.sub_sheet_titles[
                                (
                                    parent_path,
                                    property_name,
                                )
                            ] = sub_sheet_name
                        else:
                            sub_sheet_name = make_sub_sheet_name(
                                parent_path,
                                property_name,
                                truncation_length=self.truncation_length,
                            )
                        # self.sub_sheet_mapping[parent_name+'/'+property_name] = sub_sheet_name

                        # Properties that map to the same name share a sheet.
                        if sub_sheet_name not in self.sub_sheets:
                            self.sub_sheets[sub_sheet_name] = Sheet(
                                root_id=self.root_id, name=sub_sheet_name
                            )
                        sub_sheet = self.sub_sheets[sub_sheet_name]
                        sub_sheet.title_lookup = title_lookup.get(title)

                        for field in id_fields:
                            sub_sheet.add_field(field, id_field=True)
                            sub_sheet.titles[title_lookup.lookup_header(field)] = field
                        fields = self.parse_schema_dict(
                            parent_path + property_name + "/0",
                            property_schema_dict["items"],
                            parent_id_fields=id_fields,
                            title_lookup=title_lookup.get(title),
                            parent_title=parent_title + title + ":"
                            if parent_title is not None and title
                            else None,
                        )

                        rollup_fields = set()
                        for field, child_title in fields:
                            full_path = parent_path + property_name + "/0/" + field
                            if self.use_titles:
                                if not child_title or parent_title is None:
                                    warn(
                                        _(
                                            "Field {}{}/0/{} is missing a title, skipping."
                                        ).format(parent_path, property_name, field),
                                        FlattenToolWarning,
                                    )
                                elif not title:
                                    warn(
                                        _(
                                            "Field {}{} does not have a title, skipping it and all its children."
                                        ).format(parent_path, property_name),
                                        FlattenToolWarning,
                                    )
                                else:
                                    # This code only works for arrays that are at 0 or 1 layer of nesting
                                    full_title = (
                                        parent_title + title + ":" + child_title
                                    )
                                    sub_sheet.add_field(full_title)
                                    sub_sheet.titles[full_path] = full_title
                            else:
                                sub_sheet.add_field(full_path)
                            # Rolled-up fields are also yielded to the caller,
                            # in addition to being added to the sub-sheet.
                            if (
                                self.do_rollup
                                and "rollUp" in property_schema_dict
                                and field in property_schema_dict["rollUp"]
                            ):
                                rollup_fields.add(field)
                                self.rollup.add(full_path)
                                yield property_name + "/0/" + field, (
                                    title + ":" + child_title
                                    if title and child_title
                                    else None
                                )

                        # Check that all items in rollUp are in the schema
                        if self.do_rollup and "rollUp" in property_schema_dict:
                            missedRollUp = (
                                set(property_schema_dict["rollUp"]) - rollup_fields
                            )
                            if missedRollUp:
                                # Sorted so the message is deterministic, and
                                # passed through _() like the other warnings.
                                warn(
                                    _("{} in rollUp but not in schema").format(
                                        ", ".join(sorted(missedRollUp))
                                    ),
                                    FlattenToolWarning,
                                )

                    else:
                        raise FlattenToolValueError(
                            _(
                                'Unknown type_set: {}, did you forget to explicitly set the "type" key on "items"?'
                            ).format(type_set)
                        )
                elif "string" in property_type_set or not property_type_set:
                    # We only check for date here, because its the only format
                    # for which we need to specially transform the input
                    if property_schema_dict.get("format") == "date":
                        self.flattened[
                            parent_path.replace("/0/", "/") + property_name
                        ] = "date"
                    else:
                        self.flattened[
                            parent_path.replace("/0/", "/") + property_name
                        ] = "string"
                    yield property_name, title
                elif "number" in property_type_set:
                    self.flattened[
                        parent_path.replace("/0/", "/") + property_name
                    ] = "number"
                    yield property_name, title
                elif "integer" in property_type_set:
                    self.flattened[
                        parent_path.replace("/0/", "/") + property_name
                    ] = "integer"
                    yield property_name, title
                elif "boolean" in property_type_set:
                    self.flattened[
                        parent_path.replace("/0/", "/") + property_name
                    ] = "boolean"
                    yield property_name, title
                else:
                    # NOTE(review): the two string pieces below join without a
                    # space after the comma; left unchanged because the text
                    # is the lookup key passed to _().
                    warn(
                        _(
                            'Unrecognised types {} for property "{}" with context "{}",'
                            "so this property has been ignored."
                        ).format(repr(property_type_set), property_name, parent_path),
                        FlattenToolWarning,
                    )

        else:
            warn(
                _('Skipping field "{}", because it has no properties.').format(
                    parent_path
                ),
                FlattenToolWarning,
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc