• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / OWSLib / 3904052670

pending completion
3904052670

Pull #858

github

GitHub
Merge e9d863ae2 into 0e7c60b94
Pull Request #858: Fix 804 previous commit for xml

3 of 3 new or added lines in 1 file covered. (100.0%)

7503 of 12705 relevant lines covered (59.06%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

19.13
/owslib/coverage/wcs200.py
1
# -*- coding: ISO-8859-15 -*-
2
# =============================================================================
3
# Copyright (c) 2004, 2006 Sean C. Gillies
4
# Copyright (c) 2007 STFC <http://www.stfc.ac.uk>
5
#
6
# Authors :
7
#          Oliver Clements <olcl@pml.ac.uk>
8
#
9
# Contact email: olcl@pml.ac.uk
10
# =============================================================================
11

12
# !!! NOTE: Does not conform to new interfaces yet #################
13

14

15
from owslib.coverage.wcsBase import WCSBase, WCSCapabilitiesReader, ServiceException
1✔
16
from owslib.ows import (
1✔
17
    OwsCommon,
18
    ServiceIdentification,
19
    ServiceProvider,
20
    OperationsMetadata,
21
)
22

23
from urllib.parse import urlencode
1✔
24
from owslib.util import openURL, testXMLValue
1✔
25
from owslib.etree import etree
1✔
26
from owslib.crs import Crs
1✔
27
import os
1✔
28
import errno
1✔
29
import dateutil.parser as parser
1✔
30
from datetime import timedelta
1✔
31
import logging
1✔
32
from owslib.util import log, datetime_from_ansi, datetime_from_iso, param_list_to_url_string
1✔
33

34

35
#  helper functions to save writing out the OWS and WCS namespaces in full each time
36
def ns(tag):
    """Return ``tag`` qualified with the OWS 2.0 namespace in ``{uri}tag`` form."""
    ows_namespace = "{http://www.opengis.net/ows/2.0}"
    return ows_namespace + tag
38

39

40
def nsWCS2(tag):
    """Return ``tag`` qualified with the WCS 2.0 namespace in ``{uri}tag`` form."""
    wcs_namespace = "{http://www.opengis.net/wcs/2.0}"
    return wcs_namespace + tag
42

43

44
class WebCoverageService_2_0_0(WCSBase):
1✔
45
    """Abstraction for OGC Web Coverage Service (WCS), version 2.0.0
46
    Implements IWebCoverageService.
47
    """
48

49
    def __getitem__(self, name):
        """Return the coverage metadata stored under ``name`` (dict-like access).

        :param name: coverage identifier, i.e. a key of ``self.contents``
        :raises KeyError: if ``contents`` has no entry for ``name``
        """
        contents = self.contents
        # Test membership on the dict itself instead of on a freshly built key list.
        if name in contents:
            return contents[name]
        raise KeyError("No content named %s" % name)
55

56
    def __init__(self, url, xml, cookies, auth=None, timeout=30, headers=None):
        """Build the service description from a WCS 2.0.0 capabilities document.

        :param url: service endpoint; read for capabilities when ``xml`` is falsy
        :param xml: saved capabilities document, or a falsy value to fetch the
            document from ``url``
        :param cookies: cookies handed to the capabilities reader and stored on
            the instance for later requests
        :param auth: forwarded to ``WCSBase.__init__``
        :param timeout: forwarded to ``WCSBase.__init__`` and to the reader
        :param headers: forwarded to ``WCSBase.__init__``
        :raises ServiceException: if the document holds a ``ServiceException`` element
        """
        super(WebCoverageService_2_0_0, self).__init__(auth=auth, timeout=timeout, headers=headers)
        self.version = "2.0.0"
        self.url = url
        self.cookies = cookies
        self.timeout = timeout
        self.ows_common = OwsCommon(version="2.0.0")
        # initialize from saved capability document or access the server
        # (self.auth / self.headers are presumably set by WCSBase.__init__ -- confirm)
        reader = WCSCapabilitiesReader(self.version, self.cookies, self.auth, headers=self.headers)
        if xml:
            self._capabilities = reader.readString(xml)
        else:
            self._capabilities = reader.read(self.url, self.timeout)

        # check for exceptions
        se = self._capabilities.find("ServiceException")

        if se is not None:
            err_message = str(se.text).strip()
            raise ServiceException(err_message, xml)

        # serviceIdentification metadata
        subelem = self._capabilities.find(ns("ServiceIdentification"))
        self.identification = ServiceIdentification(
            subelem, namespace=self.ows_common.namespace
        )

        # serviceProvider metadata
        serviceproviderelem = self._capabilities.find(ns("ServiceProvider"))
        self.provider = ServiceProvider(
            serviceproviderelem, namespace=self.ows_common.namespace
        )

        # serviceOperations metadata
        self.operations = []
        operations_elem = self._capabilities.find(ns("OperationsMetadata"))
        # find() returns None when the section is absent; leave the list empty
        # in that case instead of failing with a TypeError on None.
        if operations_elem is not None:
            for elem in operations_elem:
                if elem.tag != ns("ExtendedCapabilities"):
                    self.operations.append(
                        OperationsMetadata(elem, namespace=self.ows_common.namespace)
                    )

        # serviceContents metadata, keyed by coverage id
        self.contents = {}
        for elem in self._capabilities.findall(
            nsWCS2("Contents/") + nsWCS2("CoverageSummary")
        ):
            cm = ContentMetadata(elem, self)
            self.contents[cm.id] = cm

        # exceptions
        self.exceptions = [
            f.text for f in self._capabilities.findall("Capability/Exception/Format")
        ]
109

110
    def items(self):
        """Return the ``(identifier, metadata)`` pairs of ``contents`` as a list."""
        return [(key, self.contents[key]) for key in self.contents]
116

117
    def getCoverage(
        self,
        identifier=None,
        bbox=None,
        time=None,
        format=None,
        subsets=None,
        resolutions=None,
        sizes=None,
        crs=None,
        width=None,
        height=None,
        resx=None,
        resy=None,
        resz=None,
        parameter=None,
        method="Get",
        timeout=30,
        **kwargs
    ):
        """Send a WCS 2.0 GetCoverage request and return the server response.

        Only ``identifier``, ``format``, ``crs``, ``width``, ``height``,
        ``subsets``, ``resolutions``, ``sizes``, ``method``, ``timeout`` and
        ``**kwargs`` influence the request.  ``bbox``, ``time``, ``resx``,
        ``resy``, ``resz`` and ``parameter`` are accepted (presumably for
        signature compatibility with the other WCS versions) and are only
        written to the debug log.

        :param identifier: sequence whose first item is the coverage id; a
            plain string is treated as a single coverage id
        :param format: requested output format; omitted from the request when
            not given
        :param subsets: list of ``(axisName, min, max)`` tuples, appended as
            ``subset`` parameters
        :param resolutions: vendor-specific ``resolution`` parameters
        :param sizes: vendor-specific ``size`` parameters
        :param crs: value of the ``crs`` request parameter, if any
        :param width: value of the ``width`` request parameter, if any
        :param height: value of the ``height`` request parameter, if any
        :param method: HTTP method; also selects the advertised GetCoverage URL
        :param timeout: timeout handed to ``openURL``
        :param kwargs: any other (e.g. vendor specific) request parameters
        :returns: the object returned by ``openURL`` (described as file-like by
            the original documentation)
        :raises ValueError: if no coverage identifier is supplied
        :raises KeyError: if the service advertises no GetCoverage operation

        example 2.0.1 URL
        http://earthserver.pml.ac.uk/rasdaman/ows?&SERVICE=WCS&VERSION=2.0.1&REQUEST=GetCoverage
        &COVERAGEID=V2_monthly_CCI_chlor_a_insitu_test&SUBSET=Lat(40,50)&SUBSET=Long(-10,0)&SUBSET=ansi(144883,145000)&FORMAT=application/netcdf

        cvg=wcs.getCoverage(identifier=['myID'], format='application/netcdf', subsets=[('axisName',min,max),
                            ('axisName', min, max),('axisName',min,max)])
        """
        # Lazy %-style arguments: the message is only built when DEBUG is enabled.
        log.debug(
            "WCS 2.0.0 DEBUG: Parameters passed to GetCoverage: identifier=%s, bbox=%s, time=%s, format=%s, crs=%s, width=%s, height=%s, resx=%s, resy=%s, resz=%s, parameter=%s, method=%s, other_arguments=%s",  # noqa
            identifier,
            bbox,
            time,
            format,
            crs,
            width,
            height,
            resx,
            resy,
            resz,
            parameter,
            method,
            kwargs,
        )

        # A bare string would otherwise be indexed to its first character.
        if isinstance(identifier, str):
            identifier = [identifier]
        # Explicit check instead of ``assert`` (which is stripped under -O and
        # failed with a TypeError for the default ``identifier=None``).
        if not identifier:
            raise ValueError("getCoverage requires at least one coverage identifier")

        try:
            base_url = next(
                (
                    m.get("url")
                    for m in self.getOperationByName("GetCoverage").methods
                    if m.get("type").lower() == method.lower()
                )
            )
        except StopIteration:
            # no advertised URL for this HTTP method: fall back to the service URL
            base_url = self.url

        log.debug("WCS 2.0.0 DEBUG: base url of server: %s", base_url)

        request = {"version": self.version, "request": "GetCoverage", "service": "WCS"}
        request["CoverageID"] = identifier[0]

        if crs:
            request["crs"] = crs
        # Only send ``format`` when one was requested; urlencode would otherwise
        # transmit the literal string "None".
        if format:
            request["format"] = format
        if width:
            request["width"] = width
        if height:
            request["height"] = height

        # anything else e.g. vendor specific parameters must go through kwargs
        request.update(kwargs)

        # encode and request
        data = urlencode(request)
        if subsets:
            data += param_list_to_url_string(subsets, 'subset')
        if resolutions:
            log.debug('Adding vendor-specific RESOLUTION parameter.')
            data += param_list_to_url_string(resolutions, 'resolution')
        if sizes:
            log.debug('Adding vendor-specific SIZE parameter.')
            data += param_list_to_url_string(sizes, 'size')
        log.debug("WCS 2.0.0 DEBUG: Second part of URL: %s", data)

        u = openURL(base_url, data, method, self.cookies, auth=self.auth, timeout=timeout, headers=self.headers)
        return u
220

221
    def getOperationByName(self, name):
        """Look up the first entry of ``self.operations`` whose ``name`` matches.

        Raises KeyError when no operation carries that name.
        """
        matches = (operation for operation in self.operations if operation.name == name)
        for found in matches:
            return found
        raise KeyError("No operation named %s" % name)
227

228

229
class ContentMetadata(object):
1✔
230
    """
231
    Implements IContentMetadata
232
    """
233

234
    def __init__(self, elem, service):
        """Read one coverage summary element.

        ``service`` is stored so that DescribeCoverage requests can be made
        later by the property accessors.
        """
        # TODO - examine the parent for bounding box info.
        self._elem = elem
        self._service = service

        self.id = elem.find(nsWCS2("CoverageId")).text
        self.title = testXMLValue(elem.find(ns("label")))
        self.abstract = testXMLValue(elem.find(ns("description")))
        keyword_path = ns("keywords") + "/" + ns("keyword")
        self.keywords = [keyword.text for keyword in elem.findall(keyword_path)]

        # boundingBox is needed for iContentMetadata harmonisation
        self.boundingBox = None
        self.boundingBoxWGS84 = None
        envelope = elem.find(ns("lonLatEnvelope"))
        if envelope is not None:
            positions = envelope.findall("{http://www.opengis.net/gml}pos")
            lower_text = positions[0].text
            upper_text = positions[1].text
            # first two ordinates of the lower corner, then of the upper corner
            self.boundingBoxWGS84 = tuple(
                float(corner.split()[index])
                for corner in (lower_text, upper_text)
                for index in (0, 1)
            )

        # others not used but needed for iContentMetadata harmonisation
        self.styles = None
        self.crsOptions = None
        self.defaulttimeposition = None
262

263
    # grid is either a gml:Grid or a gml:RectifiedGrid if supplied as part of the DescribeCoverage response.
264
    def _getGrid(self):
        """Return the grid found in the domainSet of the DescribeCoverage response.

        A ReferenceableGridByVectors element is preferred; otherwise the
        RectifiedGrid lookup result is wrapped.  The DescribeCoverage response
        is fetched once and kept on ``self.descCov``.
        """
        if not hasattr(self, "descCov"):
            self.descCov = self._service.getDescribeCoverage(self.id)

        domain_set = nsWCS2("CoverageDescription/") + "{http://www.opengis.net/gml/3.2}domainSet/"

        referenceable = self.descCov.find(
            domain_set + "{http://www.opengis.net/gml/3.3/rgrid}ReferenceableGridByVectors"
        )
        if referenceable is not None:
            return ReferenceableGridByVectors(referenceable)

        # no referenceable grid: look for a rectified grid instead
        rectified = self.descCov.find(
            domain_set + "{http://www.opengis.net/gml/3.2}RectifiedGrid"
        )
        return RectifiedGrid(rectified)
279

280
    grid = property(_getGrid, None)
1✔
281

282
    # timelimits are the start/end times, timepositions are all timepoints. WCS servers can declare one or both
283
    # or neither of these.
284
    # in wcs 2.0 this can be gathered from the Envelope tag
285
    def _getTimeLimits(self):
1✔
286
        # timepoints, timelimits=[],[]
287
        # b=self._elem.find(ns('lonLatEnvelope'))
288
        # if b is not None:
289
        #     timepoints=b.findall('{http://www.opengis.net/gml}timePosition')
290
        # else:
291
        #     #have to make a describeCoverage request...
292
        #     if not hasattr(self, 'descCov'):
293
        #         self.descCov=self._service.getDescribeCoverage(self.id)
294
        #     for pos in self.descCov.findall(
295
        #           ns('CoverageOffering/')+ns('domainSet/')+ns('temporalDomain/')+'{http://www.opengis.net/gml}timePosition'):
296
        #         timepoints.append(pos)
297
        # if timepoints:
298
        #         timelimits=[timepoints[0].text,timepoints[1].text]
299
        return [self.timepositions[0], self.timepositions[-1]]
×
300

301
    timelimits = property(_getTimeLimits, None)
1✔
302

303
    def _getTimePositions(self):
        """Return the time positions of the coverage.

        The DescribeCoverage response is fetched once and kept on
        ``self.descCov``.  Two layouts are handled:

        * a ReferenceableGridByVectors domain set (irregular time axis): the
          coefficients of the axis whose ``gridAxesSpanned`` is ``ansi`` or
          ``unix`` are converted with ``datetime_from_iso``;
        * otherwise (regular time axis): positions are generated from the
          third entry of the grid origin, offset vectors and high limits.

        Returns a list of the values produced above, or ``None`` when the
        regular grid origin has no third entry.
        """

        timepositions = []
        if not hasattr(self, "descCov"):
            self.descCov = self._service.getDescribeCoverage(self.id)

        gridelem = self.descCov.find(
            nsWCS2("CoverageDescription/") + "{http://www.opengis.net/gml/3.2}domainSet/" + "{http://www.opengis.net/gml/3.3/rgrid}ReferenceableGridByVectors"  # noqa
        )
        if gridelem is not None:
            # irregular time axis
            cooeficients = []

            grid_axes = gridelem.findall(
                "{http://www.opengis.net/gml/3.3/rgrid}generalGridAxis"
            )
            # if several axes match, the coefficients of the last one win
            for elem in grid_axes:
                if elem.find(
                    "{http://www.opengis.net/gml/3.3/rgrid}GeneralGridAxis/{http://www.opengis.net/gml/3.3/rgrid}gridAxesSpanned"  # noqa
                ).text in ["ansi", "unix"]:
                    cooeficients = elem.find(
                        "{http://www.opengis.net/gml/3.3/rgrid}GeneralGridAxis/{http://www.opengis.net/gml/3.3/rgrid}coefficients"  # noqa
                    ).text.split(" ")
            for x in cooeficients:
                # coefficients may be wrapped in double quotes; strip them before parsing
                x = x.replace('"', "")
                t_date = datetime_from_iso(x)
                timepositions.append(t_date)
        else:
            # regular time
            # NOTE(review): self.grid is a property, so each access below builds a new grid object
            if len(self.grid.origin) > 2:
                t_grid = self.grid
                # index 2 (the third axis) is treated as the time axis
                t_date = t_grid.origin[2]
                start_pos = parser.parse(t_date, fuzzy=True)
                step = float(t_grid.offsetvectors[2][2])

                # the step is interpreted as a number of days; the first position
                # is shifted by half a step from the origin
                start_pos = start_pos + timedelta(days=(step / 2))
                # NOTE(review): the high limit is used directly as the number of
                # positions -- confirm against the low limit / inclusive bounds
                no_steps = int(t_grid.highlimits[2])
                for x in range(no_steps):
                    t_pos = start_pos + timedelta(days=(step * x))
                    # t_date = datetime_from_ansi(t_pos)
                    # t_date = t_pos.isoformat()
                    timepositions.append(t_pos)
            else:
                # no time axis
                timepositions = None

        return timepositions
350

351
    timepositions = property(_getTimePositions, None)
1✔
352

353
    def _getOtherBoundingBoxes(self):
        """Incomplete: collect the gml:Envelope bounding boxes of the coverage description.

        Each entry is a dict with the envelope's ``srsName`` under ``nativeSrs``
        and the first two ordinates of the lower and upper corner under ``bbox``.

        #TODO: find any other bounding boxes. Need to check for gml:EnvelopeWithTimePeriod.
        """
        if not hasattr(self, "descCov"):
            self.descCov = self._service.getDescribeCoverage(self.id)

        envelope_path = (
            nsWCS2("CoverageDescription/")
            + "{http://www.opengis.net/gml/3.2}boundedBy/"
            + "{http://www.opengis.net/gml/3.2}Envelope"
        )

        found = []
        for env in self.descCov.findall(envelope_path):
            srs_name = env.attrib["srsName"]
            lower = env.find("{http://www.opengis.net/gml/3.2}lowerCorner").text.split()
            upper = env.find("{http://www.opengis.net/gml/3.2}upperCorner").text.split()
            corners = (float(lower[0]), float(lower[1]), float(upper[0]), float(upper[1]))
            found.append({"nativeSrs": srs_name, "bbox": corners})
        return found
375

376
    boundingboxes = property(_getOtherBoundingBoxes, None)
1✔
377

378
    def _getSupportedCRSProperty(self):
        """Return ``Crs`` objects for the CRSs listed in the coverage description.

        Entries are collected from ``responseCRSs``, then ``requestResponseCRSs``,
        then ``nativeCRSs``.  The DescribeCoverage response is fetched once and
        kept on ``self.descCov`` (as the other accessors of this class do)
        instead of being requested three times.

        NOTE(review): the lookup path is CoverageOffering/supportedCRSs in the
        OWS namespace, which looks like the WCS 1.0 layout -- confirm it can
        match a 2.0 response.
        """
        if not hasattr(self, "descCov"):
            self.descCov = self._service.getDescribeCoverage(self.id)

        base_path = ns("CoverageOffering/") + ns("supportedCRSs/")
        crss = []
        for tag in ("responseCRSs", "requestResponseCRSs", "nativeCRSs"):
            for elem in self.descCov.findall(base_path + ns(tag)):
                # split() without argument skips repeated blanks/newlines, so no
                # empty code is handed to Crs; an empty element contributes nothing
                crss.extend(Crs(code) for code in (elem.text or "").split())
        return crss
397

398
    supportedCRS = property(_getSupportedCRSProperty, None)
1✔
399

400
    def _getSupportedFormatsProperty(self):
        """Return the text of every ServiceMetadata/formatSupported element of the capabilities."""
        format_path = nsWCS2("ServiceMetadata/") + nsWCS2("formatSupported")
        capabilities = self._service._capabilities
        return [format_elem.text for format_elem in capabilities.findall(format_path)]
408

409
    supportedFormats = property(_getSupportedFormatsProperty, None)
1✔
410

411
    def _getAxisDescriptionsProperty(self):
        """Return an ``AxisDescription`` for each axis description in the rangeSet.

        Needs the DescribeCoverage response, which is fetched once and kept on
        ``self.descCov`` (as the other accessors of this class do) rather than
        requested on every access.
        """
        if not hasattr(self, "descCov"):
            self.descCov = self._service.getDescribeCoverage(self.id)

        axis_path = (
            ns("CoverageOffering/") + ns("rangeSet/") + ns("RangeSet/") + ns("axisDescription/") + ns("AxisDescription")
        )
        return [AxisDescription(elem) for elem in self.descCov.findall(axis_path)]
421

422
    axisDescriptions = property(_getAxisDescriptionsProperty, None)
1✔
423

424

425
# Adding classes to represent gml:grid and gml:rectifiedgrid. One of these is used for the cvg.grid property
426
# (where cvg is a member of the contents dictionary)
427
# There is no simple way to convert the offset values in a rectifiedgrid grid to real values without CRS understanding,
428
# therefore this is beyond the current scope of owslib, so the representation here is purely to provide access
429
# to the information in the GML.
430

431

432
class Grid(object):
    """ Simple grid class to provide axis and value information for a gml grid

    Attributes (left at their empty defaults when ``grid`` is None):
      axislabels -- list of label strings from the first gml:axisLabels element
      dimension  -- int value of the ``dimension`` attribute, else None
      lowlimits  -- list of strings from gml:limits/gml:GridEnvelope/gml:low
      highlimits -- list of strings from gml:limits/gml:GridEnvelope/gml:high
    """

    def __init__(self, grid):
        self.axislabels = []
        self.dimension = None
        self.lowlimits = []
        self.highlimits = []

        if grid is None:
            return

        gml = "{http://www.opengis.net/gml/3.2}"
        envelope_path = gml + "limits/" + gml + "GridEnvelope/"

        self.dimension = int(grid.get("dimension"))
        # split() without an argument tolerates repeated blanks, tabs and
        # newlines (as RectifiedGrid already does for its origin); split(" ")
        # would leave empty strings in the lists.
        self.lowlimits = grid.find(envelope_path + gml + "low").text.split()
        self.highlimits = grid.find(envelope_path + gml + "high").text.split()

        # A grid without (or with an empty) axisLabels element keeps an empty
        # label list instead of raising.
        labels = grid.find(gml + "axisLabels")
        if labels is not None and labels.text:
            self.axislabels = labels.text.split()
453

454

455
class RectifiedGrid(Grid):
    """ RectifiedGrid class, extends Grid with the grid origin and offset vectors """

    def __init__(self, rectifiedgrid):
        super(RectifiedGrid, self).__init__(rectifiedgrid)
        origin_pos = rectifiedgrid.find(
            "{http://www.opengis.net/gml/3.2}origin/{http://www.opengis.net/gml/3.2}Point/{http://www.opengis.net/gml/3.2}pos"  # noqa
        )
        # whitespace-separated ordinates of the origin point
        self.origin = origin_pos.text.split()
        # one list of whitespace-separated components per gml:offsetVector child
        vector_elems = rectifiedgrid.findall("{http://www.opengis.net/gml/3.2}offsetVector")
        self.offsetvectors = [vector.text.split() for vector in vector_elems]
468

469

470
class ReferenceableGridByVectors(Grid):
    """ ReferenceableGridByVectors class, extends Grid with the grid origin and per-axis offset vectors """

    def __init__(self, refereceablegridbyvectors):
        super(ReferenceableGridByVectors, self).__init__(refereceablegridbyvectors)
        origin_pos = refereceablegridbyvectors.find(
            "{http://www.opengis.net/gml/3.3/rgrid}origin/{http://www.opengis.net/gml/3.2}Point/{http://www.opengis.net/gml/3.2}pos"  # noqa
        )
        # whitespace-separated ordinates of the origin point
        self.origin = origin_pos.text.split()
        # one list of components per generalGridAxis/GeneralGridAxis/offsetVector
        vector_elems = refereceablegridbyvectors.findall(
            "{http://www.opengis.net/gml/3.3/rgrid}generalGridAxis/{http://www.opengis.net/gml/3.3/rgrid}GeneralGridAxis/{http://www.opengis.net/gml/3.3/rgrid}offsetVector"  # noqa
        )
        self.offsetvectors = [vector.text.split() for vector in vector_elems]
483

484

485
class AxisDescription(object):
    """ Class to represent the AxisDescription element optionally found as part of the RangeSet and used to
    define ordinates of additional dimensions such as wavelength bands or pressure levels

    Attributes:
      name   -- text of the ``name`` child, or None if absent
      label  -- text of the ``label`` child, or None if absent
      values -- text of every child of each ``values`` element, in document order
    """

    def __init__(self, axisdescElem):
        self.name = self.label = None
        self.values = []

        name_tag = ns("name")
        label_tag = ns("label")
        values_tag = ns("values")

        # Iterate the element directly: Element.getchildren() is deprecated in
        # lxml and was removed from xml.etree.ElementTree in Python 3.9.
        for elem in axisdescElem:
            if elem.tag == name_tag:
                self.name = elem.text
            elif elem.tag == label_tag:
                self.label = elem.text
            elif elem.tag == values_tag:
                self.values.extend(child.text for child in elem)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc