• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / OWSLib / 3663701074

pending completion
3663701074

Pull #851

github

GitHub
Merge 6cd54d613 into 13b1443f7
Pull Request #851: Adding Python 3.10 in CI

7461 of 12701 relevant lines covered (58.74%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.82
/owslib/util.py
1
# -*- coding: ISO-8859-15 -*-
2
# =============================================================================
3
# Copyright (c) 2022 Tom Kralidis
4
#
5
# Authors : Tom Kralidis <tomkralidis@gmail.com>
6
#
7
# Contact email: tomkralidis@gmail.com
8
# =============================================================================
9

10
import os
1✔
11
import sys
1✔
12
from collections import OrderedDict
1✔
13
from dateutil import parser
1✔
14
from datetime import datetime, timedelta
1✔
15
import pytz
1✔
16
from owslib.etree import etree, ParseError
1✔
17
from owslib.namespaces import Namespaces
1✔
18
from urllib.parse import urlsplit, urlencode, urlparse, parse_qs, urlunparse, parse_qsl
1✔
19
import copy
1✔
20

21
from io import StringIO, BytesIO
1✔
22

23
import re
1✔
24
from copy import deepcopy
1✔
25
import warnings
1✔
26
import requests
1✔
27
from requests.auth import AuthBase
1✔
28
import codecs
1✔
29

30
"""
31
Utility functions and classes
32
"""
33

34

35
class ServiceException(Exception):
    """Error raised by this module when a service request fails or the
    response carries a service exception report."""
    # TODO: this should go in ows common module when refactored.
1✔
38

39

40
# http://stackoverflow.com/questions/6256183/combine-two-dictionaries-of-dictionaries-python
41
def dict_union(d1, d2):
    """
    Recursively merge two dictionaries into a new dictionary.

    Parameters
    ----------

    - d1: base dictionary
    - d2: dictionary whose values take precedence over those of d1

    Returns a new dict holding every key of d1 and d2. When the d2 value
    for a key is a dict it is merged recursively with the d1 value for the
    same key; otherwise the d2 value is used when the key is present in d2
    and the d1 value when it is not. Neither input is modified.
    """
    merged = {}
    for key in set(d1) | set(d2):
        override = d2.get(key)
        if isinstance(override, dict):
            base = d1.get(key)
            # a non-dict (or missing) d1 value cannot be merged into, so the
            # nested d2 dict simply replaces it
            merged[key] = dict_union(base if isinstance(base, dict) else {}, override)
        elif key in d2:
            merged[key] = override
        else:
            merged[key] = d1[key]
    return merged
44

45

46
# Infinite DateTimes for Python.  Used in SWE 2.0 and other OGC specs as "INF" and "-INF"
47
class InfiniteDateTime:
    """Stand-in for a positive-infinity ("INF") date/time value."""

    def __lt__(self, other):
        # never smaller than the value it is compared with
        return False

    def __gt__(self, other):
        # always greater than the value it is compared with
        return True

    def timetuple(self):
        # there is no real calendar representation: hand back an empty tuple
        return ()
×
56

57

58
class NegativeInfiniteDateTime:
    """Stand-in for a negative-infinity ("-INF") date/time value."""

    def __lt__(self, other):
        # always smaller than the value it is compared with
        return True

    def __gt__(self, other):
        # never greater than the value it is compared with
        return False

    def timetuple(self):
        # there is no real calendar representation: hand back an empty tuple
        return ()
×
67

68

69
# Pre-compiled pattern: any character followed by an upper-case letter and
# one or more lower-case letters (the start of a CamelCase word).
first_cap_re = re.compile('(.)([A-Z][a-z]+)')
1✔
70
# Pre-compiled pattern: a lower-case letter or digit directly followed by an
# upper-case letter.
all_cap_re = re.compile('([a-z0-9])([A-Z])')
1✔
71

72

73
def format_string(prop_string):
    """
        Convert a CamelCase property string to a lower-case, underscore
        separated (pep8 style) name, dropping any spaces on the way.
        from: http://stackoverflow.com/questions/1175208/elegant-python-function-to-convert-camelcase-to-camel-case

        Returns an empty string when prop_string is None.
    """
    if prop_string is None:
        return ''
    # underscore in front of each capitalised word, then drop the spaces
    words_split = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', prop_string).replace(' ', '')
    # underscore between a lower-case letter/digit and a following capital
    fully_split = re.sub('([a-z0-9])([A-Z])', r'\1_\2', words_split)
    return fully_split.lower()
1✔
83

84

85
def xml_to_dict(root, prefix=None, depth=1, diction=None):
    """
        Walk the children of an xml element and collect (key, val) pairs,
        where key is the child's tag and val is the child's inner text.
        Descends into the tree until the requested depth is reached.

        Parameters
        ===========
        :root - root xml element, starting point of iteration
        :prefix - a string to prepend to the resulting key (optional)
        :depth - the number of depths to process in the tree (optional)
        :diction - the dictionary to insert the (tag,text) pairs into (optional)

        Return
        =======
        Dictionary of (key,value); key is the element tag with its namespace
        stripped and converted to pep8 style, value is the stripped inner text
        of the element. Children without text are not stored. When the same
        key occurs more than once, the last occurrence in the tree wins.
    """
    collected = dict() if diction is None else diction
    go_deeper = depth > 1
    for node in root:
        text = testXMLValue(node)
        # only children that carry non-empty text produce an entry
        if text is not None and text != '':
            name = format_string(node.tag.split('}')[-1])
            if prefix is not None:
                name = prefix + name
            collected[name] = text
        # descendants are visited whether or not this child had text
        if go_deeper:
            collected = xml_to_dict(node, prefix=prefix, depth=(depth - 1), diction=collected)

    return collected
1✔
123

124

125
class ResponseWrapper:
    """
    Object returned by openURL.

    Thin shim over a requests response that exposes info(), read() and
    geturl() so existing calling code keeps working.
    """
    def __init__(self, response):
        # the wrapped requests response
        self._response = response

    def info(self):
        """Return the headers of the wrapped response."""
        wrapped = self._response
        return wrapped.headers

    def read(self):
        """Return the body (``content``) of the wrapped response."""
        wrapped = self._response
        return wrapped.content

    def geturl(self):
        """Return the response URL with each '&&' replaced by '&'."""
        final_url = self._response.url
        return final_url.replace('&&', '&')

    # @TODO: __getattribute__ for poking at response
144

145

146
def openURL(url_base, data=None, method='Get', cookies=None, username=None, password=None, timeout=30, headers=None,
            verify=True, cert=None, auth=None):
    """
    Open a URL with the requests library.

    On top of the plain request this checks the response for OGC service
    exceptions, and handles cookies and simple username/password
    authentication.

    :param url_base: URL to request
    :param data: query parameters for a GET request, body for a POST request
    :param method: 'Get' or 'Post' (case-insensitive); anything up to a
                   closing '}' is discarded first
    :param cookies: (optional) cookies to send with the request
    :param username: (optional) overrides ``auth.username`` when set
    :param password: (optional) overrides ``auth.password`` when set
    :param timeout: timeout passed to requests
    :param headers: (optional) Dictionary of HTTP Headers to send with the :class:`Request`.
    :param verify: (optional) whether the SSL cert will be verified. A CA_BUNDLE path can also be provided.
                   Defaults to ``True``.
    :param cert: (optional) A file with a client side certificate for SSL authentication
                 to send with the :class:`Request`.
    :param auth: Instance of owslib.util.Authentication
    :return: ResponseWrapper around the requests response
    :raises ServiceException: on HTTP 400/401, or when an XML response
                              contains a service exception element
    :raises ValueError: when ``method`` is neither 'get' nor 'post'
    """

    if headers is None:
        headers = {}

    if auth:
        # explicitly passed credentials win over those carried by ``auth``
        if username:
            auth.username = username
        if password:
            auth.password = password
        if cert:
            auth.cert = cert
        verify = verify and auth.verify
    else:
        auth = Authentication(username, password, cert, verify)

    request_options = {'timeout': timeout}

    if auth.username and auth.password:
        request_options['auth'] = (auth.username, auth.password)
    elif auth.auth_delegate is not None:
        request_options['auth'] = auth.auth_delegate

    request_options['cert'] = auth.cert
    request_options['verify'] = verify

    # FIXUP for WFS in particular, remove xml style namespace
    # @TODO does this belong here?
    method = method.split("}")[-1]
    verb = method.lower()

    if verb == 'post':
        # a body that parses as XML is sent with an XML content type
        try:
            etree.fromstring(data)
        except (ParseError, UnicodeEncodeError):
            pass
        else:
            headers['Content-Type'] = 'text/xml'

        request_options['data'] = data

    elif verb == 'get':
        request_options['params'] = data

    else:
        raise ValueError("Unknown method ('%s'), expected 'get' or 'post'" % method)

    if cookies is not None:
        request_options['cookies'] = cookies

    req = requests.request(method.upper(), url_base, headers=headers, **request_options)

    status = req.status_code
    if status in (400, 401):
        raise ServiceException(req.text)

    if status in (404, 500, 502, 503, 504):    # add more if needed
        req.raise_for_status()

    # check for service exceptions without the http header set
    xml_content_types = ('text/xml', 'application/xml', 'application/vnd.ogc.se_xml')
    if req.headers.get('Content-Type') in xml_content_types:
        # just in case 400 headers were not set, going to have to read the xml to see if it's an exception report.
        se_tree = etree.fromstring(req.content)

        # to handle the variety of namespaces and terms across services
        # and versions, especially for "legacy" responses like WMS 1.3.0
        candidate_tags = (
            '{http://www.opengis.net/ows}Exception',
            '{http://www.opengis.net/ows/1.1}Exception',
            '{http://www.opengis.net/ogc}ServiceException',
            'ServiceException',
        )

        for candidate in candidate_tags:
            report = se_tree.find(candidate)
            if report is not None:
                # and we need to deal with some message nesting
                fragments = (piece.strip() for piece in report.itertext())
                raise ServiceException('\n'.join([frag for frag in fragments if frag]))

    return ResponseWrapper(req)
1✔
238

239

240
# default namespace for nspath is OWS common
241
# OWS Common 1.1 namespace URI; nspath() falls back to it when no ``ns`` is given
OWS_NAMESPACE = 'http://www.opengis.net/ows/1.1'
1✔
242

243

244
def nspath(path, ns=OWS_NAMESPACE):

    """

    Qualify every step of the given path with the given namespace.

    Parameters
    ----------

    - path: ElementTree API Compatible path expression
    - ns: the XML namespace URI.

    Returns the '/'-joined path with each step written as '{ns}step';
    '*' steps are left as they are. Returns -1 when path or ns is None.

    """

    if path is None or ns is None:
        return -1

    return '/'.join(
        step if step == '*' else '{%s}%s' % (ns, step)
        for step in path.split('/')
    )
1✔
267

268

269
def nspath_eval(xpath, namespaces):
    ''' Return an etree friendly xpath

    Every '/'-separated step of xpath must look like 'prefix:name'; the
    prefix is looked up in namespaces and the step is rewritten as
    '{uri}name'.
    '''
    steps = (step.split(':') for step in xpath.split('/'))
    return '/'.join(['{%s}%s' % (namespaces[prefix], local) for prefix, local in steps])
1✔
276

277

278
def cleanup_namespaces(element):
    """ Remove unused namespaces from an element

    With lxml the element is cleaned in place and returned; with any other
    etree implementation a new element is returned, obtained by serializing
    and re-parsing the given one.
    """
    if etree.__name__ != 'lxml.etree':
        return etree.fromstring(etree.tostring(element))

    etree.cleanup_namespaces(element)
    return element
1✔
285

286

287
def add_namespaces(root, ns_keys):
    """
    Declare additional namespaces on a root element.

    Parameters
    ----------

    - root: the element to receive the namespace declarations
    - ns_keys: a namespace key, or an iterable of keys, resolved through
      Namespaces.get_namespace(); keys that resolve to None are ignored

    Returns root itself (with xmlns attributes added) when not running on
    lxml, or a newly created root element carrying the extended nsmap when
    running on lxml.
    """
    if isinstance(ns_keys, str):
        ns_keys = [ns_keys]

    namespaces = Namespaces()

    resolved = [(key, namespaces.get_namespace(key)) for key in ns_keys]

    if etree.__name__ == 'lxml.etree':
        # lxml does not support setting xmlns attributes
        # Update the elements nsmap with new namespaces
        extended_map = root.nsmap
        for key, uri in resolved:
            if uri is not None:
                extended_map[key] = uri
        # Recreate the root element with updated nsmap
        rebuilt = etree.Element(root.tag, nsmap=extended_map)
        # Carry over attributes
        for attr_name, attr_value in list(root.items()):
            rebuilt.set(attr_name, attr_value)
        # Carry over children
        for child in root:
            rebuilt.append(deepcopy(child))
        return rebuilt

    # We can just add more namespaces when not using lxml.
    # We can't re-add an existing namespaces.  Get a list of current
    # namespaces in use
    in_use = set()
    for node in root.iter():
        if node.tag[0] == "{":
            uri, _local_name = node.tag[1:].split("}")
            in_use.add(namespaces.get_namespace_from_url(uri))
    for key, uri in resolved:
        if uri is not None and key not in in_use:
            root.set("xmlns:%s" % key, uri)
    return root
×
324

325

326
def getXMLInteger(elem, tag):
    """
    Return the text within the named tag as an integer.

    Raises ValueError if the tag cannot be found; an exception is also
    raised if its textual value cannot be converted to an integer.

    Parameters
    ----------

    - elem: the element to search within
    - tag: the name of the tag to look for

    """
    match = elem.find(tag)
    if match is not None:
        return int(match.text.strip())
    raise ValueError('Missing %s in %s' % (tag, elem))
1✔
344

345

346
def testXMLValue(val, attrib=False):
1✔
347
    """
348

349
    Test that the XML value exists, return val.text, else return None
350

351
    Parameters
352
    ----------
353

354
    - val: the value to be tested
355

356
    """
357

358
    if val is not None:
1✔
359
        if attrib:
1✔
360
            return val.strip()
1✔
361
        elif val.text:
1✔
362
            return val.text.strip()
1✔
363
        else:
364
            return None
1✔
365
    else:
366
        return None
1✔
367

368

369
def testXMLAttribute(element, attribute):
1✔
370
    """
371

372
    Test that the XML element and attribute exist, return attribute's value, else return None
373

374
    Parameters
375
    ----------
376

377
    - element: the element containing the attribute
378
    - attribute: the attribute name
379

380
    """
381
    if element is not None:
1✔
382
        return element.get(attribute)
1✔
383

384
    return None
1✔
385

386

387
def http_post(url=None, request=None, lang='en-US', timeout=10, username=None, password=None, auth=None, headers=None):
    """

    Invoke an HTTP POST request

    Parameters
    ----------

    - url: the URL of the server
    - request: the request message; a dict is sent as a JSON body, anything
      else is sent as the raw request body
    - lang: the language
    - timeout: timeout in seconds
    - username: overrides auth.username when set
    - password: overrides auth.password when set
    - auth: owslib.util.Auth instance
    - headers: HTTP headers to send with requests

    Returns the response object produced by requests.post.
    Raises ValueError when url is None.

    """

    if url is None:
        raise ValueError("URL required")

    u = urlsplit(url)

    headers_ = {
        'User-Agent': 'OWSLib (https://geopython.github.io/OWSLib)',
        'Content-type': 'text/xml',
        'Accept': 'text/xml,application/xml',
        'Accept-Language': lang,
        'Accept-Encoding': 'gzip,deflate',
        'Host': u.netloc,
    }

    if headers:
        headers_.update(headers)

    send_json = isinstance(request, dict)
    if send_json:
        headers_['Content-type'] = 'application/json'
        headers_.pop('Accept')

    # the timeout used to be accepted but never forwarded, so a stalled
    # server could block the caller indefinitely
    rkwargs = {'timeout': timeout}

    if auth:
        if username:
            auth.username = username
        if password:
            auth.password = password
    else:
        auth = Authentication(username, password)
    if auth.username is not None and auth.password is not None:
        rkwargs['auth'] = (auth.username, auth.password)
    elif auth.auth_delegate is not None:
        rkwargs['auth'] = auth.auth_delegate
    rkwargs['verify'] = auth.verify
    rkwargs['cert'] = auth.cert

    if send_json:
        return requests.post(url, json=request, headers=headers_, **rkwargs)
    return requests.post(url, request, headers=headers_, **rkwargs)
1✔
445

446

447
def http_prepare(*args, **kwargs):
    """
    Translate OWSLib style keyword arguments into keyword arguments for a
    requests call.

    Parameters
    ----------

    - args: accepted for signature compatibility; not used here
    - kwargs: may contain 'auth' (an Authentication instance, or a
      tuple/list of Authentication constructor arguments), 'username',
      'password', 'cert' and 'verify'; everything else is passed through

    Returns a deep copy of kwargs in which the authentication related
    entries are replaced by the 'auth', 'cert' and 'verify' entries that
    requests understands.
    """
    # Copy input kwargs so the dict can be modified
    rkwargs = copy.deepcopy(kwargs)

    # Use Authentication instance if provided, else create one
    auth = rkwargs.pop('auth', None)
    if auth is None:
        auth = Authentication()
    elif isinstance(auth, (tuple, list)):
        auth = Authentication(*auth)

    # Individually supplied values override what the Authentication holds
    for field in ('username', 'password', 'cert', 'verify'):
        if field in rkwargs:
            setattr(auth, field, rkwargs.pop(field))

    # Build keyword args for call to requests.get()
    if auth.username and auth.password:
        rkwargs['auth'] = (auth.username, auth.password)
    elif auth.auth_delegate is not None:
        rkwargs['auth'] = auth.auth_delegate
    else:
        rkwargs['auth'] = None

    # 'cert' and 'verify' were popped above, so they must be read back from
    # the Authentication object; re-reading them from rkwargs always gave
    # None / True and silently discarded the caller's settings
    rkwargs['cert'] = auth.cert
    rkwargs['verify'] = auth.verify

    return rkwargs
1✔
480

481

482
def http_get(*args, **kwargs):
    """Issue an HTTP GET via requests.get, with the keyword arguments first
    translated by http_prepare; positional arguments are passed unchanged."""
    return requests.get(*args, **http_prepare(*args, **kwargs))
1✔
485

486

487
def http_put(*args, **kwargs):
    """Issue an HTTP PUT via requests.put, with the keyword arguments first
    translated by http_prepare.

    A dict passed as 'data' is sent through the 'json' keyword instead; any
    other 'data' value is passed on as given by the caller.
    """
    rkwargs = http_prepare(*args, **kwargs)

    has_payload = 'data' in kwargs
    if has_payload and isinstance(kwargs['data'], dict):
        rkwargs.pop('data')
        rkwargs['json'] = kwargs['data']
    elif has_payload:
        # hand over the caller's object rather than http_prepare's copy
        rkwargs['data'] = kwargs['data']

    return requests.put(*args, **rkwargs)
×
498

499

500
def http_delete(*args, **kwargs):
    """Issue an HTTP DELETE via requests.delete, with the keyword arguments
    first translated by http_prepare; positional arguments are passed
    unchanged."""
    return requests.delete(*args, **http_prepare(*args, **kwargs))
×
503

504

505
def element_to_string(element, encoding=None, xml_declaration=False):
    """
    Returns a string from a XML object

    Parameters
    ----------
    - element: etree Element
    - encoding (optional): encoding in string form. 'utf-8', 'ISO-8859-1', etc.
      Defaults to 'ISO-8859-1'. Only used when xml_declaration is true.
    - xml_declaration (optional): whether to include xml declaration

    Without an XML declaration the result is whatever etree.tostring(element)
    returns. With a declaration the result is a str, except on lxml with an
    encoding other than 'unicode'/'utf-8', where lxml produces the output
    including the declaration.

    """

    if encoding is None:
        encoding = "ISO-8859-1"

    if not xml_declaration:
        return etree.tostring(element)

    if etree.__name__ == 'lxml.etree':
        if encoding in ['unicode', 'utf-8']:
            return '<?xml version="1.0" encoding="utf-8" standalone="no"?>\n{}'.format(
                etree.tostring(element, encoding='unicode'))
        return etree.tostring(element, encoding=encoding, xml_declaration=True)

    # Serialize to text before prepending the declaration: tostring() with a
    # byte encoding returns bytes, and formatting bytes into a str used to
    # produce "b'...'" instead of the markup.
    return '<?xml version="1.0" encoding="{}" standalone="no"?>\n{}'.format(
        encoding, etree.tostring(element, encoding='unicode'))
1✔
539

540

541
def xml2string(xml):
    """

    Return the given XML string with an XML declaration prepended.

    Deprecated: emits a warning pointing to 'element_to_string'.

    Parameters
    ----------

    - xml: xml string

    """
    warnings.warn("DEPRECIATION WARNING!  You should now use the 'element_to_string' method \
                   The 'xml2string' method will be removed in a future version of OWSLib.")
    declaration = '<?xml version="1.0" encoding="ISO-8859-1" standalone="no"?>\n'
    return declaration + xml
×
555

556

557
def xmlvalid(xml, xsd):
    """

    Validate an XML document against an XML Schema.

    Parameters
    ----------

    - xml: XML content
    - xsd: pointer to XML Schema (local file path or URL)

    Returns the result of the schema's validate() call.

    """

    schema = etree.XMLSchema(etree.parse(xsd))
    document = etree.parse(StringIO(xml))
    return schema.validate(document)
×
575

576

577
def xmltag_split(tag):
    ''' Return XML element bare tag name (without prefix)

    The part following the first '}' is returned; if that cannot be
    obtained for any reason the tag is returned unchanged.
    '''
    try:
        bare = tag.split('}')[1]
    except Exception:
        bare = tag
    return bare
1✔
583

584

585
def getNamespace(element):
    ''' Utility method to extract the namespace from an XML element tag encoded as {namespace}localname.

    Returns an empty string when the tag does not start with '{'.
    '''
    tag = element.tag
    if tag[0] != '{':
        return ""
    namespace, _sep, _rest = tag[1:].partition("}")
    return namespace
1✔
591

592

593
def build_get_url(base_url, params, overwrite=False):
    ''' Utility function to build a full HTTP GET URL from the service base URL and a dictionary of HTTP parameters.

    Parameters already present in the query string of base_url are kept and
    params only adds keys that are missing, unless overwrite is True, in
    which case params come first and the base query string only adds keys
    that are missing from params.

    TODO: handle parameters case-insensitive?

    @param overwrite: boolean flag to allow overwrite of parameters of the base_url (default: False)
    '''

    pieces = base_url.split('?')

    from_base = parse_qsl(pieces[1]) if len(pieces) > 1 else []
    from_params = [(key, value) for key, value in params.items()]

    if overwrite is True:
        # all params and additional base
        primary, extras = from_params, from_base
    else:
        # all base and additional params
        primary, extras = from_base, from_params

    # keys are compared against the primary list as it was before extending
    taken = tuple(pair[0] for pair in primary)
    primary.extend([pair for pair in extras if pair[0] not in taken])

    return pieces[0] + '?' + urlencode(tuple(primary))
1✔
627

628

629
def dump(obj, prefix=''):
    '''Utility function to print to standard output a generic object with all its attributes.

    The printed line is "<prefix> <module>.<class name> : <instance __dict__>".
    '''

    class_name = obj.__class__.__name__
    line = "{} {}.{} : {}".format(prefix, obj.__module__, class_name, obj.__dict__)
    print(line)
1✔
633

634

635
def getTypedValue(data_type, value):
    '''Utility function to cast a string value to the appropriate XSD type.

    Handles 'boolean' (true only for the case-insensitive text 'true'),
    'integer', 'float' and 'string'; for any other data_type the value is
    returned as is. Returns None when value is None.
    '''

    # If the default value is empty
    if value is None:
        return None

    if data_type == 'boolean':
        return value.lower() == 'true'

    for type_name, cast in (('integer', int), ('float', float), ('string', str)):
        if data_type == type_name:
            return cast(value)

    return value  # no type casting
1✔
652

653

654
def extract_time(element):
    ''' return a datetime object based on a gml text string

    ex:
    <gml:beginPosition>2006-07-27T21:10:00Z</gml:beginPosition>
    <gml:endPosition indeterminatePosition="now"/>

    If there happens to be a strange element with both attributes and text,
    use the text.
    ex: <gml:beginPosition indeterminatePosition="now">2006-07-27T21:10:00Z</gml:beginPosition>
    Would be 2006-07-27T21:10:00Z, not 'now'

    Returns None when element is None, or when its text cannot be parsed
    and its indeterminatePosition attribute is not 'now'.
    '''
    if element is None:
        return None

    try:
        dt = parser.parse(element.text)
    except Exception:
        att = testXMLValue(element.attrib.get('indeterminatePosition'), True)
        if att and att == 'now':
            # datetime.replace() returns a new object, so the earlier
            # "dt.replace(tzinfo=pytz.utc)" statement had no effect and a
            # naive datetime was returned; build the aware value directly
            dt = datetime.now(pytz.utc)
        else:
            dt = None
    return dt
1✔
680

681

682
def extract_xml_list(elements):
    """
    Some people don't have separate tags for their keywords and separate them
    with a newline. This splits the text of every element on line breaks and
    returns all non-blank, stripped entries as one flat list.
    """
    entries = []
    for element in elements:
        if not element.text:
            continue
        for chunk in re.split(r'[\n\r]+', element.text):
            cleaned = chunk.strip()
            if cleaned:
                entries.append(cleaned)
    return entries
1✔
691

692

693
def strip_bom(raw_text):
    """ return the raw (assumed) xml response without the BOM

    Only bytes input is inspected; any other value is returned unchanged.
    A leading UTF-8, UTF-16 or UTF-32 byte order mark is removed.
    """
    if not isinstance(raw_text, bytes):
        return raw_text

    # The UTF-32 marks must be tried before the UTF-16 ones: the UTF-16-LE
    # BOM (FF FE) is a prefix of the UTF-32-LE BOM (FF FE 00 00), so testing
    # UTF-16 first would strip only half of a UTF-32-LE mark.
    # codecs.BOM, BOM_BE, BOM_LE, BOM_UTF16 and BOM_UTF32 are aliases of the
    # explicit-endian constants below.
    boms = (
        # utf-32
        codecs.BOM_UTF32_LE,
        codecs.BOM_UTF32_BE,
        # utf-8
        codecs.BOM_UTF8,
        # utf-16
        codecs.BOM_UTF16_LE,
        codecs.BOM_UTF16_BE,
    )

    for bom in boms:
        if raw_text.startswith(bom):
            return raw_text[len(bom):]
    return raw_text
1✔
717

718

719
def clean_ows_url(url):
    """
    Strip the basic service elements (service, version, request; matched
    case-insensitively) from the query string of an OWS URL.

    A url that is None or does not start with 'http' is returned unchanged.

    source: https://stackoverflow.com/a/11640565
    """

    if url is None or not url.startswith('http'):
        return url

    basic_service_elements = ('service', 'version', 'request')

    parsed = urlparse(url)
    remaining = {
        key: values
        for key, values in parse_qs(parsed.query, keep_blank_values=True).items()
        if key.lower() not in basic_service_elements
    }

    return urlunparse([
        parsed.scheme,
        parsed.netloc,
        parsed.path,
        parsed.params,
        urlencode(remaining, doseq=True),
        parsed.fragment,
    ])
1✔
749

750

751
def bind_url(url):
    """binds an HTTP GET query string endpoint

    Returns url with the separator appended that is needed before further
    key=value pairs can be added: '?' when url has no query string, '&'
    when it has one that does not already end in '?' or '&', and nothing
    otherwise.
    """
    # Default covers 'http://host/wms?foo&', which none of the branches
    # below assign for and which used to raise UnboundLocalError.
    binder = ''

    if '?' not in url:  # like http://host/wms
        binder = '?'

    # if like http://host/wms?foo=bar& or http://host/wms?foo=bar
    if '=' in url:
        binder = '' if url.endswith('&') else '&'

    # if like http://host/wms?foo
    if '?' in url:
        if url.endswith('?'):  # like http://host/wms?
            binder = ''
        elif not url.endswith('&'):  # like http://host/wms?foo=bar
            binder = '&'
    return '%s%s' % (url, binder)
1✔
770

771

772
import logging
1✔
773
# Null logging handler (alias of the standard library class)
NullHandler = logging.NullHandler

# Package logger; it gets a no-op handler attached at import time
log = logging.getLogger('owslib')
log.addHandler(logging.NullHandler())
1✔
778

779

780
def which_etree():
    """decipher which etree library is being used by OWSLib

    Returns 'lxml.etree', 'xml.etree' or 'elementtree.ElementTree' depending
    on the file the etree module was loaded from, or None when the location
    matches none of them.
    """

    # normalise path separators so the 'xml/etree' test also works for
    # backslash-separated paths
    module_path = etree.__file__.replace('\\', '/')

    if 'lxml' in module_path:
        return 'lxml.etree'
    if 'xml/etree' in module_path:
        return 'xml.etree'
    # the check used to look for the misspelt 'elementree', which can never
    # occur in a path of the 'elementtree' package
    if 'elementtree' in module_path:
        return 'elementtree.ElementTree'

    return None
×
793

794

795
def findall(root, xpath, attribute_name=None, attribute_value=None):
    """Search all descendants of the given root element for elements
    matching xpath and, optionally, a given attribute value

    :param root: Element root element where to start search
    :param xpath: xpath definition, like {http://foo/bar/namespace}ElementName
    :param attribute_name: name of possible attribute of given element
    :param attribute_value: value of the attribute
    :return: list of elements or None
    """

    # the attribute filter is only applied when both name and value are given
    if attribute_name is not None and attribute_value is not None:
        xpath = '%s[@%s="%s"]' % (xpath, attribute_name, attribute_value)

    matches = root.findall('.//' + xpath)
    return None if matches == [] else matches
1✔
815

816

817
def datetime_from_iso(iso):
    """returns a datetime object from dates in the format 2001-07-01T00:00:00Z or 2001-07-01T00:00:00.000Z

    The format without fractional seconds is tried first; if that fails the
    format with fractional seconds is tried, and its failure propagates.
    """
    try:
        return datetime.strptime(iso, "%Y-%m-%dT%H:%M:%SZ")
    except Exception:
        return datetime.strptime(iso, "%Y-%m-%dT%H:%M:%S.%fZ")
×
824

825

826
def datetime_from_ansi(ansi):
    """Converts an ansiDate (a number: the count of days since the ansi
    datum origin, 1600-12-31 00:00:00) to a python datetime object.
    """

    datum_origin = datetime(1600, 12, 31, 0, 0, 0)
    offset = timedelta(days=ansi)
    return datum_origin + offset
×
834

835

836
def is_number(s):
    """simple helper to test if value is number as requests with numbers don't
    need quote marks

    Returns True when float(s) succeeds and False otherwise, including for
    values float() does not accept at all (None, arbitrary objects).
    """
    try:
        float(s)
    except (TypeError, ValueError):
        # TypeError: float() rejects the type (e.g. None); previously this
        # escaped to the caller instead of answering "not a number"
        return False
    return True
×
845

846

847
def makeString(value):
    """Return value itself when its type is exactly str, else repr(value)."""
    # using repr unconditionally breaks things in some circumstances if a
    # value is already a string
    return value if type(value) is str else repr(value)
1✔
855

856

857
def param_list_to_url_string(param_list, param_name):
    """Converts list of tuples for certain WCS GetCoverage keyword arguments
    (subsets, resolutions, sizes) to a url-encoded string

    Each tuple is (name, value) or (name, value, value); it is rendered as
    name(value[,value]) with the values quoted when the first value is not a
    number, url-encoded under param_name and prefixed with '&'.
    """
    fragments = []
    for param in param_list:
        if len(param) > 2:
            operands = (param[1], param[2])
        else:
            operands = (param[1],)

        # the first operand alone decides whether the operands are quoted
        if is_number(param[1]):
            expression = param[0] + "(" + ",".join(makeString(item) for item in operands) + ")"
        else:
            expression = param[0] + '("' + '","'.join(makeString(item) for item in operands) + '")'

        fragments.append("&" + urlencode({param_name: expression}))
    return ''.join(fragments)
×
874

875

876
def is_vector_grid(grid_elem):
    """Placeholder with no implementation: always returns None."""
    return None
×
878

879

880
class Authentication(object):
    """Container for authentication credentials and TLS (cert/verify) options.

    Holds either a username/password pair or a requests ``AuthBase``
    delegate (never both), plus an optional client certificate and a
    certificate-verification setting. Every value is exposed through a
    validating property. When ``shared`` is true the values are stored on
    the class and therefore seen by every instance created with
    ``shared=True``; otherwise they are stored on the instance.
    """

    # Class-level storage, read and written only when self.shared is true.
    _USERNAME = None
    _PASSWORD = None
    _AUTH_DELEGATE = None
    _CERT = None
    _VERIFY = None

    # Raised whenever username/password and auth_delegate would coexist.
    _EXCLUSIVE_ERROR = ('Authentication instances may have username/password or auth_delegate set,'
                        ' but not both')

    def __init__(self, username=None, password=None,
                 cert=None, verify=True, shared=False,
                 auth_delegate=None):
        '''
        :param str username=None: Username for basic authentication, None for
            unauthenticated access (or if using cert/verify)
        :param str password=None: Password for basic authentication, None for
            unauthenticated access (or if using cert/verify)
        :param cert=None: Either a str (path to a combined certificate/key) or
            tuple/list of paths (certificate, key). If supplied, the target
            files must exist.
        :param verify=True: Either a bool (verify SSL certificates, use system
            CA bundle) or str (path to a specific CA bundle). If a str, the
            target file must exist.
        :param bool shared=False: Set to True to make the values be class-level
            attributes (shared among instances where shared=True) instead of
            instance-level (shared=False, default)
        :param AuthBase auth_delegate=None: Instance of requests' AuthBase to
            allow arbitrary authentication schemes - mutually exclusive with
            username/password arguments.
        :raises TypeError: If an argument has an unsupported type.
        :raises ValueError: If both username/password and auth_delegate are
            supplied.
        :raises OSError: If a cert/verify path does not exist.
        '''
        self.shared = shared
        self._username = username
        self._password = password
        self._cert = cert
        self._verify = verify
        self._auth_delegate = auth_delegate

        # In shared mode the getters read class-level state. A delegate left
        # behind by an earlier shared instance would otherwise make the
        # username/password setters below raise although this call supplies
        # no delegate, so clear it first. No-op for instance-level storage.
        if auth_delegate is None:
            self.auth_delegate = None

        # Trigger the setters to validate the parameters. This couldn't be done directly
        # since some parameters are mutually exclusive.
        self.username = username
        self.password = password
        self.cert = cert
        self.verify = verify
        self.auth_delegate = auth_delegate

    @property
    def username(self):
        """Username for basic authentication, or None."""
        if self.shared:
            return self._USERNAME
        return self._username

    @username.setter
    def username(self, value):
        if value is not None:

            if not isinstance(value, str):
                raise TypeError('Value for "username" must be a str')

            if self.auth_delegate is not None:
                raise ValueError(self._EXCLUSIVE_ERROR)

        if self.shared:
            self.__class__._USERNAME = value
        else:
            self._username = value

    @property
    def password(self):
        """Password for basic authentication, or None."""
        if self.shared:
            return self._PASSWORD
        return self._password

    @password.setter
    def password(self, value):
        if value is not None:

            if not isinstance(value, str):
                raise TypeError('Value for "password" must be a str')

            if self.auth_delegate is not None:
                raise ValueError(self._EXCLUSIVE_ERROR)

        if self.shared:
            self.__class__._PASSWORD = value
        else:
            self._password = value

    @property
    def cert(self):
        """Client certificate: None, a str path, or a tuple of str paths."""
        if self.shared:
            return self._CERT
        return self._cert

    @cert.setter
    def cert(self, certificate, key=None):
        # NOTE: plain attribute assignment (``auth.cert = x``) passes a single
        # value, so ``key`` is only non-None if this function is invoked
        # directly; it is kept for backward compatibility.
        error = 'Value for "cert" must be a str path to a file or list/tuple of str paths'
        value = None
        if certificate is None:
            value = certificate
        elif isinstance(certificate, (list, tuple)):
            for path in certificate:
                if not isinstance(path, str):
                    raise TypeError(error)
                os.stat(path)  # Raises OSError/FileNotFoundError if missing
            # Both paths supplied as same argument
            value = tuple(certificate)
        elif isinstance(certificate, str):
            os.stat(certificate)  # Raises OSError/FileNotFoundError if missing
            if isinstance(key, str):
                # Separate files for certificate and key
                value = (certificate, key)
            else:
                # Assume combined file of both certificate and key
                value = certificate
        else:
            raise TypeError(error)
        if self.shared:
            self.__class__._CERT = value
        else:
            self._cert = value

    @property
    def verify(self):
        """Certificate verification setting: bool, str path to a CA bundle, or None."""
        if self.shared:
            return self._VERIFY
        return self._verify

    @verify.setter
    def verify(self, value):
        # None is passed through unchecked so the value can be cleared.
        if value is not None:
            if not isinstance(value, (bool, str)):
                raise TypeError(
                    'Value for "verify" must be a bool or str path to a file')
            if isinstance(value, str):
                os.stat(value)  # Raises OSError/FileNotFoundError if missing
        if self.shared:
            self.__class__._VERIFY = value
        else:
            self._verify = value

    @property
    def auth_delegate(self):
        """requests ``AuthBase`` instance used instead of username/password, or None."""
        if self.shared:
            return self._AUTH_DELEGATE
        return self._auth_delegate

    @auth_delegate.setter
    def auth_delegate(self, value):
        if value is not None:
            if not isinstance(value, AuthBase):
                raise TypeError('Value for "auth_delegate" must be an instance of AuthBase')

            if self.username is not None or self.password is not None:
                raise ValueError(self._EXCLUSIVE_ERROR)

        if self.shared:
            self.__class__._AUTH_DELEGATE = value
        else:
            self._auth_delegate = value

    @property
    def urlopen_kwargs(self):
        """Dict with the ``username``, ``password``, ``cert`` and ``verify`` values.

        :raises NotImplementedError: If an auth_delegate is set.
        """
        if self.auth_delegate is not None:
            raise NotImplementedError("The urlopen_kwargs property is not supported when auth_delegate is set")

        return {
            'username': self.username,
            'password': self.password,
            'cert': self.cert,
            'verify': self.verify
        }

    def __repr__(self, *args, **kwargs):
        # Never print the secret itself: reprs routinely end up in logs and
        # tracebacks. Only reveal whether a password is set.
        masked_password = None if self.password is None else '***'
        return '<{} shared={} username={} password={} cert={} verify={} auth_delegate={}>'.format(
            self.__class__.__name__, self.shared, self.username, masked_password, self.cert, self.verify,
            self.auth_delegate)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc