• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / OWSLib / 11282283562

10 Oct 2024 09:29PM UTC coverage: 60.113% (-0.02%) from 60.128%
11282283562

Pull #949

github

web-flow
Merge 1058b0b85 into 852fe6d4b
Pull Request #949: Remove dependency on the Pytz library

2 of 2 new or added lines in 1 file covered. (100.0%)

82 existing lines in 4 files now uncovered.

8533 of 14195 relevant lines covered (60.11%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.13
/owslib/util.py
1
# -*- coding: ISO-8859-15 -*-
2
# =============================================================================
3
# Copyright (c) 2024 Tom Kralidis
4
#
5
# Authors : Tom Kralidis <tomkralidis@gmail.com>
6
#
7
# Contact email: tomkralidis@gmail.com
8
# =============================================================================
9

10
import os
1✔
11
import sys
1✔
12
from collections import OrderedDict
1✔
13
from dateutil import parser
1✔
14
from datetime import datetime, timezone
1✔
15
from owslib.etree import etree, ParseError
1✔
16
from owslib.namespaces import Namespaces
1✔
17
from urllib.parse import urlsplit, urlencode, urlparse, parse_qs, urlunparse, parse_qsl
1✔
18
import copy
1✔
19

20
from io import StringIO, BytesIO
1✔
21

22
import re
1✔
23
from copy import deepcopy
1✔
24
import warnings
1✔
25
import requests
1✔
26
from requests.auth import AuthBase
1✔
27
import codecs
1✔
28

29
"""
1✔
30
Utility functions and classes
31
"""
32

33

34
class ServiceException(Exception):
    """Error raised when a remote service answers with an exception report."""
    # TODO: this should go in ows common module when refactored.
1✔
37

38

39
# http://stackoverflow.com/questions/6256183/combine-two-dictionaries-of-dictionaries-python
40
def dict_union(d1, d2):
    """
    Recursively merge two dictionaries into a new one.

    Parameters
    ----------

    - d1: base dictionary
    - d2: dictionary whose values take precedence; when a value in d2 is
          itself a dict it is merged recursively with the matching entry of d1

    """
    merged = {}
    for key in set(d1) | set(d2):
        override = d2.get(key)
        if isinstance(override, dict):
            merged[key] = dict_union(d1.get(key, {}), override)
        else:
            # a key present in d2 wins, otherwise fall back to d1
            merged[key] = d2.get(key, d1.get(key))
    return merged
43

44

45
# Infinite DateTimes for Python.  Used in SWE 2.0 and other OGC specs as "INF" and "-INF"
46
class InfiniteDateTime:
    """Stand-in for the "INF" time position: compares greater than anything."""

    def __gt__(self, other):
        # always later than whatever it is compared with
        return True

    def __lt__(self, other):
        return False

    def timetuple(self):
        """Return an empty tuple (there is no concrete calendar value)."""
        return ()
×
55

56

57
class NegativeInfiniteDateTime:
    """Stand-in for the "-INF" time position: compares less than anything."""

    def __gt__(self, other):
        return False

    def __lt__(self, other):
        # always earlier than whatever it is compared with
        return True

    def timetuple(self):
        """Return an empty tuple (there is no concrete calendar value)."""
        return ()
×
66

67

68
first_cap_re = re.compile('(.)([A-Z][a-z]+)')
all_cap_re = re.compile('([a-z0-9])([A-Z])')


def format_string(prop_string):
    """
        Convert a CamelCase property name to a lower-case, underscore-separated
        (pep8 style) name, dropping any spaces.
        from: http://stackoverflow.com/questions/1175208/elegant-python-function-to-convert-camelcase-to-camel-case

        Returns an empty string when prop_string is None.
    """
    if prop_string is None:
        return ''
    # first pass: split before each capitalised word
    underscored = first_cap_re.sub(r'\1_\2', prop_string)
    compact = underscored.replace(' ', '')
    # second pass: split a lower-case letter/digit from a following capital
    return all_cap_re.sub(r'\1_\2', compact).lower()
1✔
82

83

84
def xml_to_dict(root, prefix=None, depth=1, diction=None):
    """
        Walk the children of an xml element and collect them as (key, val) pairs,
        where key is the element tag and val is the element's inner text.
        Descends into the tree until the requested depth is reached.

        Parameters
        ===========
        :root - root xml element, starting point of iteration
        :prefix - a string to prepend to the resulting key (optional)
        :depth - the number of depths to process in the tree (optional)
        :diction - the dictionary to insert the (tag,text) pairs into (optional)

        Return
        =======
        Dictionary of (key,value); key is the element tag stripped of its namespace and
        converted to pep8 style, value is the stripped inner text. Elements without text
        are not stored. When a tag occurs more than once, the last occurrence wins.
    """
    collected = {} if diction is None else diction
    for node in root:
        text = testXMLValue(node)
        if text is not None and text != '':
            name = format_string(node.tag.split('}')[-1])
            if prefix is not None:
                name = prefix + name
            collected[name] = text

        # children are visited whether or not the node itself carried text
        if depth > 1:
            collected = xml_to_dict(node, prefix=prefix, depth=(depth - 1), diction=collected)

    return collected
1✔
122

123

124
class ResponseWrapper:
    """
    Object returned by openURL.

    A thin shim around a requests response that keeps the older
    info()/read()/geturl() calling convention working.
    """
    def __init__(self, response):
        self._response = response

    def info(self):
        """Return the response headers."""
        return self._response.headers

    def read(self):
        """Return the response body."""
        return self._response.content

    def geturl(self):
        """Return the response URL with each '&&' collapsed to a single '&'."""
        final_url = self._response.url
        return final_url.replace('&&', '&')
×
141

142
    # @TODO: __getattribute__ for poking at response
143

144

145
def openURL(url_base, data=None, method='Get', cookies=None, username=None, password=None, timeout=30, headers=None,
            verify=True, cert=None, auth=None):
    """
    Function to open URLs.

    Issues the request through the requests library, then checks the response for
    OGC service exceptions. Also handles cookies and simple user/password authentication.

    :param url_base: URL to request
    :param data: query parameters for a GET request, or the body of a POST request
    :param method: 'Get' or 'Post' (case-insensitive); an xml style ``{namespace}`` prefix is dropped
    :param cookies: (optional) cookies to send with the request
    :param username: (optional) overrides the username held by ``auth``
    :param password: (optional) overrides the password held by ``auth``
    :param timeout: timeout passed on to requests
    :param headers: (optional) Dictionary of HTTP Headers to send with the :class:`Request`.
    :param verify: (optional) whether the SSL cert will be verified. A CA_BUNDLE path can also be provided.
                   Defaults to ``True``.
    :param cert: (optional) A file with a client side certificate for SSL authentication
                 to send with the :class:`Request`.
    :param auth: Instance of owslib.util.Authentication
    :returns: ResponseWrapper around the requests response
    :raises ServiceException: on HTTP 400/401 or when the XML body holds an exception report
    :raises ValueError: when method is neither 'get' nor 'post'
    """

    if headers is None:
        headers = {}

    if auth:
        # explicit arguments take precedence over what the auth object holds
        if username:
            auth.username = username
        if password:
            auth.password = password
        if cert:
            auth.cert = cert
        verify = verify and auth.verify
    else:
        auth = Authentication(username, password, cert, verify)

    request_kwargs = {'timeout': timeout}

    if auth.username and auth.password:
        request_kwargs['auth'] = (auth.username, auth.password)
    elif auth.auth_delegate is not None:
        request_kwargs['auth'] = auth.auth_delegate

    request_kwargs['cert'] = auth.cert
    request_kwargs['verify'] = verify

    # FIXUP for WFS in particular, remove xml style namespace
    # @TODO does this belong here?
    method = method.split("}")[-1]
    verb = method.lower()

    if verb == 'post':
        # only label the body as XML when it actually parses as XML
        try:
            etree.fromstring(data)
            headers['Content-Type'] = 'text/xml'
        except (ParseError, UnicodeEncodeError):
            pass

        request_kwargs['data'] = data

    elif verb == 'get':
        request_kwargs['params'] = data

    else:
        raise ValueError("Unknown method ('%s'), expected 'get' or 'post'" % method)

    if cookies is not None:
        request_kwargs['cookies'] = cookies

    response = requests.request(method.upper(), url_base, headers=headers, **request_kwargs)

    status = response.status_code
    if status in (400, 401):
        raise ServiceException(response.text)

    if status in (404, 500, 502, 503, 504):    # add more if needed
        response.raise_for_status()

    # check for service exceptions without the http header set
    xml_content_types = ('text/xml', 'application/xml', 'application/vnd.ogc.se_xml')
    if 'Content-Type' in response.headers and response.headers['Content-Type'] in xml_content_types:
        # just in case 400 headers were not set, going to have to read the xml to see if it's an exception report.
        se_tree = etree.fromstring(response.content)

        # to handle the variety of namespaces and terms across services
        # and versions, especially for "legacy" responses like WMS 1.3.0
        candidate_tags = (
            '{http://www.opengis.net/ows}Exception',
            '{http://www.opengis.net/ows/1.1}Exception',
            '{http://www.opengis.net/ogc}ServiceException',
            'ServiceException',
        )

        for candidate in candidate_tags:
            report = se_tree.find(candidate)
            if report is None:
                continue
            # and we need to deal with some message nesting
            lines = [t.strip() for t in report.itertext() if t.strip()]
            raise ServiceException('\n'.join(lines))

    return ResponseWrapper(response)
1✔
237

238

239
# default namespace for nspath is OWS common
OWS_NAMESPACE = 'http://www.opengis.net/ows/1.1'


def nspath(path, ns=OWS_NAMESPACE):
    """

    Qualify every step of a path with a namespace, in '{ns}name' form.

    Parameters
    ----------

    - path: ElementTree API Compatible path expression
    - ns: the XML namespace URI.

    Returns the qualified path ('*' steps are left untouched), or -1 when
    either argument is None.

    """

    if path is None or ns is None:
        return -1

    return '/'.join(
        step if step == '*' else '{%s}%s' % (ns, step)
        for step in path.split('/')
    )
1✔
266

267

268
def nspath_eval(xpath, namespaces):
    '''
    Return an etree friendly xpath

    Every 'prefix:name' step of xpath is rewritten as '{uri}name', with the
    uri looked up in the namespaces mapping under its prefix.
    '''
    steps = (step.split(':') for step in xpath.split('/'))
    return '/'.join('{%s}%s' % (namespaces[prefix], local) for prefix, local in steps)
1✔
275

276

277
def cleanup_namespaces(element):
    """
    Remove unused namespaces from an element

    The clean-up is delegated to etree.cleanup_namespaces; the same element
    object is handed back to the caller.
    """
    etree.cleanup_namespaces(element)
    return element
1✔
281

282

283
def add_namespaces(root, ns_keys):
    """
    Return a copy of root whose namespace map also declares the given prefixes.

    Parameters
    ----------

    - root: element to copy
    - ns_keys: a namespace key or a list of keys, resolved through Namespaces.get_namespace;
               keys that resolve to None are ignored

    """
    if isinstance(ns_keys, str):
        ns_keys = [ns_keys]

    registry = Namespaces()
    resolved = [(key, registry.get_namespace(key)) for key in ns_keys]

    # lxml does not support setting xmlns attributes
    # Update the elements nsmap with new namespaces
    nsmap = root.nsmap
    for key, uri in resolved:
        if uri is None:
            continue
        nsmap[key] = uri

    # Recreate the root element with updated nsmap
    rebuilt = etree.Element(root.tag, nsmap=nsmap)
    # Carry over attributes
    for attr_name, attr_value in list(root.items()):
        rebuilt.set(attr_name, attr_value)
    # Carry over children
    for child in root:
        rebuilt.append(deepcopy(child))
    return rebuilt
1✔
306

307

308
def getXMLInteger(elem, tag):
    """
    Read the text of the named child tag and convert it to an integer.

    Raises ValueError if the tag cannot be found; an exception is also raised
    when its text cannot be converted to an integer.

    Parameters
    ----------

    - elem: the element to search within
    - tag: the name of the tag to look for

    """
    node = elem.find(tag)
    if node is not None:
        return int(node.text.strip())
    raise ValueError('Missing %s in %s' % (tag, elem))
×
326

327

328
def testXMLValue(val, attrib=False):
1✔
329
    """
330

331
    Test that the XML value exists, return val.text, else return None
332

333
    Parameters
334
    ----------
335

336
    - val: the value to be tested
337

338
    """
339

340
    if val is not None:
1✔
341
        if attrib:
1✔
342
            return val.strip()
1✔
343
        elif val.text:
1✔
344
            return val.text.strip()
1✔
345
        else:
346
            return None
1✔
347
    else:
348
        return None
1✔
349

350

351
def testXMLAttribute(element, attribute):
1✔
352
    """
353

354
    Test that the XML element and attribute exist, return attribute's value, else return None
355

356
    Parameters
357
    ----------
358

359
    - element: the element containing the attribute
360
    - attribute: the attribute name
361

362
    """
363
    if element is not None:
1✔
364
        return element.get(attribute)
1✔
365

366
    return None
1✔
367

368

369
def http_post(url=None, request=None, lang='en-US', timeout=10, username=None, password=None, auth=None, headers=None):
    """

    Invoke an HTTP POST request

    Parameters
    ----------

    - url: the URL of the server
    - request: the request message; a dict is sent as a JSON body, anything else as the raw body
    - lang: the language
    - timeout: timeout in seconds
    - username: overrides the username held by auth
    - password: overrides the password held by auth
    - auth: owslib.util.Auth instance
    - headers: HTTP headers to send with requests

    Returns the requests response. Raises ValueError when url is None.

    """

    if url is None:
        raise ValueError("URL required")

    u = urlsplit(url)

    headers_ = {
        'User-Agent': 'OWSLib (https://owslib.readthedocs.io)',
        'Content-type': 'text/xml',
        'Accept': 'text/xml,application/xml',
        'Accept-Language': lang,
        'Accept-Encoding': 'gzip,deflate',
        'Host': u.netloc,
    }

    if headers:
        headers_.update(headers)

    is_json = isinstance(request, dict)
    if is_json:
        headers_['Content-type'] = 'application/json'
        headers_.pop('Accept')

    # forward the documented timeout; it was previously accepted but never used
    rkwargs = {'timeout': timeout}

    if auth:
        if username:
            auth.username = username
        if password:
            auth.password = password
    else:
        auth = Authentication(username, password)
    if auth.username is not None and auth.password is not None:
        rkwargs['auth'] = (auth.username, auth.password)
    elif auth.auth_delegate is not None:
        rkwargs['auth'] = auth.auth_delegate
    rkwargs['verify'] = auth.verify
    rkwargs['cert'] = auth.cert

    if is_json:
        return requests.post(url, json=request, headers=headers_, **rkwargs)
    return requests.post(url, request, headers=headers_, **rkwargs)
1✔
427

428

429
def http_prepare(*args, **kwargs):
    """
    Build the keyword arguments for a call to one of the requests functions.

    The authentication related keywords (auth, username, password, cert, verify)
    are folded into an Authentication instance and translated into the
    'auth', 'cert' and 'verify' keywords; every other keyword is passed
    through unchanged. Positional arguments are accepted but not used.

    Parameters
    ----------

    - auth: (optional) Authentication instance, or a tuple/list of its constructor arguments
    - username, password, cert, verify: (optional) override the values held by auth

    Returns a new dict; the caller's kwargs are deep-copied, not modified.
    """
    # Copy input kwargs so the dict can be modified
    rkwargs = copy.deepcopy(kwargs)

    # Use Authentication instance if provided, else create one
    auth = rkwargs.pop('auth', None)
    if auth is None:
        auth = Authentication()
    elif isinstance(auth, (tuple, list)):
        auth = Authentication(*auth)

    # Populate values with other arguments supplied
    for option in ('username', 'password', 'cert', 'verify'):
        if option in rkwargs:
            setattr(auth, option, rkwargs.pop(option))

    # Build keyword args for call to requests.get()
    if auth.username and auth.password:
        rkwargs['auth'] = (auth.username, auth.password)
    elif auth.auth_delegate is not None:
        rkwargs['auth'] = auth.auth_delegate
    else:
        rkwargs['auth'] = None

    # cert/verify were moved onto the Authentication object above, so they
    # must be read back from it; reading them from rkwargs always gave the
    # defaults and silently dropped the caller's settings
    rkwargs['cert'] = auth.cert
    rkwargs['verify'] = auth.verify

    return rkwargs
1✔
462

463

464
def http_get(*args, **kwargs):
    """
    Invoke an HTTP GET request

    The keyword arguments are first normalised by http_prepare; positional
    arguments go to requests.get as given. Returns the requests response.
    """
    request_kwargs = http_prepare(*args, **kwargs)
    response = requests.get(*args, **request_kwargs)
    return response
1✔
467

468

469
def http_put(*args, **kwargs):
    """
    Invoke an HTTP PUT request

    The keyword arguments are first normalised by http_prepare. A 'data'
    keyword holding a dict is sent as a JSON body; any other 'data' value is
    sent as given. Returns the requests response.
    """
    request_kwargs = http_prepare(*args, **kwargs)

    if 'data' in kwargs:
        payload = kwargs['data']
        if isinstance(payload, dict):
            # hand dicts to requests through 'json' instead of 'data'
            request_kwargs.pop('data')
            request_kwargs['json'] = payload
        else:
            request_kwargs['data'] = payload

    return requests.put(*args, **request_kwargs)
×
480

481

482
def http_delete(*args, **kwargs):
    """
    Invoke an HTTP DELETE request

    The keyword arguments are first normalised by http_prepare; positional
    arguments go to requests.delete as given. Returns the requests response.
    """
    request_kwargs = http_prepare(*args, **kwargs)
    response = requests.delete(*args, **request_kwargs)
    return response
×
485

486

487
def element_to_string(element, encoding=None, xml_declaration=False):
    """
    Serialise an XML object

    Parameters
    ----------
    - element: etree Element
    - encoding (optional): encoding in string form. 'utf-8', 'ISO-8859-1', etc.
                           Defaults to 'ISO-8859-1'; only consulted when xml_declaration is set.
    - xml_declaration (optional): whether to include xml declaration

    """

    if encoding is None:
        encoding = "ISO-8859-1"

    if not xml_declaration:
        return etree.tostring(element)

    if encoding in ('unicode', 'utf-8'):
        # build the declaration by hand around a unicode serialisation
        body = etree.tostring(element, encoding='unicode')
        return '<?xml version="1.0" encoding="utf-8" standalone="no"?>\n{}'.format(body)

    return etree.tostring(element, encoding=encoding, xml_declaration=True)
1✔
514

515

516
def xml2string(xml):
    """

    Prepend an ISO-8859-1 XML declaration to an XML string.

    Deprecated: emits a warning pointing to element_to_string on every call.

    Parameters
    ----------

    - xml: xml string

    """
    warnings.warn("DEPRECIATION WARNING!  You should now use the 'element_to_string' method \
                   The 'xml2string' method will be removed in a future version of OWSLib.")
    declaration = '<?xml version="1.0" encoding="ISO-8859-1" standalone="no"?>\n'
    return declaration + xml
×
530

531

532
def xmlvalid(xml, xsd):
    """

    Validate an XML document against an XML Schema

    Parameters
    ----------

    - xml: XML content
    - xsd: pointer to XML Schema (local file path or URL)

    Returns the result of the schema's validate() call.

    """

    schema = etree.XMLSchema(etree.parse(xsd))
    document = etree.parse(StringIO(xml))
    return schema.validate(document)
×
550

551

552
def xmltag_split(tag):
    '''
    Return XML element bare tag name (without prefix)

    The tag is handed back unchanged when it carries no '{namespace}' part
    or cannot be split at all.
    '''
    try:
        pieces = tag.split('}')
        bare = pieces[1]
    except Exception:
        return tag
    return bare
1✔
558

559

560
def getNamespace(element):
    '''
    Utility method to extract the namespace from an XML element tag encoded as {namespace}localname.

    Returns an empty string when the tag does not start with '{'.
    '''
    tag = element.tag
    if tag[0] != '{':
        return ""
    return tag[1:].split("}")[0]
1✔
566

567

568
def build_get_url(base_url, params, overwrite=False, doseq=False):
    ''' Utility function to build a full HTTP GET URL from the service base URL and a dictionary of HTTP parameters.

    TODO: handle parameters case-insensitive?

    @param base_url: service URL, possibly already carrying a query string
    @param params: dictionary of HTTP parameters to add
    @param overwrite: boolean flag to allow overwrite of parameters of the base_url (default: False)
    @param doseq: passed on to urlencode
    '''

    url_parts = base_url.split('?')
    base_pairs = parse_qsl(url_parts[1]) if len(url_parts) > 1 else []
    param_pairs = list(params.items())

    if overwrite is True:
        # all params and additional base
        primary, secondary = param_pairs, base_pairs
    else:
        # all base and additional params
        primary, secondary = base_pairs, param_pairs

    # keys of the primary list win; the snapshot is taken before anything is added
    taken = [pair[0] for pair in primary]
    primary.extend(pair for pair in secondary if pair[0] not in taken)

    return url_parts[0] + '?' + urlencode(tuple(primary), doseq=doseq)
1✔
602

603

604
def dump(obj, prefix=''):
1✔
605
    '''Utility function to print to standard output a generic object with all its attributes.'''
606

607
    return "{} {}.{} : {}".format(prefix, obj.__module__, obj.__class__.__name__, obj.__dict__)
1✔
608

609

610
def getTypedValue(data_type, value):
    '''
    Utility function to cast a string value to the appropriate XSD type.

    Handles 'boolean', 'integer', 'float' and 'string'; for any other
    data_type the value is returned as is. A value of None yields None.
    '''

    # If the default value is empty
    if value is None:
        return None

    if data_type == 'boolean':
        return value.lower() == 'true'
    if data_type == 'integer':
        return int(value)
    if data_type == 'float':
        return float(value)
    if data_type == 'string':
        return str(value)

    return value  # no type casting
1✔
627

628

629
def extract_time(element):
    ''' return a datetime object based on a gml text string

    ex:
    <gml:beginPosition>2006-07-27T21:10:00Z</gml:beginPosition>
    <gml:endPosition indeterminatePosition="now"/>

    If there happens to be a strange element with both attributes and text,
    use the text.
    ex: <gml:beginPosition indeterminatePosition="now">2006-07-27T21:10:00Z</gml:beginPosition>
    Would be 2006-07-27T21:10:00Z, not 'now'

    Returns None when element is None, or when its text cannot be parsed and
    indeterminatePosition is not "now".
    '''
    if element is None:
        return None

    try:
        dt = parser.parse(element.text)
    except Exception:
        att = testXMLValue(element.attrib.get('indeterminatePosition'), True)
        if att and att == 'now':
            # timezone-aware current UTC time; datetime.utcnow() is deprecated
            dt = datetime.now(timezone.utc)
        else:
            dt = None
    return dt
1✔
654

655

656
def extract_xml_list(elements):
    """
    Collect keywords from a list of elements.

    Some providers do not use separate tags for their keywords and separate
    them with newlines inside one tag instead. The text of every element is
    split on line breaks, each piece is stripped, and blank pieces are dropped.
    """
    keywords = []
    for element in elements:
        if not element.text:
            continue
        for piece in re.split(r'[\n\r]+', element.text):
            piece = piece.strip()
            if piece:
                keywords.append(piece)
    return keywords
1✔
665

666

667
def strip_bom(raw_text):
    """ return the raw (assumed) xml response without the BOM

    Only bytes input is examined; anything else is returned unchanged.
    """
    if not isinstance(raw_text, bytes):
        return raw_text

    # The UTF-32 BOMs must be tested before the UTF-16 ones: the UTF-32-LE BOM
    # (FF FE 00 00) starts with the UTF-16-LE BOM (FF FE), so testing UTF-16
    # first would strip only two bytes and leave two NUL bytes behind.
    boms = (
        # utf-32
        codecs.BOM_UTF32_LE,
        codecs.BOM_UTF32_BE,
        # utf-8
        codecs.BOM_UTF8,
        # utf-16
        codecs.BOM_UTF16_LE,
        codecs.BOM_UTF16_BE,
    )

    for bom in boms:
        if raw_text.startswith(bom):
            return raw_text[len(bom):]
    return raw_text
1✔
691

692

693
def clean_ows_url(url):
    """
    Strip the basic service elements (service, version, request) from the
    query string of an OWS URL; all other parameters are kept.

    The url is returned untouched when it is None or does not start with 'http'.

    source: https://stackoverflow.com/a/11640565
    """

    if url is None or not url.startswith('http'):
        return url

    basic_service_elements = ('service', 'version', 'request')

    parsed = urlparse(url)
    query = parse_qs(parsed.query, keep_blank_values=True)

    # parameter names are compared case-insensitively
    kept = {
        name: values
        for name, values in query.items()
        if name.lower() not in basic_service_elements
    }

    return urlunparse((
        parsed.scheme,
        parsed.netloc,
        parsed.path,
        parsed.params,
        urlencode(kept, doseq=True),
        parsed.fragment,
    ))
1✔
723

724

725
def bind_url(url):
    """binds an HTTP GET query string endpoint

    Returns url with whatever character is needed so that key=value pairs
    can be appended directly:

    - http://host/wms          ->  http://host/wms?
    - http://host/wms?         ->  unchanged
    - http://host/wms?foo=bar  ->  http://host/wms?foo=bar&
    - http://host/wms?foo=bar& ->  unchanged
    """
    if '?' not in url and '=' not in url:  # like http://host/wms
        binder = '?'
    elif url.endswith(('?', '&')):  # already ends with a usable separator
        # this also covers http://host/wms?foo& which previously left
        # binder unassigned and raised UnboundLocalError
        binder = ''
    else:  # like http://host/wms?foo=bar or http://host/wms?foo
        binder = '&'
    return '%s%s' % (url, binder)
1✔
744

745

746
def findall(root, xpath, attribute_name=None, attribute_value=None):
    """Search all descendants of root for elements matching xpath,
    optionally restricted to those carrying a given attribute value

    :param root: Element root element where to start search
    :param xpath: xpath definition, like {http://foo/bar/namespace}ElementName
    :param attribute_name: name of possible attribute of given element
    :param attribute_value: value of the attribute
    :return: list of elements or None
    """

    # the attribute filter applies only when both name and value are given
    if attribute_name is not None and attribute_value is not None:
        xpath = '%s[@%s="%s"]' % (xpath, attribute_name, attribute_value)

    matches = root.findall('.//' + xpath)
    return None if matches == [] else matches
1✔
766

767

768
def datetime_from_iso(iso):
    """returns a datetime object from dates in the format 2001-07-01T00:00:00Z or 2001-07-01T00:00:00.000Z

    The whole-seconds format is tried first and the fractional-seconds format
    second; a failure of the second attempt is propagated to the caller.
    """
    whole_seconds = "%Y-%m-%dT%H:%M:%SZ"
    fractional_seconds = "%Y-%m-%dT%H:%M:%S.%fZ"
    try:
        return datetime.strptime(iso, whole_seconds)
    except Exception:
        return datetime.strptime(iso, fractional_seconds)
×
775

776

777
def datetime_from_ansi(ansi):
    """Converts an ansiDate (expressed as a number = the number of days since the datum origin of ansi)
    to a Python datetime object.

    The datum origin used is 1600-12-31T00:00:00, so day 1 is 1601-01-01.
    """
    # timedelta is not among the module-level imports, so bring it in here;
    # without it every call raised NameError
    from datetime import timedelta

    datumOrigin = datetime(1600, 12, 31, 0, 0, 0)

    return datumOrigin + timedelta(ansi)
×
785

786

787
def is_number(s):
    """simple helper to test if value is number as requests with numbers don't
    need quote marks

    Returns True when float(s) succeeds and False when it raises ValueError.
    """
    try:
        float(s)
    except ValueError:
        return False
    else:
        return True
×
796

797

798
def makeString(value):
    """Return value unchanged when it is a str, otherwise its repr()."""
    # using repr unconditionally breaks things in some circumstances if a
    # value is already a string
    return value if type(value) is str else repr(value)
1✔
806

807

808
def param_list_to_url_string(param_list, param_name):
    """Converts list of tuples for certain WCS GetCoverage keyword arguments
    (subsets, resolutions, sizes) to a url-encoded string

    Each entry is (name, value) or (name, low, high, ...) and produces one
    "&<param_name>=name(value)" or "&<param_name>=name(low,high)" fragment.
    The values are wrapped in double quotes when the first one is not a number.
    """
    fragments = []
    for param in param_list:
        if len(param) > 2:
            values = [param[1], param[2]]
        else:
            values = [param[1]]

        # quoting is decided by the first value only
        if is_number(param[1]):
            opener, joiner, closer = '(', ',', ')'
        else:
            opener, joiner, closer = '("', '","', '")'

        expression = param[0] + opener + joiner.join(makeString(v) for v in values) + closer
        fragments.append("&" + urlencode({param_name: expression}))
    return ''.join(fragments)
×
825

826

827
def is_vector_grid(grid_elem):
    """Placeholder with no implementation; returns None for any input."""
    return None
×
829

830

831
class Authentication(object):
    """Holder for HTTP authentication and TLS settings.

    Stores a username/password pair or an ``AuthBase`` delegate (the two are
    mutually exclusive), plus client certificate and certificate
    verification options. Every value is validated by its property setter.

    With ``shared=True`` the values are kept in class-level attributes and
    are therefore visible to every instance created with ``shared=True``;
    otherwise they are kept on the instance.
    """

    # Class-level storage, used only when ``shared`` is true
    _USERNAME = None
    _PASSWORD = None
    _AUTH_DELEGATE = None
    _CERT = None
    _VERIFY = None

    # Raised whenever username/password and auth_delegate would both be set
    _EXCLUSIVE_ERROR = ('Authentication instances may have username/password or auth_delegate set,'
                        ' but not both')

    def __init__(self, username=None, password=None,
                 cert=None, verify=True, shared=False,
                 auth_delegate=None):
        """
        :param str username=None: Username for basic authentication, None for
            unauthenticated access (or if using cert/verify)
        :param str password=None: Password for basic authentication, None for
            unauthenticated access (or if using cert/verify)
        :param cert=None: Either a str (path to a combined certificate/key) or
            tuple/list of paths (certificate, key). If supplied, the target
            files must exist.
        :param verify=True: Either a bool (verify SSL certificates, use system
            CA bundle) or str (path to a specific CA bundle). If a str, the
            target file must exist.
        :param bool shared=False: Set to True to make the values be class-level
            attributes (shared among instances where shared=True) instead of
            instance-level (shared=False, default)
        :param AuthBase auth_delegate=None: Instance of requests' AuthBase to
            allow arbitrary authentication schemes - mutually exclusive with
            username/password arguments.

        :raises TypeError: if an argument has an unsupported type
        :raises ValueError: if username/password and auth_delegate are both set
        :raises OSError: if a cert/verify path does not exist
        """
        self.shared = shared
        # Seed the instance attributes first: the setters below read the
        # other properties to enforce mutual exclusion, so every backing
        # attribute has to exist before any setter runs.
        self._username = username
        self._password = password
        self._cert = cert
        self._verify = verify
        self._auth_delegate = auth_delegate

        # Trigger the setters to validate the parameters. This couldn't be done directly
        # since some parameters are mutually exclusive.
        self.username = username
        self.password = password
        self.cert = cert
        self.verify = verify
        self.auth_delegate = auth_delegate

    def _load(self, name):
        """Return the value of setting ``name`` from class or instance storage."""
        if self.shared:
            return getattr(self, '_' + name.upper())
        return getattr(self, '_' + name)

    def _store(self, name, value):
        """Write setting ``name`` to class storage (shared) or instance storage."""
        if self.shared:
            setattr(self.__class__, '_' + name.upper(), value)
        else:
            setattr(self, '_' + name, value)

    def _check_credential(self, name, value):
        """Validate a username/password value; ``None`` is always accepted."""
        if value is None:
            return
        if not isinstance(value, str):
            raise TypeError('Value for "{}" must be a str'.format(name))
        if self.auth_delegate is not None:
            raise ValueError(self._EXCLUSIVE_ERROR)

    @property
    def username(self):
        return self._load('username')

    @username.setter
    def username(self, value):
        self._check_credential('username', value)
        self._store('username', value)

    @property
    def password(self):
        return self._load('password')

    @password.setter
    def password(self, value):
        self._check_credential('password', value)
        self._store('password', value)

    @property
    def cert(self):
        return self._load('cert')

    @cert.setter
    def cert(self, certificate, key=None):
        # NOTE: plain attribute assignment (``auth.cert = x``) always passes a
        # single value, so ``key`` is only supplied when the setter function
        # is invoked directly.
        error = 'Value for "cert" must be a str path to a file or list/tuple of str paths'
        if certificate is None:
            value = None
        elif isinstance(certificate, (list, tuple)):
            for path in certificate:
                if not isinstance(path, str):
                    raise TypeError(error)
                os.stat(path)  # Raises OSError/FileNotFoundError if missing
            # Both paths supplied as same argument
            value = tuple(certificate)
        elif isinstance(certificate, str):
            os.stat(certificate)  # Raises OSError/FileNotFoundError if missing
            if isinstance(key, str):
                # Separate files for certificate and key; the key file must
                # exist as well, like every other supplied path.
                os.stat(key)
                value = (certificate, key)
            else:
                # Assume combined file of both certificate and key
                value = certificate
        else:
            raise TypeError(error)
        self._store('cert', value)

    @property
    def verify(self):
        return self._load('verify')

    @verify.setter
    def verify(self, value):
        # None is passed through unchecked so the value can be cleared
        if value is not None:
            if not isinstance(value, (bool, str)):
                raise TypeError(
                    'Value for "verify" must a bool or str path to a file')
            if isinstance(value, str):
                os.stat(value)  # Raises OSError/FileNotFoundError if missing
        self._store('verify', value)

    @property
    def auth_delegate(self):
        return self._load('auth_delegate')

    @auth_delegate.setter
    def auth_delegate(self, value):
        if value is not None:
            if not isinstance(value, AuthBase):
                raise TypeError('Value for "auth_delegate" must be an instance of AuthBase')

            if self.username is not None or self.password is not None:
                raise ValueError(self._EXCLUSIVE_ERROR)

        self._store('auth_delegate', value)

    @property
    def urlopen_kwargs(self):
        """Dict with the ``username``, ``password``, ``cert`` and ``verify`` values.

        :raises NotImplementedError: if ``auth_delegate`` is set
        """
        if self.auth_delegate is not None:
            raise NotImplementedError("The urlopen_kwargs property is not supported when auth_delegate is set")

        return {
            'username': self.username,
            'password': self.password,
            'cert': self.cert,
            'verify': self.verify
        }

    def __repr__(self, *args, **kwargs):
        # Never render the real password: repr() output routinely ends up in
        # logs and tracebacks. Only reveal whether one is set.
        masked_password = None if self.password is None else '***'
        return '<{} shared={} username={} password={} cert={} verify={} auth_delegate={}>'.format(
            self.__class__.__name__, self.shared, self.username, masked_password, self.cert, self.verify,
            self.auth_delegate)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc