• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / OWSLib / 11628884181

01 Nov 2024 11:49AM UTC coverage: 58.814% (-1.3%) from 60.156%
11628884181

Pull #548

github

web-flow
Merge 4b9b7cf1f into ae98c2039
Pull Request #548: Strip out redundant __new__ to enable object pickling

8364 of 14221 relevant lines covered (58.81%)

1.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

16.33
/owslib/coverage/wcs110.py
1
# -*- coding: ISO-8859-15 -*-
2
# =============================================================================
3
# Copyright (c) 2004, 2006 Sean C. Gillies
4
# Copyright (c) 2007 STFC <http://www.stfc.ac.uk>
5
#
6
# Authors:
7
#          Dominic Lowe <d.lowe@rl.ac.uk>
8
#
9
# Contact email: d.lowe@rl.ac.uk
10
# =============================================================================
11

12
# NOTE: Does not conform to new interfaces yet #################
13

14
import errno
2✔
15
import logging
2✔
16
import os
2✔
17
from urllib.parse import urlencode
2✔
18

19
from .wcsBase import WCSBase, WCSCapabilitiesReader, ServiceException
2✔
20
from owslib.etree import etree
2✔
21
from owslib.coverage import wcsdecoder
2✔
22
from owslib.crs import Crs
2✔
23
from owslib.util import openURL, testXMLValue
2✔
24

25
# Module-level logger, named after this module via __name__.
LOGGER = logging.getLogger(__name__)
2✔
26

27

28
class Namespaces_1_1_0():
    """Builds namespace-qualified tag names in ``{uri}tag`` form.

    Each method prepends one fixed namespace URI (wrapped in braces) to the
    tag it is given and returns the resulting string.
    """

    def WCS(self, tag):
        """Return *tag* qualified with the ``http://www.opengis.net/wcs/1.1`` namespace."""
        return ''.join(('{http://www.opengis.net/wcs/1.1}', tag))

    def WCS_OWS(self, tag):
        """Return *tag* qualified with the ``http://www.opengis.net/wcs/1.1/ows`` namespace."""
        return ''.join(('{http://www.opengis.net/wcs/1.1/ows}', tag))

    def OWS(self, tag):
        """Return *tag* qualified with the ``http://www.opengis.net/ows`` namespace."""
        return ''.join(('{http://www.opengis.net/ows}', tag))
38

39

40
class WebCoverageService_1_1_0(WCSBase):
    """Abstraction for OGC Web Coverage Service (WCS), version 1.1.0
    Implements IWebCoverageService.

    On construction the capabilities document is read (from the supplied XML
    string, or from the server at ``url``) and parsed into ``identification``,
    ``provider``, ``operations``, ``exceptions`` and ``contents``.
    """

    version = '1.1.0'
    ns = Namespaces_1_1_0()

    def __getitem__(self, name):
        ''' check contents dictionary to allow dict like access to service layers'''
        # The attribute is fetched through __getattribute__ exactly as the
        # original code did; only the membership test was simplified.
        contents = self.__getattribute__('contents')
        if name in contents:
            return contents[name]
        raise KeyError("No content named %s" % name)

    def __init__(self, url, xml, cookies, auth=None, timeout=30, headers=None):
        """Read and parse a WCS 1.1.0 capabilities document.

        :param url: service URL; queried for capabilities when ``xml`` is falsy
        :param xml: capabilities document as a string, or a falsy value to
            fetch the document from ``url``
        :param cookies: passed to the capabilities reader and to ``openURL``
        :param auth: forwarded to the base class initialiser
        :param timeout: stored on the instance and used for the capabilities request
        :param headers: forwarded to the base class initialiser
        :raises ServiceException: if the document contains an OWS ``Exception``
            element directly under the root
        """
        super(WebCoverageService_1_1_0, self).__init__(auth=auth, timeout=timeout, headers=headers)

        self.url = url
        self.cookies = cookies
        self.timeout = timeout
        # initialize from saved capability document or access the server
        reader = WCSCapabilitiesReader(self.version, self.cookies, self.auth, headers=self.headers)
        if xml:
            self._capabilities = reader.readString(xml)
        else:
            self._capabilities = reader.read(self.url, self.timeout)

        # check for exceptions
        se = self._capabilities.find(self.ns.OWS('Exception'))

        if se is not None:
            err_message = str(se.text).strip()
            raise ServiceException(err_message, xml)

        # build metadata objects:

        self.updateSequence = self._capabilities.attrib.get('updateSequence')

        # serviceIdentification metadata
        elem = self._capabilities.find(self.ns.WCS_OWS('ServiceIdentification'))
        if elem is None:
            elem = self._capabilities.find(self.ns.OWS('ServiceIdentification'))
        self.identification = ServiceIdentification(elem, self.ns)

        # serviceProvider: try the OWS namespace first, then fall back to the
        # WCS-OWS namespace. The previous code looked up the OWS namespace
        # twice and joined the lookups with ``or``, which relies on Element
        # truthiness rather than on whether the element was found.
        elem = self._capabilities.find(self.ns.OWS('ServiceProvider'))
        if elem is None:
            elem = self._capabilities.find(self.ns.WCS_OWS('ServiceProvider'))
        self.provider = ServiceProvider(elem, self.ns)

        # serviceOperations
        operation_path = self.ns.OWS('OperationsMetadata') + '/' + self.ns.OWS('Operation')
        self.operations = [Operation(elem, self.ns) for elem in self._capabilities.findall(operation_path)]

        # exceptions - ***********TO DO *************
        # Assigned unconditionally: this used to sit inside the operations
        # loop, leaving the attribute undefined when no operation was found.
        self.exceptions = [f.text for f in self._capabilities.findall('Capability/Exception/Format')]

        # serviceContents: our assumption is that services use a top-level layer
        # as a metadata organizer, nothing more.
        self.contents = {}
        summary_path = self.ns.WCS('Contents') + '/' + self.ns.WCS('CoverageSummary')
        top = self._capabilities.find(summary_path)
        for elem in self._capabilities.findall(summary_path + '/' + self.ns.WCS('CoverageSummary')):
            cm = ContentMetadata(elem, top, self, self.ns)
            self.contents[cm.id] = cm

        if not self.contents:
            # non-hierarchical.
            top = None
            for elem in self._capabilities.findall(summary_path):
                cm = ContentMetadata(elem, top, self, self.ns)
                # make the describeCoverage requests to populate the supported formats/crs attributes
                self.contents[cm.id] = cm

    def items(self):
        '''supports dict-like items() access'''
        return list(self.contents.items())

    # TO DECIDE: Offer repackaging of coverageXML/Multipart MIME output?
    # def getData(self, directory='outputdir', outputfile='coverage.nc',  **kwargs):
    #     u=self.getCoverageRequest(**kwargs)
    #     #create the directory if it doesn't exist:
    #     try:
    #         os.mkdir(directory)
    #     except OSError, e:
    #         # Ignore directory exists error
    #         if e.errno <> errno.EEXIST:
    #             raise
    #     #elif wcs.version=='1.1.0':
    #     #Could be multipart mime or XML Coverages document, need to use the decoder...
    #     decoder=wcsdecoder.WCSDecoder(u)
    #     x=decoder.getCoverages()
    #     if type(x) is wcsdecoder.MpartMime:
    #         filenames=x.unpackToDir(directory)
    #         #print 'Files from 1.1.0 service written to %s directory'%(directory)
    #     else:
    #         filenames=x
    #     return filenames

    # TO DO: Handle rest of the  WCS 1.1.0 keyword parameters e.g. GridCRS etc.
    def getCoverage(self, identifier=None, bbox=None, time=None, format=None, store=False, rangesubset=None,
                    gridbaseCRS=None, gridtype=None, gridCS=None, gridorigin=None, gridoffsets=None,
                    method='Get', timeout=30, **kwargs):
        """Request and return a coverage from the WCS as a file-like object
        note: additional **kwargs helps with multi-version implementation
        core keyword arguments should be supported cross version
        example:
        cvg=wcs.getCoverage(identifier=['TuMYrRQ4'], time=['2792-06-01T00:00:00.0'], bbox=(-112,36,-106,41),
                            format='application/netcdf', store='true')

        is equivalent to:
        http://myhost/mywcs?SERVICE=WCS&REQUEST=GetCoverage&IDENTIFIER=TuMYrRQ4&VERSION=1.1.0&BOUNDINGBOX=-180,-90,180,90&TIMESEQUENCE=2792-06-01T00:00:00.0&FORMAT=application/netcdf

        if store = true, returns a coverages XML file
        if store = false, returns a multipart mime

        :param identifier: coverage identifier(s); must be non-empty
        :param bbox: iterable of coordinates, joined with commas into ``boundingbox``
        :param time: iterable of strings, joined with commas into ``timesequence``
        :param format: value sent as the ``format`` parameter
        :param store: value sent as the ``store`` parameter
        :param rangesubset, gridbaseCRS, gridtype, gridCS, gridorigin, gridoffsets:
            added to the request only when truthy
        :param method: ``'Get'`` is expanded to the namespaced WCS-OWS tag;
            any other value is used as given
        :param timeout: passed to ``openURL``
        :param kwargs: extra (e.g. vendor specific) request parameters
        :returns: the value returned by ``openURL``
        :raises KeyError: if no ``GetCoverage`` operation was parsed from the
            capabilities document (raised by ``getOperationByName``)
        """
        msg = 'WCS 1.1.0 DEBUG: Parameters passed to GetCoverage: identifier={}, bbox={}, time={}, format={}, rangesubset={}, gridbaseCRS={}, gridtype={}, gridCS={}, gridorigin={}, gridoffsets={}, method={}, other_arguments={}'  # noqa
        LOGGER.debug(msg.format(
            identifier, bbox, time, format, rangesubset, gridbaseCRS, gridtype, gridCS,
            gridorigin, gridoffsets, method, str(kwargs)))

        if method == 'Get':
            method = self.ns.WCS_OWS('Get')

        # Operation.methods in this module is a dict of {tag: {'url': ...}}.
        # Iterating it directly yields the string keys, on which the former
        # ``m.get(...)`` calls raised AttributeError. Normalise to a list of
        # dicts carrying both 'type' and 'url'; a plain list of such dicts is
        # accepted unchanged.
        advertised = self.getOperationByName('GetCoverage').methods
        if isinstance(advertised, dict):
            candidates = [dict({'type': tag}, **info) for tag, info in advertised.items()]
        else:
            candidates = advertised

        # Compare on the local part of the tag so that namespaced and plain
        # method names both match.
        wanted = method.split('}')[-1].lower()
        try:
            base_url = next(m.get('url') for m in candidates
                            if (m.get('type') or '').split('}')[-1].lower() == wanted)
        except StopIteration:
            # no matching endpoint advertised: fall back to the service URL
            base_url = self.url

        # process kwargs
        request = {'version': self.version, 'request': 'GetCoverage', 'service': 'WCS'}
        # NOTE: assert statements are removed when Python runs with -O.
        assert len(identifier) > 0
        request['identifier'] = identifier
        # request['identifier'] = ','.join(identifier)
        if bbox:
            request['boundingbox'] = ','.join([repr(x) for x in bbox])
        if time:
            request['timesequence'] = ','.join(time)
        request['format'] = format
        request['store'] = store

        # rangesubset: untested - require a server implementation
        if rangesubset:
            request['RangeSubset'] = rangesubset

        # GridCRS structure: untested - require a server implementation
        if gridbaseCRS:
            request['gridbaseCRS'] = gridbaseCRS
        if gridtype:
            request['gridtype'] = gridtype
        if gridCS:
            request['gridCS'] = gridCS
        if gridorigin:
            request['gridorigin'] = gridorigin
        if gridoffsets:
            request['gridoffsets'] = gridoffsets

        # anything else e.g. vendor specific parameters must go through kwargs
        request.update(kwargs)

        # encode and request
        data = urlencode(request)

        u = openURL(base_url, data, method, self.cookies, auth=self.auth, timeout=timeout, headers=self.headers)
        return u

    def getOperationByName(self, name):
        """Return a named operation item.

        :raises KeyError: if no parsed operation has the given name
        """
        for item in self.operations:
            if item.name == name:
                return item
        raise KeyError("No operation named %s" % name)
218

219

220
class Operation(object):
    """Metadata for one operation listed in the capabilities document.

    Implements IOperationMetadata.

    Attributes set by ``__init__``:
      name          -- value of the element's ``name`` attribute
      formatOptions -- text of every Parameter/AllowedValues/Value descendant
      methods       -- dict mapping each HTTP verb element's tag to
                       ``{'url': <xlink:href of that element>}``
    """
    ns = Namespaces_1_1_0()

    def __init__(self, elem, nmSpc):
        self.name = elem.get('name')

        values_path = '/'.join((
            nmSpc.WCS_OWS('Parameter'),
            nmSpc.WCS_OWS('AllowedValues'),
            nmSpc.WCS_OWS('Value'),
        ))
        self.formatOptions = [value.text for value in elem.findall(values_path)]

        # every child of DCP/HTTP is one verb; a later duplicate tag replaces
        # an earlier one, as in any dict
        verbs_path = nmSpc.WCS_OWS('DCP') + '/' + nmSpc.WCS_OWS('HTTP/*')
        self.methods = {
            verb.tag: {'url': verb.attrib['{http://www.w3.org/1999/xlink}href']}
            for verb in elem.findall(verbs_path)
        }
235

236

237
class ServiceIdentification(object):
    """ Abstraction for ServiceIdentification Metadata
    implements IServiceIdentificationMetadata

    Every attribute below is always set; it is ``None`` when the matching
    element is missing from the document:
    ``service``, ``title``, ``abstract``, ``keywords``, ``type``, ``version``,
    ``fees``, ``accessConstraints``.
    """

    def __init__(self, elem, nmSpc):
        """Populate the metadata from a ServiceIdentification element.

        :param elem: the ServiceIdentification element (must not be None)
        :param nmSpc: namespace helper providing ``OWS`` and ``WCS_OWS``
        """

        def raw_text(tag):
            # Unstripped text of the first match, trying the WCS-OWS namespace
            # before the OWS one; None when the element is in neither.
            for qualified in (nmSpc.WCS_OWS(tag), nmSpc.OWS(tag)):
                node = elem.find(qualified)
                if node is not None:
                    return node.text
            return None

        self.service = "WCS"

        self.title = testXMLValue(elem.find(nmSpc.OWS('Title')))
        if self.title is None:  # may have used the wcs ows namespace:
            self.title = testXMLValue(elem.find(nmSpc.WCS_OWS('Title')))
        # A former third fallback wrote the *Abstract* text into ``title``
        # (copy-paste slip); the title now stays None when no Title exists.

        # An OWS Abstract keeps its raw text, as before. The WCS-OWS fallback
        # used to be computed and then overwritten with None; it is now kept.
        abstract = elem.find(nmSpc.OWS('Abstract'))
        if abstract is not None:
            self.abstract = abstract.text
        else:
            self.abstract = testXMLValue(elem.find(nmSpc.WCS_OWS('Abstract')))

        self.keywords = [f.text for f in elem.findall(nmSpc.OWS('Keywords') + '/' + nmSpc.OWS('Keyword'))]
        # self.link = elem.find(nmSpc.WCS('Service') + '/' + nmSpc.WCS('OnlineResource')).attrib.get('{http://www.w3.org/1999/xlink}href', '')  # noqa

        # type/version were left unset when the element was missing, so reading
        # them raised AttributeError; they now default to None.
        service_type = elem.find(nmSpc.OWS('ServiceType'))
        self.type = service_type.text if service_type is not None else None
        type_version = elem.find(nmSpc.OWS('ServiceTypeVersion'))
        self.version = type_version.text if type_version is not None else None

        # Previously raised AttributeError when absent from both namespaces.
        self.fees = raw_text('Fees')
        self.accessConstraints = raw_text('AccessConstraints')
276

277

278
class ServiceProvider(object):
    """Provider metadata read from a ServiceProvider element.

    Implements IServiceProviderMetadata.
    """

    def __init__(self, elem, nmSpc):
        provider_name = elem.find(nmSpc.OWS('ProviderName'))
        self.name = None if provider_name is None else provider_name.text

        # self.contact=ServiceContact(elem.find(nmSpc.OWS('ServiceContact')))
        self.contact = ContactMetadata(elem, nmSpc)
        # no obvious definitive place for url in wcs, repeat provider name?
        self.url = self.name
293

294

295
class ContactMetadata(object):
    '''Contact details read from a provider element; implements IContactMetadata.

    Each attribute holds the text of one OWS-namespaced descendant of the
    element, or None when that lookup fails with AttributeError (for example
    because the element, or the descendant, is missing).
    '''

    def __init__(self, elem, nmSpc):

        def text_at(*tags):
            # Join the OWS-qualified tags into a path and return the text of
            # the first match; any AttributeError along the way yields None.
            try:
                return elem.find('/'.join(nmSpc.OWS(tag) for tag in tags)).text
            except AttributeError:
                return None

        address = ('ServiceContact', 'ContactInfo', 'Address')

        self.name = text_at('ServiceContact', 'IndividualName')
        self.organization = text_at('ProviderName')
        self.address = text_at(*address, 'DeliveryPoint')
        self.city = text_at(*address, 'City')
        self.region = text_at(*address, 'AdministrativeArea')
        self.postcode = text_at(*address, 'PostalCode')
        self.country = text_at(*address, 'Country')
        self.email = text_at(*address, 'ElectronicMailAddress')
343

344

345
class ContentMetadata(object):
    """Abstraction for WCS ContentMetadata
    Implements IContentMetadata

    Built from one CoverageSummary element and, optionally, the enclosing
    (parent) CoverageSummary, which supplies values the child lacks.
    """

    def __init__(self, elem, parent, service, nmSpc):
        """Initialize.

        :param elem: the CoverageSummary element describing this coverage
        :param parent: enclosing CoverageSummary element, or None
        :param service: the owning service object (stored, not used here)
        :param nmSpc: namespace helper providing ``WCS`` and ``OWS``
        """
        # TODO - examine the parent for bounding box info.

        self._service = service
        self._elem = elem
        self._parent = parent
        self.id = self._checkChildAndParent(nmSpc.WCS('Identifier'))
        self.description = self._checkChildAndParent(nmSpc.WCS('Description'))
        self.title = self._checkChildAndParent(nmSpc.OWS('Title'))
        self.abstract = self._checkChildAndParent(nmSpc.OWS('Abstract'))

        # keywords, followed by any inherited from the parent coverage summary
        # (if there is one)
        keyword_path = nmSpc.OWS('Keywords') + '/' + nmSpc.OWS('Keyword')
        self.keywords = [kw.text for kw in elem.findall(keyword_path)]
        if parent is not None:
            self.keywords.extend(kw.text for kw in parent.findall(keyword_path))

        self.boundingBox = None  # needed for iContentMetadata harmonisation
        self.boundingBoxWGS84 = None
        wgs84 = elem.find(nmSpc.OWS('WGS84BoundingBox'))
        if wgs84 is not None:
            self.boundingBoxWGS84 = self._cornerValues(wgs84, nmSpc)

        # bboxes - other CRS
        # Each tuple is (lower x, lower y, upper x, upper y, crs). The corners
        # and crs are read from the BoundingBox being iterated; earlier code
        # read them from the WGS84 box by mistake.
        self.boundingboxes = []
        for bbox in elem.findall(nmSpc.OWS('BoundingBox')):
            try:
                self.boundingboxes.append(self._cornerValues(bbox, nmSpc) + (bbox.attrib['crs'],))
            except (AttributeError, IndexError, KeyError, ValueError):
                # best effort: skip a box with missing corners, non-numeric
                # coordinates or no crs attribute
                continue

        # others not used but needed for iContentMetadata harmonisation
        self.styles = None
        self.crsOptions = None

        # SupportedCRS
        self.supportedCRS = [Crs(crs_elem.text) for crs_elem in elem.findall(nmSpc.WCS('SupportedCRS'))]

        # SupportedFormats
        self.supportedFormats = [fmt.text for fmt in elem.findall(nmSpc.WCS('SupportedFormat'))]

    @staticmethod
    def _cornerValues(box, nmSpc):
        """Return (lower[0], lower[1], upper[0], upper[1]) of *box* as floats.

        The values come from the whitespace-separated text of the OWS
        LowerCorner and UpperCorner children.
        """
        lower = box.find(nmSpc.OWS('LowerCorner')).text.split()
        upper = box.find(nmSpc.OWS('UpperCorner')).text.split()
        return (float(lower[0]), float(lower[1]), float(upper[0]), float(upper[1]))

    # grid is either a gml:Grid or a gml:RectifiedGrid if supplied as part of the DescribeCoverage response.
    def _getGrid(self):
        """Return the coverage grid; currently always None (see TODO)."""
        grid = None
        # TODO- convert this to 1.1 from 1.0
        # if not hasattr(self, 'descCov'):
        #         self.descCov=self._service.getDescribeCoverage(self.id)
        # gridelem= self.descCov.find(ns('CoverageOffering/')+ns('domainSet/')+ns('spatialDomain/')+'{http://www.opengis.net/gml}RectifiedGrid')  # noqa
        # if gridelem is not None:
        #     grid=RectifiedGrid(gridelem)
        # else:
        #     gridelem=self.descCov.find(ns('CoverageOffering/')+ns('domainSet/')+ns('spatialDomain/')+'{http://www.opengis.net/gml}Grid')
        #     grid=Grid(gridelem)
        return grid
    grid = property(_getGrid, None)

    # time limits/postions require a describeCoverage request therefore only resolve when requested
    def _getTimeLimits(self):
        """Return the time limits; currently always an empty list (see TODO)."""
        timelimits = []
        # TODO- convert this to 1.1 from 1.0
        # for elem in self._service.getDescribeCoverage(self.id).findall(
        #         ns('CoverageDescription/') + ns('Domain/') + ns('TemporalDomain/') + ns('TimePeriod/')):
        #     subelems = elem.getchildren()
        #     timelimits = [subelems[0].text, subelems[1].text]
        return timelimits
    timelimits = property(_getTimeLimits, None)

    # TODO timepositions property
    def _getTimePositions(self):
        """Return the time positions; currently always an empty list."""
        return []
    timepositions = property(_getTimePositions, None)

    def _checkChildAndParent(self, path):
        ''' checks child coverage  summary, and if item not found checks higher level coverage summary'''
        for node in (self._elem, self._parent):
            if node is None:
                continue
            found = node.find(path)
            if found is not None:
                return found.text
        return None
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc