• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / OWSLib / 11628884181

01 Nov 2024 11:49AM UTC coverage: 58.814% (-1.3%) from 60.156%
11628884181

Pull #548

github

web-flow
Merge 4b9b7cf1f into ae98c2039
Pull Request #548: Strip out redundant __new__ to enable object pickling

8364 of 14221 relevant lines covered (58.81%)

1.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.95
/owslib/owscontext/atom.py
1
# -*- coding: utf-8 -*-
2
# =============================================================================
3
# Authors : Alexander Kmoch <allixender@gmail.com>
4
#
5
# =============================================================================
6

7
"""
8
API for OGC Web Services Context Document (OWS Context) format.
9

10
ATOM XML Encoding: http://www.opengeospatial.org/standards/owc
11

12
OGC OWS Context Atom Encoding Standard 1.0 (12-084r2)
13
"""
14

15
import logging
2✔
16

17
from owslib.etree import etree, ParseError
2✔
18
from owslib import util
2✔
19
from owslib.namespaces import Namespaces
2✔
20
from owslib.owscontext.common import is_empty, extract_p, \
2✔
21
    try_int, try_float
22
from owslib.util import nspath_eval, element_to_string
2✔
23

24

25
LOGGER = logging.getLogger(__name__)
2✔
26

27
# default variables
28
add_namespaces = {"georss": "http://www.georss.org/georss",
2✔
29
                  "owc": "http://www.opengis.net/owc/1.0",
30
                  "xml": "http://www.w3.org/XML/1998/namespace"}
31

32

33
def get_namespaces():
    """
    assemble the prefix-to-URI namespace map used throughout this module

    :return: the map returned by ``Namespaces.get_namespaces`` for atom, dc,
        gml, gml32 and xlink, extended with the module-level
        ``add_namespaces`` entries and with the atom namespace registered
        under the key ``None`` (default namespace)
    """
    registry = Namespaces()
    namespace_map = registry.get_namespaces(
        ["atom", "dc", "gml", "gml32", "xlink"])
    # georss, owc and xml are not requested from the registry above
    namespace_map.update(add_namespaces)
    # un-prefixed elements are meant to resolve to atom
    namespace_map[None] = registry.get_namespace("atom")
    return namespace_map
39

40

41
ns = get_namespaces()
2✔
42

43

44
def nspv(path):
    """
    short-hand for ``nspath_eval`` bound to this module's namespace map
    (syntax seen in waterml2.py)

    :param path: namespace-prefixed xpath, e.g. ``atom:id``
    :return: whatever ``nspath_eval(path, ns)`` returns
    """
    resolved_path = nspath_eval(path, ns)
    return resolved_path
51

52

53
def ns_elem(ns_prefix, elem_name):
    """
    build a namespace-qualified name in ``{uri}name`` notation

    :param ns_prefix: prefix looked up in the module-level ``ns`` map
    :param elem_name: local element or attribute name
    :return: the qualified name, or None when the prefix is not registered
    """
    uri = ns.get(ns_prefix)
    if uri is None:
        return None
    return "{%s}%s" % (uri, elem_name)
58

59

60
def parse_owc_content(content_node):
    """
    parse an owc:content-like element (also used for owc:request) into a dict

    :param content_node: xml element; its ``type``, ``href`` and ``title``
        attributes are read and its first child element, if any, is
        serialised as the inline content
    :return: dict with the keys ``type``, ``url``, ``content`` and ``title``;
        ``content`` is the first child serialised to text, or None when the
        node has no child elements
    """
    mimetype = util.testXMLAttribute(content_node, 'type')
    url = util.testXMLAttribute(content_node, 'href')
    title = util.testXMLAttribute(content_node, 'title')

    inline_content = None
    children = list(content_node)
    if children:
        serialised = element_to_string(children[0], False)
        # parse_entry decodes the result of the same call, i.e. bytes are
        # expected here; str() on bytes would store the repr "b'...'"
        # instead of the markup itself
        if isinstance(serialised, bytes):
            inline_content = serialised.decode('utf-8')
        elif serialised is not None:
            inline_content = str(serialised)

    # content stays None (not the string "None") when there is no child, so
    # that emptiness checks on the returned dict can work as intended
    return {
        "type": mimetype,
        "url": url,
        "content": inline_content,
        "title": title
    }
76

77

78
def parse_entry(entry_node):
    """
    parse an atom entry into a feature/resource dict to build OwcResource from

    The ``date``, ``categories``, ``active`` and ``folder`` properties are
    part of the returned template but are not read from the entry here.

    :param entry_node: xml element root node of the atom:entry
    :return: dictionary for OwcResource.from_dict()
    """
    def first_text(node, path):
        # text value of the first child of node matching the prefixed path
        return util.testXMLValue(node.find(util.nspath_eval(path, ns)))

    resource_base_dict = {
        "type": "Feature",
        "id": None,
        "geometry": None,
        "properties": {
            'title': None,
            'abstract': None,
            'updated': None,
            'date': None,
            'authors': [],
            'publisher': None,
            'rights': None,
            'categories': [],
            "links": {
                "alternates": [],
                "previews": [],
                "data": [],
                "via": [],
            },
            'offerings': [],
            'active': None,
            'minscaledenominator': None,
            'maxscaledenominator': None,
            'folder': None
        }
    }
    properties = resource_base_dict['properties']

    # <id>ftp://ftp.remotesensing.org/pub/geotiff/samples/gdal_eg/cea.txt</id>
    resource_base_dict["id"] = first_text(entry_node, 'atom:id')

    # <title>GeoTIFF Example</title>
    properties["title"] = first_text(entry_node, 'atom:title')

    # <updated>2011-11-01T00:00:00Z</updated>
    properties["updated"] = first_text(entry_node, 'atom:updated')

    # <dc:publisher>
    properties["publisher"] = first_text(entry_node, 'dc:publisher')

    # <dc:rights>
    properties["rights"] = first_text(entry_node, 'dc:rights')

    # <georss:where>: the first child is kept as serialised xml text
    # TODO here parse geometry??
    where_node = entry_node.find(util.nspath_eval('georss:where', ns))
    if where_node is not None:
        where_children = list(where_node)
        if where_children:
            xmltxt = element_to_string(where_children[0], False)
            resource_base_dict["geometry"] = xmltxt.decode('utf-8')

    # <content type = "text" > aka subtitle, aka abstract
    properties["abstract"] = first_text(entry_node, 'atom:content')

    # <author> .. <name> <email> <uri>
    authors = []
    for author_node in entry_node.findall(
            util.nspath_eval('atom:author', ns)):
        author = {
            "name": first_text(author_node, 'atom:name'),
            "email": first_text(author_node, 'atom:email'),
            "uri": first_text(author_node, 'atom:uri')
        }
        if not is_empty(author):
            authors.append(author)
    properties["authors"] = authors

    # <link rel="enclosure" type="image/png"
    #   length="12345" title="..." href="http://o..."/>
    # <link rel="icon" type="image/png" title="Preview f..."
    #   href="http://..."/>
    # <link rel="via" type="application/vnd.ogc.wms_xml"
    #   title="Original .." href="...."/>
    # maps the atom rel value to the list the link is collected in
    links_by_rel = {
        "alternate": [],
        "icon": [],
        "enclosure": [],
        "via": [],
    }
    for link_node in entry_node.findall(util.nspath_eval('atom:link', ns)):
        rel = util.testXMLAttribute(link_node, 'rel')
        link = {
            "href": util.testXMLAttribute(link_node, 'href'),
            "type": util.testXMLAttribute(link_node, 'type'),
            "length": util.testXMLAttribute(link_node, 'length'),
            "lang": util.testXMLAttribute(link_node, 'lang'),
            "title": util.testXMLAttribute(link_node, 'title'),
            "rel": rel
        }
        bucket = links_by_rel.get(rel)
        if bucket is not None and not is_empty(link):
            bucket.append(link)
        else:
            LOGGER.warning(
                "unknown link type in Ows Resource entry section: %r", link)

    properties['links'].update({
        "alternates": links_by_rel["alternate"],
        "previews": links_by_rel["icon"],
        "data": links_by_rel["enclosure"],
        "via": links_by_rel["via"],
    })

    # <owc:offering code="http://www.opengis.net/spec/owc-at...">
    #   <owc:content type="image/tiff" href=".."
    #   <owc:operation code="GetCapabilities" method="GET"
    #       type="applica..." href="..."
    #       <owc:request type="application/xml"> ..
    #   <owc:styleSet>
    #     <owc:name>raster</owc:name>
    #     <owc:title>Default Raster</owc:title>
    #     <owc:abstract>A sample style that draws a </owc:abstract>
    #     <owc:legendURL href="h...." type="image/png"/>
    #   </owc:styleSet>
    offerings = []
    for offering_node in entry_node.findall(
            util.nspath_eval('owc:offering', ns)):
        offering_code = util.testXMLAttribute(offering_node, 'code')

        operations = []
        for op_val in offering_node.findall(
                util.nspath_eval('owc:operation', ns)):
            # the request is a child of the operation element itself; it
            # must not be looked up on a node left over from an earlier loop
            req_content_val = op_val.find(
                util.nspath_eval('owc:request', ns))
            req_content = None
            if req_content_val is not None:
                req_content = parse_owc_content(req_content_val)

            # TODO no example for result/response
            op_dict = {
                "code": util.testXMLAttribute(op_val, 'code'),
                "method": util.testXMLAttribute(op_val, 'method'),
                "type": util.testXMLAttribute(op_val, 'type'),
                "href": util.testXMLAttribute(op_val, 'href'),
                "request": None if is_empty(req_content) else req_content,
                "result": None
            }
            if not is_empty(op_dict):
                operations.append(op_dict)

        contents = []
        for cont_val in offering_node.findall(
                util.nspath_eval('owc:content', ns)):
            content_dict = parse_owc_content(cont_val)
            if not is_empty(content_dict):
                contents.append(content_dict)

        styles = []
        for style_val in offering_node.findall(
                util.nspath_eval('owc:styleSet', ns)):
            legend_node = style_val.find(
                util.nspath_eval('owc:legendURL', ns))
            style_set = {
                "name": first_text(style_val, 'owc:name'),
                "title": first_text(style_val, 'owc:title'),
                "abstract": first_text(style_val, 'owc:abstract'),
                "default": None,
                "legendURL": util.testXMLAttribute(legend_node, 'href'),
                "content": None
            }
            if not is_empty(style_set):
                styles.append(style_set)

        # offerings without a code attribute are dropped
        if offering_code is not None:
            offerings.append({
                "code": offering_code,
                "operations": operations,
                "contents": contents,
                "styles": styles
            })

    properties["offerings"] = offerings

    # TODO no examples for active attribute
    # <owc:minScaleDenominator>2500</owc:minScaleDenominator>
    properties["minscaledenominator"] = first_text(
        entry_node, 'owc:minScaleDenominator')

    # <owc:maxScaleDenominator>25000</owc:maxScaleDenominator>
    properties["maxscaledenominator"] = first_text(
        entry_node, 'owc:maxScaleDenominator')
    # TODO no examples for folder attribute

    return resource_base_dict
335

336

337
def decode_atomxml(xml_string):
    """
    parse an atom xml feed into a dict for instantiating an OwcContext

    Entries without an atom:id are skipped with a warning.

    :param xml_string: the atom xml document, passed to ``etree.fromstring``
    :return: OwcContext-ready dict
    """
    def first_text(node, path):
        # text value of the first child of node matching the prefixed path
        return util.testXMLValue(node.find(util.nspath_eval(path, ns)))

    context_base_dict = {
        "type": "FeatureCollection",
        "id": None,
        "bbox": None,
        "properties": {
            "lang": None,
            "links": {
                "profiles": [],
                "via": [],
            },
            'title': None,
            'abstract': None,
            'updated': None,
            'authors': [],
            'publisher': None,
            'generator': None,
            'display': None,
            'rights': None,
            'date': None,
            'categories': [],
        },
        'features': []
    }
    properties = context_base_dict['properties']
    feed_root = etree.fromstring(xml_string)

    # feed xml:lang=en
    properties["lang"] = util.testXMLAttribute(
        feed_root, '{http://www.w3.org/XML/1998/namespace}lang')

    # <id>
    context_base_dict["id"] = first_text(feed_root, 'atom:id')

    # <link rel="profile"
    #   href="http://www.opengis.net/spec/owc-atom/1.0/req/core"
    #   title="compliant bla bla"
    # < link rel = "via" type = "application/xml" href = "..." title = "..."
    links_profile = []
    links_via = []
    for link_node in feed_root.findall(util.nspath_eval('atom:link', ns)):
        rel = util.testXMLAttribute(link_node, 'rel')
        link = {
            "href": util.testXMLAttribute(link_node, 'href'),
            "type": util.testXMLAttribute(link_node, 'type'),
            "length": util.testXMLAttribute(link_node, 'length'),
            "lang": util.testXMLAttribute(link_node, 'lang'),
            "title": util.testXMLAttribute(link_node, 'title'),
            "rel": rel
        }
        if rel == "profile" and not is_empty(link):
            links_profile.append(link)
        elif rel == "via" and not is_empty(link):
            links_via.append(link)
        else:
            LOGGER.warning("unknown link type in Ows Context section: %r", link)

    properties['links'].update({"profiles": links_profile})
    properties['links'].update({"via": links_via})

    # <title>
    properties["title"] = first_text(feed_root, 'atom:title')

    # <subtitle type = "html"
    properties["abstract"] = first_text(feed_root, 'atom:subtitle')

    # <author> .. <name> <email> <uri>
    authors = []
    for author_node in feed_root.findall(
            util.nspath_eval('atom:author', ns)):
        author = {
            "name": first_text(author_node, 'atom:name'),
            "email": first_text(author_node, 'atom:email'),
            "uri": first_text(author_node, 'atom:uri')
        }
        if not is_empty(author):
            authors.append(author)
    properties["authors"] = authors

    # <georss:where>: the first child is kept as serialised xml text
    where_node = feed_root.find(util.nspath_eval('georss:where', ns))
    if where_node is not None:
        where_children = list(where_node)
        if where_children:
            xmltxt = element_to_string(where_children[0], False)
            bbox_text = xmltxt.decode('utf-8')
            # the template above declares a top-level "bbox" and
            # axml_context reads extract_p('bbox', ...), so the value
            # belongs at the top level
            context_base_dict["bbox"] = bbox_text
            # also kept where earlier versions stored it, for callers that
            # already read properties.bbox
            properties["bbox"] = bbox_text

    # <updated>2012-11-04T17:26:23Z</updated>
    properties["updated"] = first_text(feed_root, 'atom:updated')

    # <dc:date>2009-01-23T09:08:56.000Z/2009-01-23T09:14:08.000Z</dc:date>
    properties["date"] = first_text(feed_root, 'dc:date')

    # <rights>
    properties["rights"] = first_text(feed_root, 'atom:rights')

    # <dc:publisher>
    properties["publisher"] = first_text(feed_root, 'dc:publisher')

    # <owc:display>
    #   <owc:pixelWidth> <owc:pixelHeight> <owc:mmPerPixel>
    val_display = feed_root.find(util.nspath_eval('owc:display', ns))
    if val_display is None:
        val_pixel_width = val_pixel_height = val_mm_per_pixel = None
    else:
        val_pixel_width = val_display.find(
            util.nspath_eval('owc:pixelWidth', ns))
        val_pixel_height = val_display.find(
            util.nspath_eval('owc:pixelHeight', ns))
        val_mm_per_pixel = val_display.find(
            util.nspath_eval('owc:mmPerPixel', ns))
    owc_display = {
        "pixelWidth": util.testXMLValue(val_pixel_width),
        "pixelHeight": util.testXMLValue(val_pixel_height),
        "mmPerPixel": util.testXMLValue(val_mm_per_pixel)
    }
    if not is_empty(owc_display):
        properties["display"] = owc_display

    # <generator uri="http://w.." version="1.0">MiraMon</generator>
    generator_node = feed_root.find(util.nspath_eval('atom:generator', ns))
    owc_generator = {
        "name": util.testXMLValue(generator_node),
        "version": util.testXMLAttribute(generator_node, 'version'),
        "uri": util.testXMLAttribute(generator_node, 'uri')
    }
    if not is_empty(owc_generator):
        properties["generator"] = owc_generator

    # <category term="maps" label="This file contains maps"/>
    categories = []
    for category_node in feed_root.findall(
            util.nspath_eval('atom:category', ns)):
        category = {
            "term": util.testXMLAttribute(category_node, 'term'),
            "scheme": util.testXMLAttribute(category_node, 'scheme'),
            "label": util.testXMLAttribute(category_node, 'label')
        }
        if not is_empty(category):
            categories.append(category)
    properties["categories"] = categories

    # <entry> ...
    resources = []
    for entry in feed_root.findall(util.nspath_eval('atom:entry', ns)):
        entry_dict = parse_entry(entry)
        if entry_dict.get("id") is not None:
            resources.append(entry_dict)
        else:
            LOGGER.warning("feature entry has no id, not allowed: skipping!")

    context_base_dict["features"] = resources
    return context_base_dict
552

553

554
def encode_atomxml(obj_d):
    """
    encode an OwcContext, given as dict, into the atom xml encoding
    (works on the dict because we can't do circular imports)

    Errors raised while building or serialising the tree are not caught
    here and propagate to the caller.

    :param obj_d: the dict from owscontext to dict
    :return: b'atomxml'
    """
    feed_element = axml_context(obj_d)
    return element_to_string(
        etree.ElementTree(feed_element),
        encoding='utf-8',
        xml_declaration=False)
581

582

583
def axml_context(d):
    """
    encodes base OwcContext as dict to atom xml tree

    :param d: context dict; ``d['id']`` is accessed directly and must be
        present, all other values are optional and read via ``extract_p``
    :return: the ``feed`` xml element with all children attached
    """
    xml = etree.Element("feed", nsmap=ns)

    def append_all(elements):
        # attach every successfully encoded child, skipping None results
        for el in elements:
            if el is not None:
                xml.append(el)

    etree.SubElement(xml, "id").text = d['id']

    # spec reference links (rel="profile")
    append_all(axml_link(do) for do in
               extract_p('properties.links.profiles', d, []))

    area_of_interest = extract_p('bbox', d, None)
    if area_of_interest is not None:
        # best effort: an unparsable bbox is logged and left out
        try:
            gml = etree.fromstring(area_of_interest)
            georss = etree.SubElement(xml, ns_elem("georss", "where"))
            georss.append(gml)
        except Exception as ex:
            LOGGER.warning(
                'could not encode bbox into georss:where: %s', ex)

    # context metadata links (rel="via")
    append_all(axml_link(do) for do in
               extract_p('properties.links.via', d, []))

    language = extract_p('properties.lang', d, None)
    if language is not None:
        xml.set(ns_elem("xml", "lang"), language)

    title = extract_p('properties.title', d, None)
    if title is not None:
        etree.SubElement(xml, "title").text = title

    # <subtitle type = "html"
    subtitle = extract_p('properties.abstract', d, None)
    if subtitle is not None:
        etree.SubElement(xml, "subtitle").text = subtitle

    update_date = extract_p('properties.updated', d, None)
    if update_date is not None:
        etree.SubElement(xml, "updated").text = update_date

    append_all(axml_author(do) for do in
               extract_p('properties.authors', d, []))

    publisher = extract_p('properties.publisher', d, None)
    if publisher is not None:
        etree.SubElement(xml, ns_elem("dc", "publisher")).text = publisher

    creator_application = axml_creator_app(
        extract_p('properties.generator', d, None))
    if creator_application is not None and not is_empty(creator_application):
        xml.append(creator_application)

    creator_display = axml_display(extract_p('properties.display', d, None))
    if creator_display is not None:
        xml.append(creator_display)

    rights = extract_p('properties.rights', d, None)
    if rights is not None:
        etree.SubElement(xml, "rights").text = rights

    time_interval_of_interest = extract_p('properties.date', d, None)
    if time_interval_of_interest is not None:
        etree.SubElement(
            xml, ns_elem("dc", "date")).text = time_interval_of_interest

    # keywords
    append_all(axml_category(do) for do in
               extract_p('properties.categories', d, []))

    # here we generate single elements and attach them
    append_all(axml_resource(do) for do in
               extract_p('features', d, []))

    return xml
660

661

662
def axml_resource(d):
    """
    encodes an OwcResource as dict into atom xml tree

    :param d: resource dict; ``d['id']`` and ``d['properties']['title']``
        are accessed directly and must be present, all other values are
        optional and read via ``extract_p``
    :return: the ``entry`` xml element with all children attached
    """
    entry = etree.Element("entry", nsmap=ns)

    def append_all(elements):
        # attach every successfully encoded child, skipping None results
        for el in elements:
            if el is not None:
                entry.append(el)

    etree.SubElement(entry, "id").text = d['id']

    geospatial_extent = extract_p('geometry', d, None)
    if geospatial_extent is not None:
        # best effort: an unparsable geometry is logged and left out
        try:
            gml = etree.fromstring(geospatial_extent)
            georss = etree.SubElement(entry, ns_elem("georss", "where"))
            georss.append(gml)
        except Exception as ex:
            LOGGER.warning(
                'could not encode geometry into georss:where: %s', ex)

    title = d['properties']['title']
    if title is not None:
        etree.SubElement(entry, "title").text = title

    # <content type = "text" >
    subtitle = extract_p('properties.abstract', d, None)
    if subtitle is not None:
        etree.SubElement(entry, "content").text = subtitle

    update_date = extract_p('properties.updated', d, None)
    if update_date is not None:
        etree.SubElement(entry, "updated").text = update_date

    append_all(axml_author(do) for do in
               extract_p('properties.authors', d, []))

    # each optional element is guarded by its own value, not by the
    # presence of the update date
    publisher = extract_p('properties.publisher', d, None)
    if publisher is not None:
        etree.SubElement(entry, ns_elem("dc", "publisher")).text = publisher

    rights = extract_p('properties.rights', d, None)
    if rights is not None:
        etree.SubElement(entry, ns_elem("dc", "rights")).text = rights

    # NOTE(review): axml_context writes the date as dc:date while this
    # writes an un-prefixed "date" element - confirm which one is intended
    temporal_extent = extract_p('properties.date', d, None)
    if temporal_extent is not None:
        etree.SubElement(entry, "date").text = temporal_extent

    # keywords
    append_all(axml_category(do) for do in
               extract_p('properties.categories', d, []))

    # resource metadata links (rel="via")
    append_all(axml_link(do) for do in
               extract_p('properties.links.via', d, []))

    # content description (alternates)
    append_all(axml_content(do) for do in
               extract_p('properties.links.alternates', d, []))

    # preview links: parse_entry stores them under "previews"; the
    # singular key that was read here before is kept as a fallback
    preview_links = extract_p('properties.links.previews', d, None)
    if preview_links is None:
        preview_links = extract_p('properties.links.preview', d, [])
    append_all(axml_link(do) for do in preview_links)

    # content by reference (data links)
    append_all(axml_link(do) for do in
               extract_p('properties.links.data', d, []))

    append_all(axml_offering(do) for do in
               extract_p('properties.offerings', d, []))

    # TODO no examples for active attribute
    active = extract_p('properties.active', d, None)
    if active is not None:
        etree.SubElement(entry, "active").text = active

    # <owc:minScaleDenominator>2500</owc:minScaleDenominator>
    min_scale_denominator = try_float(extract_p(
        'properties.minscaledenominator', d, None))
    if min_scale_denominator is not None:
        etree.SubElement(entry, ns_elem(
            "owc", "minScaleDenominator")).text = str(min_scale_denominator)

    # <owc:maxScaleDenominator>25000</owc:maxScaleDenominator>
    max_scale_denominator = try_float(extract_p(
        'properties.maxscaledenominator', d, None))
    if max_scale_denominator is not None:
        etree.SubElement(entry, ns_elem(
            "owc", "maxScaleDenominator")).text = str(max_scale_denominator)

    # TODO no examples for folder attribute
    folder = extract_p('properties.folder', d, None)
    if folder is not None:
        etree.SubElement(entry, "folder").text = folder

    return entry
761

762

763
def axml_creator_app(d):
    """
    Encode a creator application entry as an Atom ``generator`` element.

    :param d: source object; 'title', 'uri' and 'version' are looked up
        on it via extract_p, each one is optional
    :return: the ``generator`` element, or None if d is empty or the
        encoding fails
    """
    # <generator uri="http://w.." version="1.0">MiraMon</generator>
    if is_empty(d):
        return None
    try:
        creator_app = etree.Element("generator", nsmap=ns)
        title = extract_p('title', d, None)
        if title is not None:
            creator_app.text = title
        # attribute names equal the lookup keys; order is kept stable
        for attr_name in ('uri', 'version'):
            attr_value = extract_p(attr_name, d, None)
            if attr_value is not None:
                creator_app.set(attr_name, attr_value)
        return creator_app
    except Exception as ex:
        # best-effort encoder: log and signal failure with None.
        # The exception needs a %s placeholder, otherwise the logging
        # module fails to format the record and the message is lost.
        LOGGER.warning('could not encode creator_app: %s', ex)
        return None
×
783

784

785
def axml_display(d):
    """
    Encode display settings as an ``owc:display`` element.

    :param d: source object; 'pixelWidth', 'pixelHeight' and 'mmPerPixel'
        are looked up on it via extract_p, each one is optional
    :return: the ``owc:display`` element, or None if d is empty or the
        encoding fails
    """
    # <owc:display>
    #                 <owc:pixelWidth>
    if is_empty(d):
        return None
    try:
        creator_display = etree.Element(ns_elem("owc", "display"), nsmap=ns)
        # (key, converter) pairs; the child element name equals the key
        fields = (('pixelWidth', try_int),
                  ('pixelHeight', try_int),
                  ('mmPerPixel', try_float))
        for key, convert in fields:
            value = convert(extract_p(key, d, None))
            if value is not None:
                etree.SubElement(creator_display, ns_elem(
                    "owc", key)).text = str(value)
        return creator_display
    except Exception as ex:
        # best-effort encoder: log and signal failure with None.
        # The exception needs a %s placeholder, otherwise the logging
        # module fails to format the record and the message is lost.
        LOGGER.warning('could not encode creator_display: %s', ex)
        return None
×
809

810

811
def axml_link(d):
    """
    Encode a link entry as an Atom ``link`` element.

    :param d: source object; 'href', 'rel', 'type', 'lang', 'title' and
        'length' are looked up on it via extract_p, each one is optional
    :return: the ``link`` element, or None if d is empty or the encoding
        fails
    """
    # < link rel = "via" type = "application/xml" href = "..." title = "..."
    if is_empty(d):
        return None
    try:
        link = etree.Element("link", nsmap=ns)
        # attribute names equal the lookup keys; order is kept stable
        for attr_name in ('href', 'rel', 'type', 'lang', 'title'):
            attr_value = extract_p(attr_name, d, None)
            if attr_value is not None:
                link.set(attr_name, attr_value)
        # length goes through try_int and is written back as a string
        length = try_int(extract_p('length', d, None))
        if length is not None:
            link.set("length", str(length))
        return link
    except Exception as ex:
        # best-effort encoder: log and signal failure with None.
        # The exception needs a %s placeholder, otherwise the logging
        # module fails to format the record and the message is lost.
        LOGGER.warning('could not encode link: %s', ex)
        return None
×
840

841

842
def axml_category(d):
    """
    Encode a category entry as an Atom ``category`` element.

    :param d: source object; 'term', 'scheme' and 'label' are looked up
        on it via extract_p, each one is optional
    :return: the ``category`` element, or None if d is empty or the
        encoding fails
    """
    # <category term="maps" label="This file contains maps"/>
    if is_empty(d):
        return None
    try:
        category = etree.Element("category", nsmap=ns)
        # attribute names equal the lookup keys; order is kept stable
        for attr_name in ('term', 'scheme', 'label'):
            attr_value = extract_p(attr_name, d, None)
            if attr_value is not None:
                category.set(attr_name, attr_value)
        return category
    except Exception as ex:
        # best-effort encoder: log and signal failure with None.
        # The exception needs a %s placeholder, otherwise the logging
        # module fails to format the record and the message is lost.
        LOGGER.warning('could not encode category: %s', ex)
        return None
×
862

863

864
def axml_author(d):
    """
    Encode an author entry as an Atom ``author`` element.

    :param d: source object; 'name', 'email' and 'uri' are looked up on
        it via extract_p, each one is optional
    :return: the ``author`` element with one child per present value, or
        None if d is empty or the encoding fails
    """
    # <author> ..
    #                 <name>
    #       <email>
    if is_empty(d):
        return None
    try:
        author = etree.Element("author", nsmap=ns)
        # child element names equal the lookup keys; order is kept stable
        for child_name in ('name', 'email', 'uri'):
            child_text = extract_p(child_name, d, None)
            if child_text is not None:
                etree.SubElement(author, child_name).text = child_text
        return author
    except Exception as ex:
        # best-effort encoder: log and signal failure with None.
        # The exception needs a %s placeholder, otherwise the logging
        # module fails to format the record and the message is lost.
        LOGGER.warning('could not encode author: %s', ex)
        return None
×
886

887

888
def axml_offering(d):
    """
    Encode an offering entry as an ``owc:offering`` element.

    The 'code' value becomes the ``code`` attribute; the items found
    under 'operations', 'contents' and 'styles' are encoded with
    axml_operation, axml_content and axml_styleset and appended in that
    order.

    :param d: source object; the keys above are looked up via extract_p
    :return: the ``owc:offering`` element, or None if d is empty or the
        encoding fails
    """
    # <owc:offering code="http://www.opengis.net/spec/owc-at...">
    #   <owc:offering code="http://www.opengis.net/spec....l">
    #                         <owc:content type="application/gml+xml">
    if is_empty(d):
        return None
    try:
        offering_code = extract_p('code', d, None)
        offering = etree.Element(
            ns_elem("owc", "offering"),
            attrib={"code": offering_code},
            nsmap=ns)

        # (key, encoder) pairs, appended in document order
        child_encoders = (('operations', axml_operation),
                          ('contents', axml_content),
                          ('styles', axml_styleset))
        for key, encode in child_encoders:
            for item in extract_p(key, d, []):
                child = encode(item)
                # encoders return None for empty or failed items
                if child is not None:
                    offering.append(child)
        return offering
    except Exception as ex:
        # best-effort encoder: log and signal failure with None.
        # The exception needs a %s placeholder, otherwise the logging
        # module fails to format the record and the message is lost.
        LOGGER.warning('could not encode offering: %s', ex)
        return None
×
915

916

917
def axml_operation(d):
    """
    Encode an operation entry as an ``owc:operation`` element.

    'code', 'method', 'type' and 'href' become attributes; 'request' and
    'result', when present, are encoded with axml_content and appended
    as children.

    :param d: source object; the keys above are looked up via extract_p
    :return: the ``owc:operation`` element, or None if d is empty or the
        encoding fails
    """
    #   <owc:operation code="GetCapabilities" method="GET"
    #       type="applica..." href="..."
    #       <owc:request type="application/xml"> ..
    if is_empty(d):
        return None
    try:
        operation = etree.Element(ns_elem("owc", "operation"), nsmap=ns)

        # attribute names equal the lookup keys; order is kept stable
        for attr_name in ('code', 'method', 'type', 'href'):
            attr_value = extract_p(attr_name, d, None)
            if attr_value is not None:
                operation.set(attr_name, attr_value)

        # request first, then result, both encoded via axml_content
        for key in ('request', 'result'):
            payload = extract_p(key, d, None)
            payload_enc = None if payload is None else axml_content(payload)
            if payload_enc is not None:
                operation.append(payload_enc)
        return operation
    except Exception as ex:
        # best-effort encoder: log and signal failure with None.
        # The exception needs a %s placeholder, otherwise the logging
        # module fails to format the record and the message is lost.
        LOGGER.warning('could not encode operation: %s', ex)
        return None
×
955

956

957
def axml_styleset(d):
    """
    Encode a style set entry as an ``owc:styleSet`` element.

    'name', 'title', 'abstract', 'default' and 'legendURL' are written
    as the text of ``owc:`` child elements of the same name; 'content',
    when present, is encoded with axml_content and appended.

    :param d: source object; the keys above are looked up via extract_p
    :return: the ``owc:styleSet`` element, or None if d is empty or the
        encoding fails
    """
    #     <owc:styleSet>
    #       <owc:name>raster</owc:name>
    #       <owc:title>Default Raster</owc:title>
    #       <owc:abstract>A sample style that draws a </owc:abstract>
    #       <owc:legendURL href="h...." type="image/png"/>
    #     </owc:styleSet>
    if is_empty(d):
        return None
    try:
        styleset = etree.Element(ns_elem("owc", "styleSet"), nsmap=ns)

        # child element names equal the lookup keys; order is kept stable
        # TODO no example for default setting on style set
        for key in ('name', 'title', 'abstract', 'default', 'legendURL'):
            value = extract_p(key, d, None)
            if value is not None:
                etree.SubElement(styleset, ns_elem("owc", key)).text = value

        # TODO no example for content on style set
        content = extract_p('content', d, None)
        content_enc = None if content is None else axml_content(content)
        if content_enc is not None:
            styleset.append(content_enc)
        return styleset
    except Exception as ex:
        # best-effort encoder: log and signal failure with None.
        # The exception needs a %s placeholder, otherwise the logging
        # module fails to format the record and the message is lost.
        LOGGER.warning('could not encode styleset: %s', ex)
        return None
×
995

996

997
def axml_content(d):
    """
    OwcContent dict to Atom XML

    'type' and 'title' become attributes of the same name, 'url' becomes
    the ``href`` attribute and 'content', when present, becomes the
    element text.

    :param d: the content dict; the keys above are looked up via
        extract_p, each one is optional
    :return: the ``owc:content`` element, or None if d is empty or the
        encoding fails
    """
    #   <owc:content type="image/tiff" href=".."
    if is_empty(d):
        return None
    try:
        content_elem = etree.Element(ns_elem("owc", "content"), nsmap=ns)

        # (lookup key, attribute name) pairs; note 'url' maps to href
        for key, attr_name in (('type', 'type'),
                               ('url', 'href'),
                               ('title', 'title')):
            attr_value = extract_p(key, d, None)
            if attr_value is not None:
                content_elem.set(attr_name, attr_value)

        content = extract_p('content', d, None)
        # write the inline content only when there is some; the previous
        # check was inverted and never emitted the text
        if content is not None:
            content_elem.text = content
        return content_elem
    except Exception as ex:
        # best-effort encoder: log and signal failure with None.
        # The exception needs a %s placeholder, otherwise the logging
        # module fails to format the record and the message is lost.
        LOGGER.warning('could not encode content: %s', ex)
        return None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc