• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenDataServices / flatten-tool / 6507626273

13 Oct 2023 11:25AM UTC coverage: 42.006% (-53.7%) from 95.72%
6507626273

Pull #433

github

odscjames
New "Geo" optional dependencies

https://github.com/OpenDataServices/flatten-tool/issues/424
Pull Request #433: New "Geo" optional dependencies

38 of 38 new or added lines in 6 files covered. (100.0%)

1466 of 3490 relevant lines covered (42.01%)

4.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.91
/flattentool/schema.py
1
"""Classes for reading from a JSON schema"""
2

3
from __future__ import print_function, unicode_literals
10✔
4

5
import codecs
10✔
6
import os
10✔
7
import sys
10✔
8
from collections import OrderedDict, UserDict
10✔
9
from warnings import warn
10✔
10

11
import jsonref
10✔
12

13
from flattentool.i18n import _
10✔
14
from flattentool.sheet import Sheet
10✔
15

16
if sys.version_info[:2] > (3, 0):
10✔
17
    import pathlib
10✔
18
else:
19
    import urllib
×
20

21
    import urlparse
×
22

23

24
def get_property_type_set(property_schema_dict):
    """Return the "type" entry of a property schema as a set of type names.

    A missing "type" key gives an empty set, a list value gives the set of
    its entries, and any other value gives a one-element set.
    """
    declared = property_schema_dict.get("type", [])
    if isinstance(declared, list):
        return set(declared)
    return {declared}
10✔
30

31

32
def make_sub_sheet_name(
    parent_path, property_name, truncation_length=3, path_separator="/"
):
    """Build a sub-sheet name from a parent path and a property name.

    ``parent_path`` is split on ``path_separator``; components equal to "0"
    are dropped, every other component is cut to at most
    ``truncation_length`` characters, and the results are joined with "_".
    ``property_name`` is appended to that prefix unchanged.
    """
    shortened = []
    for component in parent_path.split(path_separator):
        if component == "0":
            continue
        shortened.append(component[:truncation_length])
    return "_".join(shortened) + property_name
41

42

43
class TitleLookup(UserDict):
    """Nested mapping from column titles to JSON property names.

    Keys are titles. They are stored and looked up with every space removed
    and in lower case, so "Some Title" and "sometitle" address the same
    entry. The lookup methods expect each value to expose ``property_name``
    and ``lookup_header_list`` (i.e. to be another ``TitleLookup``).
    """

    # Property name this node stands for; stays None until the owner sets it.
    property_name = None

    @staticmethod
    def _normalise_key(key):
        """Return ``key`` without spaces and lower-cased (the storage form)."""
        return key.replace(" ", "").lower()

    def lookup_header(self, title_header):
        """Translate a ":"-separated title header into a "/"-separated path.

        Anything that is not a string is returned unchanged.
        """
        # isinstance (rather than an exact type comparison) so that str
        # subclasses are translated too.
        if isinstance(title_header, str):
            return self.lookup_header_list(title_header.split(":"))
        return title_header

    def lookup_header_list(self, title_header_list):
        """Translate a list of title components into a "/"-separated path.

        * A first component that ``int()`` accepts is kept verbatim and the
          remaining components are resolved against this same lookup.
        * A first component that is a known title is replaced by its
          ``property_name`` and the remainder is resolved against the nested
          lookup stored under that title.
        * Otherwise every component is treated as a field name directly,
          with surrounding spaces stripped.
        """
        first_title = title_header_list[0]
        remaining_titles = title_header_list[1:]

        # Only the conversion itself is guarded, so a ValueError can never be
        # swallowed from anywhere else.
        try:
            int(first_title)
        except ValueError:
            pass
        else:
            if remaining_titles:
                return first_title + "/" + self.lookup_header_list(remaining_titles)
            return first_title

        if first_title in self:
            entry = self[first_title]
            if remaining_titles:
                return (
                    entry.property_name
                    + "/"
                    + entry.lookup_header_list(remaining_titles)
                )
            return entry.property_name

        # If we can't look up the title, treat it and any children as
        # field names directly.
        # Strip spaces off these.
        return "/".join(x.strip(" ") for x in title_header_list)

    def __setitem__(self, key, value):
        self.data[self._normalise_key(key)] = value

    def __getitem__(self, key):
        # None can never be a stored title; report it as a missing key.
        if key is None:
            raise KeyError
        return self.data[self._normalise_key(key)]

    def __delitem__(self, key):
        # Keeps deletion (and therefore pop()) consistent with the normalised
        # keys used by __setitem__ / __getitem__ / __contains__.
        if key is None:
            raise KeyError
        del self.data[self._normalise_key(key)]

    def __contains__(self, key):
        if key is None:
            return False
        return self._normalise_key(key) in self.data
10✔
94

95

96
class JsonLoaderLocalRefUsedWhenLocalRefsDisabled(Exception):
    """Raised when a non-HTTP(S) reference is loaded while local refs are disabled."""
10✔
98

99

100
def jsonloader_local_refs_disabled(uri, **kwargs):
    """Load ``uri`` with ``jsonref.jsonloader`` unless it is a local reference.

    Raises:
        JsonLoaderLocalRefUsedWhenLocalRefsDisabled: if ``is_ref_local(uri)``
            is true; the message ends with the offending ``uri``.
    """
    if not is_ref_local(uri):
        return jsonref.jsonloader(uri, **kwargs)
    raise JsonLoaderLocalRefUsedWhenLocalRefsDisabled(
        "Local Ref Used When Local Refs Disabled: " + uri
    )
×
106

107

108
def is_ref_local(uri):
    """Return True unless ``uri`` starts with "http://" or "https://".

    The scheme comparison ignores case.
    """
    # Eight characters is the length of the longer scheme prefix.
    prefix = uri[:8].lower()
    return not prefix.startswith(("http://", "https://"))
×
110

111

112
class SchemaParser(object):
    """Parse the fields of a JSON schema into a flattened structure."""

    def __init__(
        self,
        schema_filename=None,
        root_schema_dict=None,
        rollup=False,
        root_id=None,
        use_titles=False,
        disable_local_refs=False,
        truncation_length=3,
        exclude_deprecated_fields=False,
        convert_flags=None,
    ):
        """Set up empty result containers and load the root schema.

        Exactly one of ``schema_filename`` and ``root_schema_dict`` must be
        given.

        Args:
            schema_filename: Path of a schema file, or a value starting with
                "http", which is fetched with ``requests``.
            root_schema_dict: An already loaded schema, used as-is.
            rollup: When true, fields named in a property's "rollUp" list are
                also yielded for the parent sheet.
            root_id: Passed to every sub-sheet ``Sheet`` that is created.
            use_titles: Use schema titles instead of field paths as column
                names.
            disable_local_refs: For a schema file on disk, load it with
                ``jsonloader_local_refs_disabled`` so that local references
                raise instead of being resolved.
            truncation_length: Passed to ``make_sub_sheet_name``.
            exclude_deprecated_fields: Skip properties with a truthy
                "deprecated" value.
            convert_flags: Mapping of conversion switches; only the "wkt"
                key is read in this class. ``None`` (the default) means an
                empty mapping.

        Raises:
            ValueError: If neither or both of ``schema_filename`` and
                ``root_schema_dict`` are supplied.
        """
        self.sub_sheets = {}
        self.main_sheet = Sheet()
        self.sub_sheet_mapping = {}
        self.do_rollup = rollup
        self.rollup = set()
        self.root_id = root_id
        self.use_titles = use_titles
        self.sub_sheet_titles = {}
        self.truncation_length = truncation_length
        self.title_lookup = TitleLookup()
        self.flattened = {}
        self.exclude_deprecated_fields = exclude_deprecated_fields
        # A fresh dict per instance: a literal ``{}`` default would be one
        # object shared by every parser created without this argument.
        self.convert_flags = {} if convert_flags is None else convert_flags

        if root_schema_dict is None and schema_filename is None:
            raise ValueError(
                _("One of schema_filename or root_schema_dict must be supplied")
            )
        if root_schema_dict is not None and schema_filename is not None:
            raise ValueError(
                _("Only one of schema_filename or root_schema_dict should be supplied")
            )
        if schema_filename:
            if schema_filename.startswith("http"):
                # Imported lazily so requests is only needed for remote schemas.
                import requests

                # NOTE(review): no timeout and no HTTP status check here, and
                # disable_local_refs is not applied on this path - confirm
                # whether that is intended.
                r = requests.get(schema_filename)
                self.root_schema_dict = jsonref.loads(
                    r.text, object_pairs_hook=OrderedDict
                )
            else:
                if disable_local_refs:
                    with codecs.open(schema_filename, encoding="utf-8") as schema_file:
                        self.root_schema_dict = jsonref.load(
                            schema_file,
                            object_pairs_hook=OrderedDict,
                            loader=jsonloader_local_refs_disabled,
                        )
                else:
                    # The base URI lets references relative to the schema
                    # file be resolved.
                    if sys.version_info[:2] > (3, 0):
                        base_uri = pathlib.Path(
                            os.path.realpath(schema_filename)
                        ).as_uri()
                    else:
                        base_uri = urlparse.urljoin(
                            "file:",
                            urllib.pathname2url(os.path.abspath(schema_filename)),
                        )
                    with codecs.open(schema_filename, encoding="utf-8") as schema_file:
                        self.root_schema_dict = jsonref.load(
                            schema_file,
                            object_pairs_hook=OrderedDict,
                            base_uri=base_uri,
                        )

        else:
            self.root_schema_dict = root_schema_dict

    def parse(self):
        """Walk the root schema and fill ``main_sheet`` with its columns.

        With ``use_titles`` a field's title is appended and recorded in
        ``main_sheet.titles``; fields without a title are skipped with a
        warning. Otherwise the field path itself is appended.
        """
        fields = self.parse_schema_dict("", self.root_schema_dict)
        for field, title in fields:
            if self.use_titles:
                if not title:
                    warn(_("Field {} does not have a title, skipping.").format(field))
                else:
                    self.main_sheet.append(title)
                    self.main_sheet.titles[field] = title
            else:
                self.main_sheet.append(field)

    def parse_schema_dict(
        self,
        parent_path,
        schema_dict,
        parent_id_fields=None,
        title_lookup=None,
        parent_title="",
    ):
        """Yield ``(field_path, title)`` pairs for the columns of ``schema_dict``.

        This is a generator. While it is consumed it also records the type
        of every visited property in ``self.flattened``, creates sub-sheets
        in ``self.sub_sheets`` for arrays of objects, fills
        ``self.sub_sheet_titles`` and ``self.rollup``, and registers titles
        in ``title_lookup``.

        Args:
            parent_path: "/"-separated path of the enclosing object ("" at
                the root). A trailing "/" is appended when it is non-empty.
            schema_dict: The (sub-)schema to walk.
            parent_id_fields: Id columns collected from enclosing objects;
                they are added as id fields to any sub-sheet created here.
            title_lookup: Lookup node in which titles found at this level
                are registered; ``self.title_lookup`` when None.
            parent_title: ":"-terminated title prefix of the enclosing
                objects, or None once an enclosing property had no title.

        Yields:
            Tuples of a field path relative to ``parent_path`` and the
            matching title (which may be None).

        Raises:
            ValueError: For an array whose "items" type set is not handled,
                including nested arrays whose inner items are neither
                "string" nor "number".
        """
        if parent_path:
            parent_path = parent_path + "/"
        parent_id_fields = parent_id_fields or []
        title_lookup = self.title_lookup if title_lookup is None else title_lookup

        if (
            "type" in schema_dict
            and schema_dict["type"] == "array"
            and "items" in schema_dict
            and "oneOf" in schema_dict["items"]
        ):
            # Array whose items are a oneOf: walk every object alternative
            # as if it were this schema.
            # NOTE(review): parent_path already ends in "/" here and the
            # recursive call appends another one - confirm that is intended.
            for oneOf in schema_dict["items"]["oneOf"]:
                if "type" in oneOf and oneOf["type"] == "object":
                    for field, child_title in self.parse_schema_dict(
                        parent_path,
                        oneOf,
                        parent_id_fields=parent_id_fields,
                        title_lookup=title_lookup,
                        parent_title=parent_title,
                    ):
                        yield (field, child_title)

        elif "properties" in schema_dict:
            # An "id" property at this level becomes an extra id column for
            # every sub-sheet created below it.
            if "id" in schema_dict["properties"]:
                if self.use_titles:
                    id_fields = parent_id_fields + [
                        (parent_title if parent_title is not None else parent_path)
                        + (schema_dict["properties"]["id"].get("title") or "id")
                    ]
                else:
                    id_fields = parent_id_fields + [parent_path + "id"]
            else:
                id_fields = parent_id_fields

            for property_name, property_schema_dict in schema_dict[
                "properties"
            ].items():
                if self.exclude_deprecated_fields and property_schema_dict.get(
                    "deprecated"
                ):
                    continue

                # The same check against the __reference__ attribute, when
                # the property object carries one.
                if (
                    self.exclude_deprecated_fields
                    and hasattr(property_schema_dict, "__reference__")
                    and property_schema_dict.__reference__.get("deprecated")
                ):
                    continue

                property_type_set = get_property_type_set(property_schema_dict)

                # A title on __reference__ takes precedence over the
                # property's own title.
                if (
                    hasattr(property_schema_dict, "__reference__")
                    and "title" in property_schema_dict.__reference__
                ):
                    title = property_schema_dict.__reference__["title"]
                else:
                    title = property_schema_dict.get("title")
                if title:
                    title_lookup[title] = TitleLookup()
                    title_lookup[title].property_name = property_name

                if "object" in property_type_set:
                    # With the "wkt" flag, an object that declares both
                    # "type" and "coordinates" properties is kept as one
                    # "geojson" column instead of being descended into.
                    if (
                        self.convert_flags.get("wkt")
                        and "type" in property_schema_dict.get("properties", {})
                        and "coordinates" in property_schema_dict.get("properties", {})
                    ):
                        self.flattened[
                            parent_path.replace("/0/", "/") + property_name
                        ] = "geojson"
                        yield (property_name, title)
                        continue
                    # NOTE(review): unlike the other branches this key is not
                    # passed through replace("/0/", "/") - confirm intended.
                    self.flattened[parent_path + property_name] = "object"
                    for field, child_title in self.parse_schema_dict(
                        parent_path + property_name,
                        property_schema_dict,
                        parent_id_fields=id_fields,
                        title_lookup=title_lookup.get(title),
                        parent_title=parent_title + title + ":"
                        if parent_title is not None and title
                        else None,
                    ):
                        yield (
                            property_name + "/" + field,
                            # TODO ambiguous use of "title"
                            (
                                title + ":" + child_title
                                if title and child_title
                                else None
                            ),
                        )

                elif "array" in property_type_set:
                    flattened_key = parent_path.replace("/0/", "/") + property_name
                    self.flattened[flattened_key] = "array"
                    type_set = get_property_type_set(property_schema_dict["items"])
                    if "string" in type_set or not type_set:
                        self.flattened[flattened_key] = "string_array"
                        yield property_name, title
                    elif "number" in type_set:
                        self.flattened[flattened_key] = "number_array"
                        yield property_name, title
                    elif "array" in type_set:
                        self.flattened[flattened_key] = "array_array"
                        nested_type_set = get_property_type_set(
                            property_schema_dict["items"]["items"]
                        )
                        if "string" in nested_type_set or "number" in nested_type_set:
                            yield property_name, title
                        else:
                            # Same exception type as before, now with a
                            # message naming the offending property.
                            raise ValueError(
                                _(
                                    'Unsupported type_set {} for the inner "items" of nested array "{}"; only "string" and "number" are supported.'
                                ).format(
                                    nested_type_set, parent_path + property_name
                                )
                            )
                    elif "object" in type_set:
                        # Array of objects: its fields go onto a sub-sheet.
                        if title:
                            title_lookup[title].property_name = property_name

                        if self.use_titles and parent_title is not None:
                            sub_sheet_name = make_sub_sheet_name(
                                parent_title,
                                title or property_name,
                                truncation_length=self.truncation_length,
                                path_separator=":",
                            )
                            self.sub_sheet_titles[
                                (
                                    parent_path,
                                    property_name,
                                )
                            ] = sub_sheet_name
                        else:
                            sub_sheet_name = make_sub_sheet_name(
                                parent_path,
                                property_name,
                                truncation_length=self.truncation_length,
                            )
                        # self.sub_sheet_mapping[parent_name+'/'+property_name] = sub_sheet_name

                        # Reuse an existing sheet when another array maps to
                        # the same (truncated) name.
                        if sub_sheet_name not in self.sub_sheets:
                            self.sub_sheets[sub_sheet_name] = Sheet(
                                root_id=self.root_id, name=sub_sheet_name
                            )
                        sub_sheet = self.sub_sheets[sub_sheet_name]
                        sub_sheet.title_lookup = title_lookup.get(title)

                        for field in id_fields:
                            sub_sheet.add_field(field, id_field=True)
                            sub_sheet.titles[title_lookup.lookup_header(field)] = field
                        fields = self.parse_schema_dict(
                            parent_path + property_name + "/0",
                            property_schema_dict["items"],
                            parent_id_fields=id_fields,
                            title_lookup=title_lookup.get(title),
                            parent_title=parent_title + title + ":"
                            if parent_title is not None and title
                            else None,
                        )

                        rollup_fields = set()
                        for field, child_title in fields:
                            full_path = parent_path + property_name + "/0/" + field
                            if self.use_titles:
                                if not child_title or parent_title is None:
                                    warn(
                                        _(
                                            "Field {}{}/0/{} is missing a title, skipping."
                                        ).format(parent_path, property_name, field)
                                    )
                                elif not title:
                                    warn(
                                        _(
                                            "Field {}{} does not have a title, skipping it and all its children."
                                        ).format(parent_path, property_name)
                                    )
                                else:
                                    # This code only works for arrays that are at 0 or 1 layer of nesting
                                    full_title = (
                                        parent_title + title + ":" + child_title
                                    )
                                    sub_sheet.add_field(full_title)
                                    sub_sheet.titles[full_path] = full_title
                            else:
                                sub_sheet.add_field(full_path)
                            # Rolled-up fields are additionally yielded to the
                            # caller, i.e. they also appear on the parent sheet.
                            if (
                                self.do_rollup
                                and "rollUp" in property_schema_dict
                                and field in property_schema_dict["rollUp"]
                            ):
                                rollup_fields.add(field)
                                self.rollup.add(full_path)
                                yield property_name + "/0/" + field, (
                                    title + ":" + child_title
                                    if title and child_title
                                    else None
                                )

                        # Check that all items in rollUp are in the schema
                        if self.do_rollup and "rollUp" in property_schema_dict:
                            missedRollUp = (
                                set(property_schema_dict["rollUp"]) - rollup_fields
                            )
                            if missedRollUp:
                                warn(
                                    "{} in rollUp but not in schema".format(
                                        ", ".join(missedRollUp)
                                    )
                                )

                    else:
                        raise ValueError(
                            _(
                                'Unknown type_set: {}, did you forget to explicitly set the "type" key on "items"?'
                            ).format(type_set)
                        )
                elif "string" in property_type_set or not property_type_set:
                    # We only check for date here, because its the only format
                    # for which we need to specially transform the input
                    if property_schema_dict.get("format") == "date":
                        self.flattened[
                            parent_path.replace("/0/", "/") + property_name
                        ] = "date"
                    else:
                        self.flattened[
                            parent_path.replace("/0/", "/") + property_name
                        ] = "string"
                    yield property_name, title
                elif "number" in property_type_set:
                    self.flattened[
                        parent_path.replace("/0/", "/") + property_name
                    ] = "number"
                    yield property_name, title
                elif "integer" in property_type_set:
                    self.flattened[
                        parent_path.replace("/0/", "/") + property_name
                    ] = "integer"
                    yield property_name, title
                elif "boolean" in property_type_set:
                    self.flattened[
                        parent_path.replace("/0/", "/") + property_name
                    ] = "boolean"
                    yield property_name, title
                else:
                    warn(
                        _(
                            'Unrecognised types {} for property "{}" with context "{}",'
                            "so this property has been ignored."
                        ).format(repr(property_type_set), property_name, parent_path)
                    )

        else:
            warn(
                _('Skipping field "{}", because it has no properties.').format(
                    parent_path
                )
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc