• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / OWSLib / 11628884181

01 Nov 2024 11:49AM UTC coverage: 58.814% (-1.3%) from 60.156%
11628884181

Pull #548

github

web-flow
Merge 4b9b7cf1f into ae98c2039
Pull Request #548: Strip out redundant __new__ to enable object pickling

8364 of 14221 relevant lines covered (58.81%)

1.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.83
/owslib/opensearch.py
1
# =============================================================================
2
# Copyright (c) 2023 Tom Kralidis
3
#
4
# Authors : Tom Kralidis <tomkralidis@gmail.com>
5
#
6
# Contact email: tomkralidis@gmail.com
7
# =============================================================================
8

9
"""
10
API for A9 OpenSearch (description and query syntax).
11

12
https://github.com/dewitt/opensearch
13

14
Supports version 1.1 core as well as geo, time and parameters extensions.
15
"""
16

17
import logging
2✔
18
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
2✔
19

20
from owslib.etree import etree
2✔
21
from owslib.owscontext.atom import decode_atomxml
2✔
22
from owslib.util import Authentication, http_get, nspath_eval, testXMLValue
2✔
23

24
LOGGER = logging.getLogger(__name__)
2✔
25

26
namespaces = {
2✔
27
    'os': 'http://a9.com/-/spec/opensearch/1.1/',
28
    'geo': 'http://a9.com/-/spec/opensearch/extensions/geo/1.0/',
29
    'parameters': 'http://a9.com/-/spec/opensearch/extensions/parameters/1.0/',  # noqa
30
    'time': 'http://a9.com/-/spec/opensearch/extensions/time/1.0/'
31
}
32

33

34
class OpenSearch(object):
2✔
35
    """ OpenSearch request class """
36
    def __init__(self, url, lang='en-US', version='1.1', timeout=10, xml=None,
2✔
37
                 username=None, password=None, auth=None, headers=None):
38
        """
39

40
        Initialize an OpenSearch client
41

42
        Parameters
43
        ----------
44

45
        - url: the URL of the OpenSearch endpoint
46
        - lang: the language (default is 'en-US')
47
        - version: version (default is '2.0.2')
48
        - timeout: timeout in seconds
49
        - xml: string of OpenSearch Description Document
50
        - username: username for HTTP basic authentication
51
        - password: password for HTTP basic authentication
52
        - auth: instance of owslib.util.Authentication
53
        - headers: HTTP headers to send with requests
54
        """
55

56
        if auth:
2✔
57
            if username:
×
58
                auth.username = username
×
59
            if password:
×
60
                auth.password = password
×
61

62
        self.url = url
2✔
63
        self.lang = lang
2✔
64
        self.version = version
2✔
65
        self.timeout = timeout
2✔
66
        self.auth = auth or Authentication(username, password)
2✔
67
        self.headers = headers
2✔
68

69
        if xml is not None:
2✔
70
            LOGGER.debug('Loading from XML stream')
×
71
            self._exml = etree.fromstring(xml)
×
72
        else:
73
            LOGGER.debug('Loading from URL')
2✔
74
            self.request = self.url
2✔
75
            response = http_get(self.request, timeout=self.timeout)
2✔
76
            self._exml = etree.fromstring(response.content)
2✔
77

78
        self.description = Description(self._exml)
2✔
79

80
    def search(self, type_: str, **kwargs) -> dict:
2✔
81
        """
82
        Invoke an OpenSearch search
83

84
        :param type_: media type of search
85
        :param kwargs: `dict` of key value pairs. When an OpenSearch
86
                       description document has URL Parameters, kwargs keys must
87
                       match accordingly.  When no OpenSearch URL Parameters
88
                       are defined, To pass kwargs keys which are OpenSearch
89
                       parameters such as `geo:time` or `eo:orbitNumber`, pass
90
                       the keys as `geo_time` or `eo_orbitNumber`.
91

92
        :returns: dict of response
93
        """
94

95
        if type_ not in self.description.urls.keys():
2✔
96
            msg = 'Invalid URL type'
×
97
            raise RuntimeError(msg)
×
98

99
        template = self.description.urls[type_]['template']
2✔
100

101
        if self.description.urls[type_]['parameters']:
2✔
102
            LOGGER.debug('Validating kwargs against defined parameters')
2✔
103
            for key, value in kwargs.items():
2✔
104
                if key not in self.description.urls[type_]['parameters']:
2✔
105
                    msg = f'parameter {key} not found'
2✔
106
                    LOGGER.debug(msg)
2✔
107
                    raise RuntimeError(msg)
2✔
108
                if 'options' in self.description.urls[type_]['parameters'][key]:
2✔
109
                    LOGGER.debug('Validating against parameter options')
2✔
110
                    if value not in self.description.urls[type_]['parameters'][key]['options']:
2✔
111
                        msg = f"{value} not in {self.description.urls[type_]['parameters'][key]['options']}"
2✔
112
                        LOGGER.debug(msg)
2✔
113
                        raise RuntimeError(msg)
2✔
114

115
                LOGGER.debug(f'Setting parameter {key} in URL template')
2✔
116

117
                template = template_replace_token(
2✔
118
                    template,
119
                    self.description.urls[type_]['parameters'][key]['value'],
120
                    value)
121

122
        else:
123
            LOGGER.debug('Best effort against no defined parameters')
×
124
            for key, value in kwargs.items():
×
125
                template = template_replace_token(
×
126
                    template,
127
                    key.replace('_', ':'),
128
                    value)
129

130
        response = http_get(prune_url(template), timeout=self.timeout)
2✔
131

132
        if 'json' in type_:
2✔
133
            LOGGER.debug('Returning dict of JSON response')
2✔
134
            response = response.json()
2✔
135
        elif 'atom' in type_:
×
136
            LOGGER.debug('Returning dict of Atom response')
×
137
            response = decode_atomxml(response.content)
×
138
        else:
139
            LOGGER.debug('Unknown/unsupported response, returning as is')
×
140

141
        return response
2✔
142

143

144
class Description:
2✔
145
    def __init__(self, md):
2✔
146

147
        self.urls = {}
2✔
148

149
        LOGGER.debug('Parsing Description')
2✔
150

151
        val = md.find(nspath_eval('os:ShortName', namespaces))
2✔
152
        self.shortname = testXMLValue(val)
2✔
153

154
        val = md.find(nspath_eval('os:LongName', namespaces))
2✔
155
        self.longname = testXMLValue(val)
2✔
156

157
        val = md.find(nspath_eval('os:Description', namespaces))
2✔
158
        self.description = testXMLValue(val)
2✔
159

160
        val = md.find(nspath_eval('os:Language', namespaces))
2✔
161
        self.language = testXMLValue(val)
2✔
162

163
        val = md.find(nspath_eval('os:InputEncoding', namespaces))
2✔
164
        self.inputencoding = testXMLValue(val)
2✔
165

166
        val = md.find(nspath_eval('os:OutputEncoding', namespaces))
2✔
167
        self.outputencoding = testXMLValue(val)
2✔
168

169
        val = md.find(nspath_eval('os:Tags', namespaces))
2✔
170
        self.tags = testXMLValue(val).split()
2✔
171

172
        val = md.find(nspath_eval('os:Contact', namespaces))
2✔
173
        self.contact = testXMLValue(val)
2✔
174

175
        val = md.find(nspath_eval('os:Developer', namespaces))
2✔
176
        self.developer = testXMLValue(val)
2✔
177

178
        val = md.find(nspath_eval('os:Attribution', namespaces))
2✔
179
        self.attribution = testXMLValue(val)
2✔
180

181
        val = md.find(nspath_eval('os:SyndicationRight', namespaces))
2✔
182
        self.syndicationright = testXMLValue(val)
2✔
183

184
        val = md.find(nspath_eval('os:AdultContent', namespaces))
2✔
185
        self.adultcontent = testXMLValue(val)
2✔
186

187
        for u in md.findall(nspath_eval('os:Url', namespaces)):
2✔
188
            url_type = u.attrib.get('type')
2✔
189

190
            url_def = {
2✔
191
                'rel': u.attrib.get('rel'),
192
                'template': u.attrib.get('template'),
193
                'parameters': {}
194
            }
195
            for p in u.findall(nspath_eval('parameters:Parameter', namespaces)):
2✔
196
                p_name = p.attrib.get('name')
2✔
197
                p_def = {
2✔
198
                    'pattern': p.attrib.get('pattern'),
199
                    'title': p.attrib.get('title'),
200
                    'value': p.attrib.get('value')
201
                }
202

203
                options = [o.attrib.get('value') for o in p.findall(nspath_eval('parameters:Option', namespaces))]
2✔
204
                if len(options) > 0:
2✔
205
                    p_def['options'] = options
2✔
206

207
                url_def['parameters'][p_name] = p_def
2✔
208

209
            self.urls[url_type] = url_def
2✔
210

211

212
def template_replace_token(template: str, token: str, value: str) -> str:
2✔
213
    """
214
    Helper function to replace OpenSearch token in a URL template
215

216
    :param template: URL template
217
    :param token: token to replace
218
    :param value: value to replace token with
219

220
    :returns: updated URL template
221
    """
222

223
    token2 = token.replace('}', '?}')
2✔
224

225
    if token2 in template:
2✔
226
        LOGGER.debug('Replacing optional token')
2✔
227
        token_to_replace = token2
2✔
228
    else:
229
        LOGGER.debug('Replacing required token')
×
230
        token_to_replace = token
×
231

232
    LOGGER.debug(f'Token to replace: {token_to_replace}')
2✔
233

234
    LOGGER.debug(f'Template: {template}')
2✔
235
    template2 = template.replace(token_to_replace, value)
2✔
236
    LOGGER.debug(f'Template (replaced): {template2}')
2✔
237

238
    return template2
2✔
239

240

241
def prune_url(url: str) -> str:
2✔
242
    """
243
    Helper function to prune URL of unused template parameters
244

245
    https://stackoverflow.com/a/2506477
246

247
    :param url: URL
248

249
    :returns: updated URL without unused template parameters
250
    """
251

252
    query_params_out = {}
2✔
253

254
    url_parts = list(urlparse(url))
2✔
255

256
    query_params_in = dict(parse_qsl(url_parts[4]))
2✔
257

258
    for key, value in query_params_in.items():
2✔
259
        if '{' not in value and '}' not in value:
2✔
260
            query_params_out[key] = value
2✔
261

262
    url_parts[4] = urlencode(query_params_out)
2✔
263

264
    return urlunparse(url_parts)
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc