• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / OWSLib / 3663701074

pending completion
3663701074

Pull #851

github

GitHub
Merge 6cd54d613 into 13b1443f7
Pull Request #851: Adding Python 3.10 in CI

7461 of 12701 relevant lines covered (58.74%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.45
/owslib/wmts.py
1
# -*- coding: UTF-8 -*-
2
# =============================================================================
3
# Copyright (C) 2012 Brad Hards <bradh@frogmouth.net>
4
#
5
# Based on wms.py, which has the following copyright statement:
6
# Copyright (c) 2004, 2006 Sean C. Gillies
7
# Copyright (c) 2005 Nuxeo SARL <http://nuxeo.com>
8
#
9
# Authors : Sean Gillies <sgillies@frii.com>
10
#           Julien Anguenot <ja@nuxeo.com>
11
#
12
# Contact email: sgillies@frii.com
13
# =============================================================================
14

15
"""
1✔
16

17
Abstract
18
--------
19
The wmts module of the OWSlib package provides client-side functionality
20
for fetching tiles from an OGC Web Map Tile Service (WMTS)
21

22

23
Disclaimer
24
----------
25
PLEASE NOTE: the owslib wmts module should be considered in early-beta
26
state: it has been tested against only one WMTS server (NASA EOSDIS).
27
More extensive testing is needed and feedback (to bradh@frogmouth.net)
28
would be appreciated.
29

30
"""
31

32
from random import randint
1✔
33
import warnings
1✔
34
from urllib.parse import (urlencode, urlparse, urlunparse, parse_qs,
1✔
35
                          ParseResult)
36
from .etree import etree
1✔
37
from .util import clean_ows_url, testXMLValue, getXMLInteger, Authentication, openURL
1✔
38
from .fgdc import Metadata
1✔
39
from .iso import MD_Metadata
1✔
40
from .ows import ServiceProvider, ServiceIdentification, OperationsMetadata
1✔
41

42

43
_OWS_NS = '{http://www.opengis.net/ows/1.1}'
1✔
44
_WMTS_NS = '{http://www.opengis.net/wmts/1.0}'
1✔
45
_XLINK_NS = '{http://www.w3.org/1999/xlink}'
1✔
46

47
# OpenGIS Web Map Tile Service (WMTS) Implementation Standard
48
# Version 1.0.0, document 07-057r7
49

50
_ABSTRACT_TAG = _OWS_NS + 'Abstract'
1✔
51
_BOUNDING_BOX_TAG = _OWS_NS + 'BoundingBox'
1✔
52
_IDENTIFIER_TAG = _OWS_NS + 'Identifier'
1✔
53
_LOWER_CORNER_TAG = _OWS_NS + 'LowerCorner'
1✔
54
_OPERATIONS_METADATA_TAG = _OWS_NS + 'OperationsMetadata'
1✔
55
_SERVICE_IDENTIFICATION_TAG = _OWS_NS + 'ServiceIdentification'
1✔
56
_SERVICE_PROVIDER_TAG = _OWS_NS + 'ServiceProvider'
1✔
57
_SUPPORTED_CRS_TAG = _OWS_NS + 'SupportedCRS'
1✔
58
_TITLE_TAG = _OWS_NS + 'Title'
1✔
59
_UPPER_CORNER_TAG = _OWS_NS + 'UpperCorner'
1✔
60
_WGS84_BOUNDING_BOX_TAG = _OWS_NS + 'WGS84BoundingBox'
1✔
61

62
_CONTENTS_TAG = _WMTS_NS + 'Contents'
1✔
63
_FORMAT_TAG = _WMTS_NS + 'Format'
1✔
64
_INFO_FORMAT_TAG = _WMTS_NS + 'InfoFormat'
1✔
65
_LAYER_TAG = _WMTS_NS + 'Layer'
1✔
66
_LAYER_REF_TAG = _WMTS_NS + 'LayerRef'
1✔
67
_MATRIX_HEIGHT_TAG = _WMTS_NS + 'MatrixHeight'
1✔
68
_MATRIX_WIDTH_TAG = _WMTS_NS + 'MatrixWidth'
1✔
69
_MAX_TILE_COL_TAG = _WMTS_NS + 'MaxTileCol'
1✔
70
_MAX_TILE_ROW_TAG = _WMTS_NS + 'MaxTileRow'
1✔
71
_MIN_TILE_COL_TAG = _WMTS_NS + 'MinTileCol'
1✔
72
_MIN_TILE_ROW_TAG = _WMTS_NS + 'MinTileRow'
1✔
73
_RESOURCE_URL_TAG = _WMTS_NS + 'ResourceURL'
1✔
74
_SCALE_DENOMINATOR_TAG = _WMTS_NS + 'ScaleDenominator'
1✔
75
_SERVICE_METADATA_URL_TAG = _WMTS_NS + 'ServiceMetadataURL'
1✔
76

77
# Table 7, page 20-21, Parts of Style data structure
78
_STYLE_TAG = _WMTS_NS + 'Style'
1✔
79
_STYLE_LEGEND_URL = _WMTS_NS + 'LegendURL'
1✔
80

81
_THEME_TAG = _WMTS_NS + 'Theme'
1✔
82
_THEMES_TAG = _WMTS_NS + 'Themes'
1✔
83
_TILE_HEIGHT_TAG = _WMTS_NS + 'TileHeight'
1✔
84
_TILE_MATRIX_SET_LINK_TAG = _WMTS_NS + 'TileMatrixSetLink'
1✔
85
_TILE_MATRIX_SET_TAG = _WMTS_NS + 'TileMatrixSet'
1✔
86
_TILE_MATRIX_SET_LIMITS_TAG = _WMTS_NS + 'TileMatrixSetLimits'
1✔
87
_TILE_MATRIX_LIMITS_TAG = _WMTS_NS + 'TileMatrixLimits'
1✔
88
_TILE_MATRIX_TAG = _WMTS_NS + 'TileMatrix'
1✔
89
_TILE_WIDTH_TAG = _WMTS_NS + 'TileWidth'
1✔
90
_TOP_LEFT_CORNER_TAG = _WMTS_NS + 'TopLeftCorner'
1✔
91
_KEYWORDS_TAG = _OWS_NS + 'Keywords'
1✔
92
_KEYWORD_TAG = _OWS_NS + 'Keyword'
1✔
93
_HREF_TAG = _XLINK_NS + 'href'
1✔
94

95

96
class ServiceException(Exception):
    """WMTS ServiceException

    Attributes:
        message -- short error message
        xml  -- full xml error message from server
    """

    def __init__(self, message, xml):
        # Hand both values to Exception so that ``args`` is populated.
        # Without this ``args`` is empty, and copy/pickle (which rebuild
        # the exception from ``args``) cannot call this two-argument
        # constructor.
        super().__init__(message, xml)
        self.message = message
        self.xml = xml

    def __str__(self):
        # Only the short message is shown; the full XML stays on ``xml``.
        return repr(self.message)
110

111

112
class CapabilitiesError(Exception):
    """Exception subclass that adds nothing to ``Exception``.

    NOTE(review): presumably meant for capabilities-related failures; it
    is not raised anywhere in the visible part of this module.
    """
114

115

116
class WebMapTileService(object):
    """Abstraction for OGC Web Map Tile Service (WMTS).

    Implements IWebMapService.
    """

    def __getitem__(self, name):
        '''Check contents dictionary to allow dict like access to
        service layers'''
        if name in self.contents:
            return self.contents[name]
        raise KeyError("No content named %s" % name)

    def __init__(self, url, version='1.0.0', xml=None, username=None, password=None,
                 parse_remote_metadata=False, vendor_kwargs=None, headers=None, auth=None,
                 timeout=30):
        """Initialize.

        Parameters
        ----------
        url : string
            Base URL for the WMTS service.
        version : string
            Optional WMTS version. Defaults to '1.0.0'.
        xml : string
            Optional XML content to use as the content for the initial
            GetCapabilities request. Typically only used for testing.
        username : string
            Optional user name for authentication. When ``auth`` is also
            given, a non-empty value overwrites ``auth.username``.
        password : string
            Optional password for authentication. When ``auth`` is also
            given, a non-empty value overwrites ``auth.password``.
        parse_remote_metadata : bool
            Forwarded unchanged to every ContentMetadata built from the
            capabilities document.
        vendor_kwargs : dict
            Optional vendor-specific parameters to be included in all
            requests.
        headers : dict
            Optional HTTP headers; handed to the capabilities reader and
            to every tile request.
        auth : owslib.util.Authentication
            Instance of Authentication class to hold username/password/cert/verify
        timeout : int
            number of seconds for GetTile request. A falsy value falls
            back to 30.

        Raises
        ------
        ServiceException
            If the capabilities document has a ``ServiceException`` child.
        """
        self.url = clean_ows_url(url)
        if auth:
            # Explicit credentials take precedence over those in ``auth``.
            if username:
                auth.username = username
            if password:
                auth.password = password
        self.version = version
        self.vendor_kwargs = vendor_kwargs
        self._capabilities = None
        self.headers = headers
        self.auth = auth or Authentication(username, password)
        self.timeout = timeout or 30

        # Authentication handled by Reader
        reader = WMTSCapabilitiesReader(
            self.version, url=self.url, headers=self.headers, auth=self.auth)
        if xml:  # read from stored xml
            self._capabilities = reader.readString(xml)
        else:  # read from server
            self._capabilities = reader.read(self.url, self.vendor_kwargs)

        # Avoid building capabilities metadata if the response is a
        # ServiceExceptionReport.
        # TODO: check if this needs a namespace
        se = self._capabilities.find('ServiceException')
        if se is not None:
            err_message = str(se.text).strip()
            raise ServiceException(err_message, xml)

        # build metadata objects
        self._buildMetadata(parse_remote_metadata)

    def _getcapproperty(self):
        """Return the parsed capabilities, fetching them if not yet loaded."""
        # TODO: deprecated function. See ticket #453.
        if not self._capabilities:
            reader = WMTSCapabilitiesReader(
                self.version, url=self.url, headers=self.headers, auth=self.auth)
            self._capabilities = reader.read(self.url, self.vendor_kwargs)
        return self._capabilities

    def _buildMetadata(self, parse_remote_metadata=False):
        ''' set up capabilities metadata objects '''

        self.updateSequence = self._capabilities.attrib.get('updateSequence')

        # serviceIdentification metadata
        # NOTE: ``identification`` / ``provider`` are only set when the
        # corresponding element exists in the document.
        serviceident = self._capabilities.find(_SERVICE_IDENTIFICATION_TAG)
        if serviceident is not None:
            self.identification = ServiceIdentification(serviceident)

        # serviceProvider metadata
        serviceprov = self._capabilities.find(_SERVICE_PROVIDER_TAG)
        if serviceprov is not None:
            self.provider = ServiceProvider(serviceprov)

        # serviceOperations metadata
        self.operations = []
        serviceop = self._capabilities.find(_OPERATIONS_METADATA_TAG)
        #  REST only WMTS does not have any Operations
        if serviceop is not None:
            for elem in serviceop[:]:
                self.operations.append(OperationsMetadata(elem))

        # serviceContents metadata: our assumption is that services use
        # a top-level layer as a metadata organizer, nothing more.
        self.contents = {}
        caps = self._capabilities.find(_CONTENTS_TAG)

        def gather_layers(parent_elem, parent_metadata):
            # Depth-first walk over nested Layer elements; ``index`` is the
            # 1-based position of the layer among its siblings.
            for index, elem in enumerate(parent_elem.findall(_LAYER_TAG)):
                cm = ContentMetadata(
                    elem, parent=parent_metadata, index=index + 1,
                    parse_remote_metadata=parse_remote_metadata)
                if cm.id:
                    if cm.id in self.contents:
                        raise KeyError('Content metadata for layer "%s" '
                                       'already exists' % cm.id)
                    self.contents[cm.id] = cm
                gather_layers(elem, cm)
        gather_layers(caps, None)

        self.tilematrixsets = {}
        for elem in caps.findall(_TILE_MATRIX_SET_TAG):
            tms = TileMatrixSet(elem)
            if tms.identifier:
                if tms.identifier in self.tilematrixsets:
                    raise KeyError('TileMatrixSet with identifier "%s" '
                                   'already exists' % tms.identifier)
                self.tilematrixsets[tms.identifier] = tms

        self.themes = {}
        for elem in self._capabilities.findall(_THEMES_TAG + '/' + _THEME_TAG):
            theme = Theme(elem)
            if theme.identifier:
                if theme.identifier in self.themes:
                    raise KeyError('Theme with identifier "%s" already exists'
                                   % theme.identifier)
                self.themes[theme.identifier] = theme

        serviceMetadataURL = self._capabilities.find(_SERVICE_METADATA_URL_TAG)
        if serviceMetadataURL is not None:
            self.serviceMetadataURL = serviceMetadataURL.attrib[_HREF_TAG]
        else:
            self.serviceMetadataURL = None

    def items(self):
        '''supports dict-like items() access'''
        return list(self.contents.items())

    def buildTileRequest(self, layer=None, style=None, format=None,
                         tilematrixset=None, tilematrix=None, row=None,
                         column=None, **kwargs):
        """Return the URL-encoded parameters for a GetTile request.

        Parameters
        ----------
        layer : string
            Content layer name.
        style : string
            Optional style name. Defaults to the first style defined for
            the relevant layer in the GetCapabilities response.
        format : string
            Optional output image format,  such as 'image/jpeg'.
            Defaults to the first format defined for the relevant layer
            in the GetCapabilities response.
        tilematrixset : string
            Optional name of tile matrix set to use.
            Defaults to the alphabetically first tile matrix set linked
            from the relevant layer in the GetCapabilities response.
        tilematrix : string
            Name of the tile matrix to use.
        row : integer
            Row index of tile to request.
        column : integer
            Column index of tile to request.
        **kwargs : extra arguments
            anything else e.g. vendor specific parameters

        Raises
        ------
        ValueError
            If layer, tilematrix, row or column is None.
        """

        if layer is None:
            raise ValueError("layer is mandatory (cannot be None)")
        if style is None:
            style = list(self[layer].styles.keys())[0]
        if format is None:
            format = self[layer].formats[0]
        if tilematrixset is None:
            tilematrixset = sorted(self[layer].tilematrixsetlinks.keys())[0]
        if tilematrix is None:
            msg = 'tilematrix (zoom level) is mandatory (cannot be None)'
            raise ValueError(msg)
        if row is None:
            raise ValueError("row is mandatory (cannot be None)")
        if column is None:
            raise ValueError("column is mandatory (cannot be None)")

        request = [
            ('SERVICE', 'WMTS'),
            ('REQUEST', 'GetTile'),
            ('VERSION', '1.0.0'),
            ('LAYER', layer),
            ('STYLE', style),
            ('TILEMATRIXSET', tilematrixset),
            ('TILEMATRIX', tilematrix),
            ('TILEROW', str(row)),
            ('TILECOL', str(column)),
            ('FORMAT', format),
        ]
        # Extra (vendor) parameters go after the standard ones.
        request.extend(kwargs.items())

        # doseq=True: sequence values become repeated key=value pairs.
        return urlencode(request, True)

    def buildTileResource(self, layer=None, style=None, format=None,
                          tilematrixset=None, tilematrix=None, row=None,
                          column=None, **kwargs):
        """Return a RESTful tile URL built from one of the layer's templates.

        One ResourceURL of ``resourceType`` 'tile' is picked at random and
        its ``{TileMatrixSet}``, ``{TileMatrix}``, ``{TileRow}``,
        ``{TileCol}`` and ``{Style}`` placeholders are substituted.

        ``style`` and ``tilematrixset`` default as in buildTileRequest.
        ``format`` and ``**kwargs`` are accepted so the signature matches
        buildTileRequest, but they are not used here.

        Returns
        -------
        string or None
            The filled-in URL, or None when the layer declares no 'tile'
            ResourceURL.

        Raises
        ------
        ValueError
            If layer, tilematrix, row or column is None.
        """

        # check the validity of the parameters and set reasonable defaults
        if layer is None:
            raise ValueError("layer is mandatory (cannot be None)")
        if style is None:
            style = list(self[layer].styles.keys())[0]
        if tilematrixset is None:
            tilematrixset = sorted(self[layer].tilematrixsetlinks.keys())[0]
        if tilematrix is None:
            msg = 'tilematrix (zoom level) is mandatory (cannot be None)'
            raise ValueError(msg)
        if row is None:
            raise ValueError("row is mandatory (cannot be None)")
        if column is None:
            raise ValueError("column is mandatory (cannot be None)")

        tileresourceurls = [res for res in self[layer].resourceURLs
                            if res['resourceType'] == 'tile']
        numres = len(tileresourceurls)
        if numres > 0:
            # choose random ResourceURL if more than one available
            resindex = randint(0, numres - 1)
            resurl = tileresourceurls[resindex]['template']
            resurl = resurl.replace('{TileMatrixSet}', tilematrixset)
            # str(): tile matrix identifiers are often passed as integer
            # zoom levels, which str.replace would reject.
            resurl = resurl.replace('{TileMatrix}', str(tilematrix))
            resurl = resurl.replace('{TileRow}', str(row))
            resurl = resurl.replace('{TileCol}', str(column))
            resurl = resurl.replace('{Style}', style)
            return resurl

        return None

    @property
    def restonly(self):
        """True when tiles must be fetched through RESTful resource URLs.

        True if no operations were parsed at all, or if the GetTile
        operation's method constraints mention 'rest' but never 'kvp'.
        False otherwise (KVP is preferred whenever it is available or when
        no encoding constraint is declared).
        """

        # if OperationsMetadata is missing completely --> use REST
        if len(self.operations) == 0:
            return True

        # check if KVP or RESTful are available
        restenc = False
        kvpenc = False
        for operation in self.operations:
            if operation.name != 'GetTile':
                continue
            for method in operation.methods:
                constraints = str(method['constraints']).lower()
                if 'kvp' in constraints:
                    kvpenc = True
                if 'rest' in constraints:
                    restenc = True

        # KVP wins when present; with no constraint at all KVP is assumed.
        return restenc and not kvpenc

    def gettile(self, base_url=None, layer=None, style=None, format=None,
                tilematrixset=None, tilematrix=None, row=None, column=None,
                **kwargs):
        """Return a tile from the WMTS.

        Returns the tile image as a file-like object.

        Parameters
        ----------
        base_url : string
            Optional URL for request submission. Defaults to the URL of
            the GetTile operation as declared in the GetCapabilities
            response.
        layer : string
            Content layer name.
        style : string
            Optional style name. Defaults to the first style defined for
            the relevant layer in the GetCapabilities response.
        format : string
            Optional output image format,  such as 'image/jpeg'.
            Defaults to the first format defined for the relevant layer
            in the GetCapabilities response.
        tilematrixset : string
            Optional name of tile matrix set to use.
            Defaults to the first tile matrix set defined for the
            relevant layer in the GetCapabilities response.
        tilematrix : string
            Name of the tile matrix to use.
        row : integer
            Row index of tile to request.
        column : integer
            Column index of tile to request.
        **kwargs : extra arguments
            anything else e.g. vendor specific parameters

        Raises
        ------
        ServiceException
            If the KVP response has content type
            'application/vnd.ogc.se_xml'.
        """
        # Work on a copy: updating self.vendor_kwargs in place would make
        # per-call kwargs stick to every later request (and would mutate
        # the dict the caller passed to the constructor).
        vendor_kwargs = dict(self.vendor_kwargs or {})
        vendor_kwargs.update(kwargs)

        # REST only WMTS
        if self.restonly:
            resurl = self.buildTileResource(
                layer, style, format, tilematrixset, tilematrix,
                row, column, **vendor_kwargs)
            u = openURL(resurl, headers=self.headers, auth=self.auth, timeout=self.timeout)
            return u

        # KVP implementation
        data = self.buildTileRequest(layer, style, format, tilematrixset,
                                     tilematrix, row, column, **vendor_kwargs)

        if base_url is None:
            base_url = self.url
            try:
                methods = self.getOperationByName('GetTile').methods
                get_verbs = [x for x in methods
                             if x.get('type').lower() == 'get']
                if len(get_verbs) > 1:
                    # Filter by constraints: for every GET method that has
                    # constraints, collect its URL once per constraint
                    # whose values include 'kvp'; filter() drops the empty
                    # lists and next() takes the first match. No match
                    # raises StopIteration, leaving base_url == self.url.
                    base_url = next(
                        x for x in filter(
                            list,
                            ([pv.get('url')
                                for const in pv.get('constraints')
                                if 'kvp' in [x.lower() for x in const.values]]
                             for pv in get_verbs if pv.get('constraints'))))[0]
                elif len(get_verbs) == 1:
                    base_url = get_verbs[0].get('url')
            except StopIteration:
                pass
        u = openURL(base_url, data, headers=self.headers, auth=self.auth, timeout=self.timeout)

        # check for service exceptions, and return
        if u.info()['Content-Type'] == 'application/vnd.ogc.se_xml':
            se_xml = u.read()
            se_tree = etree.fromstring(se_xml)
            err_message = str(se_tree.find('ServiceException').text)
            raise ServiceException(err_message.strip(), se_xml)
        return u

    def getServiceXML(self):
        """Return the capabilities document serialized with etree.tostring,
        or None when no capabilities are loaded."""
        xml = None
        if self._capabilities is not None:
            xml = etree.tostring(self._capabilities)
        return xml

    def getfeatureinfo(self):
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError

    def getOperationByName(self, name):
        """Return the entry of ``self.operations`` whose name equals ``name``.

        Raises KeyError when no such operation exists.
        """
        for item in self.operations:
            if item.name == name:
                return item
        raise KeyError("No operation named %s" % name)
496

497

498
class TileMatrixSet(object):
    '''Holds one TileMatrixSet

    Attributes:
        identifier -- text of the Identifier child
        crs        -- text of the SupportedCRS child
        tilematrix -- dict mapping TileMatrix identifier to TileMatrix
    '''
    def __init__(self, elem):
        """Parse a TileMatrixSet element.

        Raises ValueError if ``elem`` has the wrong tag or lacks an
        Identifier or SupportedCRS value, and KeyError if two child
        TileMatrix elements share an identifier.
        """
        if elem.tag != _TILE_MATRIX_SET_TAG:
            raise ValueError('%s should be a TileMatrixSet' % (elem,))
        identifier = testXMLValue(elem.find(_IDENTIFIER_TAG))
        crs = testXMLValue(elem.find(_SUPPORTED_CRS_TAG))
        # Validate before stripping: calling .strip() on a missing value
        # would raise AttributeError and hide the intended ValueError.
        if crs is None or identifier is None:
            raise ValueError('%s incomplete TileMatrixSet' % (elem,))
        self.identifier = identifier.strip()
        self.crs = crs.strip()
        self.tilematrix = {}
        for tilematrix in elem.findall(_TILE_MATRIX_TAG):
            tm = TileMatrix(tilematrix)
            # Matrices without an identifier are skipped.
            if tm.identifier:
                if tm.identifier in self.tilematrix:
                    raise KeyError('TileMatrix with identifier "%s" '
                                   'already exists' % tm.identifier)
                self.tilematrix[tm.identifier] = tm
515

516

517
class TileMatrix(object):
    '''Holds one TileMatrix

    Attributes:
        identifier       -- text of the Identifier child, or None
        scaledenominator -- ScaleDenominator as float
        topleftcorner    -- TopLeftCorner as a tuple of two floats, in
                            document order
        tilewidth, tileheight     -- TileWidth / TileHeight as int
        matrixwidth, matrixheight -- MatrixWidth / MatrixHeight as int
    '''
    def __init__(self, elem):
        """Parse a TileMatrix element.

        Raises ValueError if ``elem`` has the wrong tag, if a required
        child value is missing, or if a value cannot be converted.
        """
        if elem.tag != _TILE_MATRIX_TAG:
            raise ValueError('%s should be a TileMatrix' % (elem,))
        # A missing identifier is stored as None (callers test its
        # truthiness) instead of failing on None.strip().
        identifier = testXMLValue(elem.find(_IDENTIFIER_TAG))
        self.identifier = identifier.strip() if identifier is not None else None
        sd = testXMLValue(elem.find(_SCALE_DENOMINATOR_TAG))
        if sd is None:
            raise ValueError('%s is missing ScaleDenominator' % (elem,))
        self.scaledenominator = float(sd)
        tl = testXMLValue(elem.find(_TOP_LEFT_CORNER_TAG))
        if tl is None:
            raise ValueError('%s is missing TopLeftCorner' % (elem,))
        # split() without an argument tolerates repeated or surrounding
        # whitespace between the two coordinates.
        (lon, lat) = tl.split()
        self.topleftcorner = (float(lon), float(lat))
        width = testXMLValue(elem.find(_TILE_WIDTH_TAG))
        height = testXMLValue(elem.find(_TILE_HEIGHT_TAG))
        if (width is None) or (height is None):
            msg = '%s is missing TileWidth and/or TileHeight' % (elem,)
            raise ValueError(msg)
        self.tilewidth = int(width)
        self.tileheight = int(height)
        mw = testXMLValue(elem.find(_MATRIX_WIDTH_TAG))
        mh = testXMLValue(elem.find(_MATRIX_HEIGHT_TAG))
        if (mw is None) or (mh is None):
            msg = '%s is missing MatrixWidth and/or MatrixHeight' % (elem,)
            raise ValueError(msg)
        self.matrixwidth = int(mw)
        self.matrixheight = int(mh)
546

547

548
class Theme:
    """
    Abstraction for a WMTS theme

    Attributes:
        identifier -- text of the Identifier child, or None
        title      -- text of the Title child, or None
        abstract   -- text of the Abstract child, or None
        layerRefs  -- list with the text of every non-empty LayerRef child
    """
    def __init__(self, elem):
        """Parse a Theme element; raises ValueError on a wrong tag."""
        if elem.tag != _THEME_TAG:
            raise ValueError('%s should be a Theme' % (elem,))

        def stripped_text(tag):
            # Text value of the child ``tag``, stripped, or None when the
            # value is missing (avoids calling .strip() on None).
            value = testXMLValue(elem.find(tag))
            return value.strip() if value is not None else None

        self.identifier = stripped_text(_IDENTIFIER_TAG)
        self.title = stripped_text(_TITLE_TAG)
        self.abstract = stripped_text(_ABSTRACT_TAG)

        self.layerRefs = [layerRef.text
                          for layerRef in elem.findall(_LAYER_REF_TAG)
                          if layerRef.text is not None]
572

573

574
class TileMatrixLimits(object):
    """
    Represents a WMTS TileMatrixLimits element.

    Attributes:
        tilematrix -- stripped text of the TileMatrix child ('' if empty)
        mintilerow, maxtilerow, mintilecol, maxtilecol -- values read with
            getXMLInteger from the MinTileRow / MaxTileRow / MinTileCol /
            MaxTileCol children
    """
    def __init__(self, elem):
        """Parse a TileMatrixLimits element.

        Raises ValueError if ``elem`` has the wrong tag or has no
        TileMatrix child.
        """
        if elem.tag != _TILE_MATRIX_LIMITS_TAG:
            raise ValueError('%s should be a TileMatrixLimits' % elem)

        tm = elem.find(_TILE_MATRIX_TAG)
        if tm is None:
            raise ValueError('Missing TileMatrix in %s' % elem)
        # An empty <TileMatrix/> has text None; store '' so that callers
        # testing ``tilematrix`` for truthiness can skip it.
        self.tilematrix = (tm.text or '').strip()

        self.mintilerow = getXMLInteger(elem, _MIN_TILE_ROW_TAG)
        self.maxtilerow = getXMLInteger(elem, _MAX_TILE_ROW_TAG)
        self.mintilecol = getXMLInteger(elem, _MIN_TILE_COL_TAG)
        self.maxtilecol = getXMLInteger(elem, _MAX_TILE_COL_TAG)

    def __repr__(self):
        fmt = ('<TileMatrixLimits: {self.tilematrix}'
               ', minRow={self.mintilerow}, maxRow={self.maxtilerow}'
               ', minCol={self.mintilecol}, maxCol={self.maxtilecol}>')
        return fmt.format(self=self)
598

599

600
class TileMatrixSetLink(object):
    """
    Represents a WMTS TileMatrixSetLink element.

    Attributes:
        tilematrixset    -- identifier (URI) of the linked tile matrix set
        tilematrixlimits -- dict mapping tile matrix name to
                            TileMatrixLimits (empty when there are none)
    """
    @staticmethod
    def from_elements(link_elements):
        """
        Build TileMatrixSetLink instances from the given list of
        <TileMatrixSetLink> XML elements and return them as a list.

        Raises ValueError for a link without a TileMatrixSet child, or
        for one with several TileMatrixSet children together with
        TileMatrixSetLimits.
        """
        # NB. The WMTS spec is contradictory re. the multiplicity
        # relationships between Layer and TileMatrixSetLink, and
        # TileMatrixSetLink and tileMatrixSet (URI).
        # Try to figure out which model has been used by the server.
        collected = []
        for link in link_elements:
            set_elems = link.findall(_TILE_MATRIX_SET_TAG)

            if not set_elems:
                raise ValueError('Missing TileMatrixSet in %s' % link)

            if len(set_elems) > 1:
                # Several sets in one link: limits would be ambiguous, so
                # they are rejected; each set becomes its own link.
                if link.findall(_TILE_MATRIX_SET_LIMITS_TAG):
                    raise ValueError('Multiple instances of TileMatrixSet'
                                     ' plus TileMatrixSetLimits in %s' %
                                     link)
                collected.extend(TileMatrixSetLink(set_elem.text.strip())
                                 for set_elem in set_elems)
                continue

            # Exactly one set: gather its optional per-matrix limits.
            set_uri = set_elems[0].text.strip()
            limits_by_matrix = {}
            limits_path = '%s/%s' % (_TILE_MATRIX_SET_LIMITS_TAG,
                                     _TILE_MATRIX_LIMITS_TAG)
            for limits_elem in link.findall(limits_path):
                limits = TileMatrixLimits(limits_elem)
                if not limits.tilematrix:
                    continue
                if limits.tilematrix in limits_by_matrix:
                    # Duplicates only warn; the later entry wins.
                    warnings.warn('TileMatrixLimits with tileMatrix "%s" '
                                  'already exists' % limits.tilematrix,
                                  RuntimeWarning)
                limits_by_matrix[limits.tilematrix] = limits

            collected.append(TileMatrixSetLink(set_uri, limits_by_matrix))
        return collected

    def __init__(self, tilematrixset, tilematrixlimits=None):
        self.tilematrixset = tilematrixset
        # A fresh dict per instance when no limits are supplied.
        self.tilematrixlimits = {} if tilematrixlimits is None else tilematrixlimits

    def __repr__(self):
        return f'<TileMatrixSetLink: {self.tilematrixset}, tilematrixlimits={{...}}>'
661

662

663
class BoundingBox(object):
    """
    Represents a BoundingBox element

    Attributes:
        ll     -- LowerCorner values as a list of floats
        ur     -- UpperCorner values as a list of floats
        crs    -- value of the ``crs`` attribute, or None if absent
        extent -- tuple (ll[0], ll[1], ur[0], ur[1])
    """

    def __init__(self, elem) -> None:
        """Parse a BoundingBox element; raises ValueError on a wrong tag."""
        if elem.tag != _BOUNDING_BOX_TAG:
            raise ValueError('%s should be a BoundingBox' % elem)

        lower = elem.find(_LOWER_CORNER_TAG)
        upper = elem.find(_UPPER_CORNER_TAG)

        # Corner text is whitespace-separated numbers.
        self.ll = list(map(float, lower.text.split()))
        self.ur = list(map(float, upper.text.split()))

        self.crs = elem.attrib.get('crs')
        lower_a, lower_b = self.ll[0], self.ll[1]
        upper_a, upper_b = self.ur[0], self.ur[1]
        self.extent = (lower_a, lower_b, upper_a, upper_b)

    def __repr__(self):
        return f'<BoundingBox, crs={self.crs}, extent={self.extent}>'
686

687

688
class ContentMetadata:
    """
    Abstraction for WMTS layer metadata.

    Implements IContentMetadata.

    Attributes populated by the constructor: parent, index, id, name,
    title, abstract, boundingBox, boundingBoxWGS84, tilematrixsetlinks,
    resourceURLs, styles, formats, keywords, infoformats and layers.
    """
    def __init__(self, elem, parent=None, index=0, parse_remote_metadata=False):
        """Populate the layer metadata from a WMTS Layer element.

        elem must carry the Layer tag. parent is the enclosing
        ContentMetadata, if any, and index is combined with the parent's
        index to build the dotted ``index`` string.
        parse_remote_metadata is accepted but not used by this constructor.

        Raises ValueError when elem is not a Layer or when a Style has no
        Identifier, and KeyError when two TileMatrixSetLink entries name
        the same tile matrix set.
        """
        if elem.tag != _LAYER_TAG:
            raise ValueError('%s should be a Layer' % (elem,))

        self.parent = parent
        if parent:
            self.index = "%s.%d" % (parent.index, index)
        else:
            self.index = str(index)

        self.id = self.name = testXMLValue(elem.find(_IDENTIFIER_TAG))
        # title is mandatory property
        self.title = None
        title = testXMLValue(elem.find(_TITLE_TAG))
        if title is not None:
            self.title = title.strip()

        self.abstract = testXMLValue(elem.find(_ABSTRACT_TAG))

        # Bounding boxes
        # There may be multiple, using different CRSes
        self.boundingBox = [BoundingBox(b)
                            for b in elem.findall(_BOUNDING_BOX_TAG)]

        # WGS84 Bounding box
        # Always define the attribute so that layers without a
        # WGS84BoundingBox element expose None instead of raising
        # AttributeError on access.
        self.boundingBoxWGS84 = None
        b = elem.find(_WGS84_BOUNDING_BOX_TAG)
        if b is not None:
            lc = b.find(_LOWER_CORNER_TAG)
            uc = b.find(_UPPER_CORNER_TAG)
            ll = [float(s) for s in lc.text.split()]
            ur = [float(s) for s in uc.text.split()]
            self.boundingBoxWGS84 = (ll[0], ll[1], ur[0], ur[1])
        # TODO: there is probably some more logic here, and it should
        # probably be shared code

        # Plain list of tile matrix set names, kept only to back the
        # deprecated `tilematrixsets` property.
        self._tilematrixsets = [f.text.strip() for f in
                                elem.findall(_TILE_MATRIX_SET_LINK_TAG + '/' + _TILE_MATRIX_SET_TAG)]

        link_elements = elem.findall(_TILE_MATRIX_SET_LINK_TAG)
        tile_matrix_set_links = TileMatrixSetLink.from_elements(link_elements)
        self.tilematrixsetlinks = {}
        for tmsl in tile_matrix_set_links:
            if tmsl.tilematrixset:
                if tmsl.tilematrixset in self.tilematrixsetlinks:
                    raise KeyError('TileMatrixSetLink with tilematrixset "%s"'
                                   ' already exists' %
                                   tmsl.tilematrixset)
                self.tilematrixsetlinks[tmsl.tilematrixset] = tmsl

        # Each ResourceURL becomes a dict of its three attributes; a
        # missing attribute raises KeyError.
        self.resourceURLs = []
        for resource_url in elem.findall(_RESOURCE_URL_TAG):
            self.resourceURLs.append(
                {attrib: resource_url.attrib[attrib]
                 for attrib in ('format', 'resourceType', 'template')})

        # Styles, keyed by the text of each style's Identifier
        self.styles = {}
        for s in elem.findall(_STYLE_TAG):
            style = {}
            isdefaulttext = s.attrib.get('isDefault')
            style['isDefault'] = (isdefaulttext == "true")
            identifier = s.find(_IDENTIFIER_TAG)  # one and mandatory
            if identifier is None:
                raise ValueError('%s missing identifier' % (s,))

            style_title = s.find(_TITLE_TAG)
            if style_title is not None:
                style['title'] = testXMLValue(style_title)

            style_abstract = s.find(_ABSTRACT_TAG)
            if style_abstract is not None:
                style['abstract'] = testXMLValue(style_abstract)

            legend_url = s.find(_STYLE_LEGEND_URL)
            if legend_url is not None:
                style['legend'] = legend_url.attrib[_HREF_TAG]
                # Optional legend attributes are copied only when present.
                for attrib in ('width', 'height', 'format'):
                    if attrib in legend_url.attrib:
                        style[attrib] = legend_url.attrib[attrib]

            style_keywords = [f.text for f in s.findall(
                              _KEYWORDS_TAG + '/' + _KEYWORD_TAG)]
            if style_keywords:  # omit the key entirely when there are none
                style['keywords'] = style_keywords

            self.styles[identifier.text] = style

        self.formats = [f.text for f in elem.findall(_FORMAT_TAG)]

        self.keywords = [f.text for f in elem.findall(
                         _KEYWORDS_TAG + '/' + _KEYWORD_TAG)]
        self.infoformats = [f.text for f in elem.findall(_INFO_FORMAT_TAG)]

        # Nested Layer elements become child ContentMetadata objects.
        self.layers = []
        for child in elem.findall(_LAYER_TAG):
            self.layers.append(ContentMetadata(child, self))

    @property
    def tilematrixsets(self):
        """Deprecated list of tile matrix set names for this layer.

        Emits a warning on every access; use `tilematrixsetlinks` instead.
        """
        # NB. This attribute has been superseded by the
        # `tilematrixsetlinks` attribute, but is included
        # for now to provide continuity.
        # stacklevel=2 attributes the warning to the code that accessed
        # the property rather than to this module.
        warnings.warn("The 'tilematrixsets' attribute has been deprecated"
                      " and will be removed in a future version of OWSLib."
                      " Please use 'tilematrixsetlinks' instead.",
                      stacklevel=2)
        return self._tilematrixsets

    def __str__(self):
        """Return a one-line summary with the layer name and title."""
        return 'Layer Name: %s Title: %s' % (self.name, self.title)
×
810

811

812
class WMTSCapabilitiesReader:
    """Read and parse capabilities document into a lxml.etree infoset
    """

    def __init__(self, version='1.0.0', url=None, un=None, pw=None, headers=None, auth=None):
        """Initialize

        version is the WMTS version placed in generated requests and url
        is stored as given. un and pw are a username and password: when
        an ``auth`` object is supplied, non-empty values overwrite its
        ``username`` / ``password`` attributes; otherwise they are used to
        build a new Authentication. headers are handed to openURL by
        ``read``.
        """
        self.version = version
        self._infoset = None
        self.url = url
        if auth:
            if un:
                auth.username = un
            if pw:
                auth.password = pw
        self.auth = auth or Authentication(un, pw)
        self.headers = headers

    def capabilities_url(self, service_url, vendor_kwargs=None):
        """Return a capabilities url

        Query parameters already present in service_url are kept. The
        'service', 'request' and 'version' parameters are added only when
        the URL does not already carry them under any letter case, and
        vendor_kwargs (a dict) is merged in last.
        """
        # Ensure the 'service', 'request', and 'version' parameters,
        # and any vendor-specific parameters are included in the URL.
        pieces = urlparse(service_url)
        args = parse_qs(pieces.query)
        # KVP parameter names are case-insensitive, so 'SERVICE=WMTS'
        # must count as present; otherwise the parameter is duplicated.
        present = {name.lower() for name in args}
        defaults = (('service', 'WMTS'),
                    ('request', 'GetCapabilities'),
                    ('version', self.version))
        for name, value in defaults:
            if name not in present:
                args[name] = value
        if vendor_kwargs:
            args.update(vendor_kwargs)
        # doseq=True expands the list values produced by parse_qs.
        query = urlencode(args, doseq=True)
        pieces = ParseResult(pieces.scheme, pieces.netloc,
                             pieces.path, pieces.params,
                             query, pieces.fragment)
        return urlunparse(pieces)

    def read(self, service_url, vendor_kwargs=None):
        """Get and parse a WMTS capabilities document, returning an
        elementtree instance

        service_url is the base url, to which is appended the service,
        version, and request parameters. Optional vendor-specific
        parameters can also be supplied as a dict.
        """
        getcaprequest = self.capabilities_url(service_url, vendor_kwargs)

        # now split it up again to use the generic openURL function...
        spliturl = getcaprequest.split('?')
        u = openURL(spliturl[0], spliturl[1], method='Get', headers=self.headers, auth=self.auth)
        return etree.fromstring(u.read())

    def readString(self, st):
        """Parse a WMTS capabilities document, returning an elementtree instance

        string should be an XML capabilities document

        Raises ValueError when st is neither str nor bytes.
        """
        if not isinstance(st, (str, bytes)):
            msg = 'String must be of type string or bytes, not %s' % type(st)
            raise ValueError(msg)
        return etree.fromstring(st)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc