• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / OWSLib / 11628884181

01 Nov 2024 11:49AM UTC coverage: 58.814% (-1.3%) from 60.156%
11628884181

Pull #548

github

web-flow
Merge 4b9b7cf1f into ae98c2039
Pull Request #548: Strip out redundant __new__ to enable object pickling

8364 of 14221 relevant lines covered (58.81%)

1.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

19.21
/owslib/coverage/wcs200.py
1
# -*- coding: ISO-8859-15 -*-
2
# =============================================================================
3
# Copyright (c) 2004, 2006 Sean C. Gillies
4
# Copyright (c) 2007 STFC <http://www.stfc.ac.uk>
5
#
6
# Authors:
7
#          Oliver Clements <olcl@pml.ac.uk>
8
#
9
# Contact email: olcl@pml.ac.uk
10
# =============================================================================
11

12
# !!! NOTE: Does not conform to new interfaces yet #################
13

14
from datetime import timedelta
2✔
15
import errno
2✔
16
import logging
2✔
17
import os
2✔
18
from urllib.parse import urlencode
2✔
19

20
import dateutil.parser as parser
2✔
21

22
from owslib.coverage.wcsBase import WCSBase, WCSCapabilitiesReader, ServiceException
2✔
23
from owslib.crs import Crs
2✔
24
from owslib.etree import etree
2✔
25
from owslib.ows import (
2✔
26
    OwsCommon,
27
    ServiceIdentification,
28
    ServiceProvider,
29
    OperationsMetadata
30
)
31
from owslib.util import datetime_from_ansi, datetime_from_iso, openURL, param_list_to_url_string, testXMLValue
2✔
32

33
# Module-level logger named after this module; used for the debug output
# emitted while building GetCoverage requests.
LOGGER = logging.getLogger(__name__)
2✔
34

35

36
# helper functions to save writing out the OWS 2.0 and WCS 2.0 namespaces in full each time
37
def ns(tag):
    """Return ``tag`` prefixed with the OWS 2.0 namespace in ``{uri}tag`` form."""
    return "".join(("{http://www.opengis.net/ows/2.0}", tag))
×
39

40

41
def nsWCS2(tag):
    """Return ``tag`` prefixed with the WCS 2.0 namespace in ``{uri}tag`` form."""
    return "".join(("{http://www.opengis.net/wcs/2.0}", tag))
×
43

44

45
class WebCoverageService_2_0_0(WCSBase):
    """Abstraction for OGC Web Coverage Service (WCS), version 2.0.0

    Implements IWebCoverageService.

    The capabilities document is parsed on construction and exposed through
    the ``identification``, ``provider``, ``operations``, ``contents`` and
    ``exceptions`` attributes.
    """

    def __getitem__(self, name):
        """Return the coverage metadata stored under ``name``.

        Gives dict-like access to the service layers (``wcs["coverage_id"]``).

        Raises:
            KeyError: if ``contents`` holds no entry for ``name``.
        """
        contents = self.__getattribute__("contents")
        try:
            return contents[name]
        except (KeyError, TypeError):
            # TypeError covers unhashable names, which the former list-based
            # membership test also reported as a missing key.
            raise KeyError("No content named %s" % name) from None

    def __init__(self, url, xml, cookies, auth=None, timeout=30, headers=None):
        """Read the capabilities document and populate the service metadata.

        Args:
            url: service endpoint; used to fetch the capabilities when ``xml``
                is falsy and as the fallback endpoint for GetCoverage.
            xml: capabilities document; when truthy it is parsed instead of
                contacting the server.
            cookies: handed to the capabilities reader and to ``openURL``.
            auth: forwarded to ``WCSBase.__init__``.
            timeout: forwarded to ``WCSBase.__init__`` and used when reading
                the capabilities from ``url``.
            headers: forwarded to ``WCSBase.__init__``.

        Raises:
            ServiceException: if the parsed document contains a
                ``ServiceException`` element.
        """
        super().__init__(auth=auth, timeout=timeout, headers=headers)
        self.version = "2.0.0"
        self.url = url
        self.cookies = cookies
        self.timeout = timeout
        self.ows_common = OwsCommon(version="2.0.0")
        ows_namespace = self.ows_common.namespace

        # initialize from saved capability document or access the server
        reader = WCSCapabilitiesReader(self.version, self.cookies, self.auth, headers=self.headers)
        if xml:
            self._capabilities = reader.readString(xml)
        else:
            self._capabilities = reader.read(self.url, self.timeout)

        # check for exceptions
        se = self._capabilities.find("ServiceException")
        if se is not None:
            err_message = str(se.text).strip()
            raise ServiceException(err_message, xml)

        # serviceIdentification metadata
        self.identification = ServiceIdentification(
            self._capabilities.find(ns("ServiceIdentification")), namespace=ows_namespace
        )

        # serviceProvider metadata
        self.provider = ServiceProvider(
            self._capabilities.find(ns("ServiceProvider")), namespace=ows_namespace
        )

        # serviceOperations metadata; a document without an OperationsMetadata
        # section yields an empty list instead of failing on ``None``.
        self.operations = []
        operations_elem = self._capabilities.find(ns("OperationsMetadata"))
        if operations_elem is not None:
            extended_tag = ns("ExtendedCapabilities")
            self.operations = [
                OperationsMetadata(child, namespace=ows_namespace)
                for child in operations_elem
                if child.tag != extended_tag
            ]

        # serviceContents metadata, keyed by coverage id
        self.contents = {}
        for elem in self._capabilities.findall(
            nsWCS2("Contents/") + nsWCS2("CoverageSummary")
        ):
            cm = ContentMetadata(elem, self)
            self.contents[cm.id] = cm

        # exceptions
        self.exceptions = [
            f.text for f in self._capabilities.findall("Capability/Exception/Format")
        ]

    def items(self):
        """supports dict-like items() access: list of ``(id, ContentMetadata)`` pairs"""
        return list(self.contents.items())

    def getCoverage(
        self,
        identifier=None,
        bbox=None,
        time=None,
        format=None,
        subsets=None,
        resolutions=None,
        sizes=None,
        crs=None,
        width=None,
        height=None,
        resx=None,
        resy=None,
        resz=None,
        parameter=None,
        method="Get",
        timeout=30,
        **kwargs
    ):
        """Request and return a coverage from the WCS as a file-like object

        note: additional **kwargs helps with multi-version implementation
        core keyword arguments should be supported cross version

        example 2.0.1 URL
        http://earthserver.pml.ac.uk/rasdaman/ows?&SERVICE=WCS&VERSION=2.0.1&REQUEST=GetCoverage
        &COVERAGEID=V2_monthly_CCI_chlor_a_insitu_test&SUBSET=Lat(40,50)&SUBSET=Long(-10,0)&SUBSET=ansi(144883,145000)&FORMAT=application/netcdf

        cvg=wcs.getCoverage(identifier=['myID'], format='application/netcdf', subsets=[('axisName',min,max),
                            ('axisName', min, max),('axisName',min,max)])

        Args:
            identifier: coverage id, either as a string or as a sequence whose
                first element is used.
            format: output format; omitted from the request when ``None``.
            subsets: passed to ``param_list_to_url_string`` as ``subset`` entries.
            resolutions: vendor-specific ``resolution`` entries, same encoding.
            sizes: vendor-specific ``size`` entries, same encoding.
            crs, width, height: added to the request when truthy.
            bbox, time, resx, resy, resz, parameter: accepted for cross-version
                signature compatibility; in this version they are only logged.
            method: HTTP method name, matched case-insensitively against the
                advertised GetCoverage methods to pick the request URL.
            timeout: passed to ``openURL``.
            **kwargs: anything else, e.g. vendor specific parameters; copied
                into the request as-is.

        Returns:
            Whatever ``openURL`` returns for the request.

        Raises:
            ValueError: if ``identifier`` is missing or empty.
            KeyError: if the service advertises no GetCoverage operation.
        """
        LOGGER.debug(
            "WCS 2.0.0 DEBUG: Parameters passed to GetCoverage: identifier=%s, bbox=%s, time=%s, format=%s, crs=%s, width=%s, height=%s, resx=%s, resy=%s, resz=%s, parameter=%s, method=%s, other_arguments=%s",  # noqa
            identifier,
            bbox,
            time,
            format,
            crs,
            width,
            height,
            resx,
            resy,
            resz,
            parameter,
            method,
            str(kwargs),
        )

        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if not identifier:
            raise ValueError("getCoverage requires a non-empty coverage identifier")
        # A bare string is the id itself; indexing it would keep only its first character.
        coverage_id = identifier if isinstance(identifier, str) else identifier[0]

        # Use the advertised endpoint for the requested HTTP method, falling
        # back to the service URL when no method matches.
        wanted_method = method.lower()
        base_url = next(
            (
                m.get("url")
                for m in self.getOperationByName("GetCoverage").methods
                if m.get("type").lower() == wanted_method
            ),
            self.url,
        )

        LOGGER.debug("WCS 2.0.0 DEBUG: base url of server: %s", base_url)

        request = {"version": self.version, "request": "GetCoverage", "service": "WCS"}
        request["CoverageID"] = coverage_id

        if crs:
            request["crs"] = crs
        if format is not None:
            # Leaving the key out avoids sending a literal "format=None".
            request["format"] = format
        if width:
            request["width"] = width
        if height:
            request["height"] = height

        # anything else e.g. vendor specific parameters must go through kwargs
        request.update(kwargs)

        # encode and request
        data = urlencode(request)
        if subsets:
            data += param_list_to_url_string(subsets, 'subset')
        if resolutions:
            LOGGER.debug('Adding vendor-specific RESOLUTION parameter.')
            data += param_list_to_url_string(resolutions, 'resolution')
        if sizes:
            LOGGER.debug('Adding vendor-specific SIZE parameter.')
            data += param_list_to_url_string(sizes, 'size')
        LOGGER.debug("WCS 2.0.0 DEBUG: Second part of URL: %s", data)

        return openURL(
            base_url, data, method, self.cookies, auth=self.auth, timeout=timeout, headers=self.headers
        )

    def getOperationByName(self, name):
        """Return a named operation item.

        Raises:
            KeyError: if no entry in ``operations`` has that name.
        """
        for item in self.operations:
            if item.name == name:
                return item
        raise KeyError("No operation named %s" % name)
×
227

228

229
class ContentMetadata(object):
    """
    Implements IContentMetadata

    Wraps one CoverageSummary element of the capabilities document. Details
    that need a DescribeCoverage response (grid, time positions, bounding
    boxes, CRSs, axis descriptions) are read lazily; the response is fetched
    once and kept in ``descCov``.
    """

    # Namespace prefixes used when building element paths.
    _GML32 = "{http://www.opengis.net/gml/3.2}"
    _RGRID = "{http://www.opengis.net/gml/3.3/rgrid}"

    def __init__(self, elem, service):
        """Initialize. service is required so that describeCoverage requests may be made"""
        # TODO - examine the parent for bounding box info.
        self._elem = elem
        self._service = service
        self.id = elem.find(nsWCS2("CoverageId")).text
        self.title = testXMLValue(elem.find(ns("label")))
        self.abstract = testXMLValue(elem.find(ns("description")))
        self.keywords = [
            f.text for f in elem.findall(ns("keywords") + "/" + ns("keyword"))
        ]
        self.boundingBox = None  # needed for iContentMetadata harmonisation
        self.boundingBoxWGS84 = None
        b = elem.find(ns("lonLatEnvelope"))
        if b is not None:
            gmlpositions = b.findall("{http://www.opengis.net/gml}pos")
            lower = gmlpositions[0].text.split()
            upper = gmlpositions[1].text.split()
            self.boundingBoxWGS84 = (
                float(lower[0]),
                float(lower[1]),
                float(upper[0]),
                float(upper[1]),
            )
        # others not used but needed for iContentMetadata harmonisation
        self.styles = None
        self.crsOptions = None
        self.defaulttimeposition = None

    def _describe(self):
        """Return the DescribeCoverage response, requesting it only on first use."""
        if not hasattr(self, "descCov"):
            self.descCov = self._service.getDescribeCoverage(self.id)
        return self.descCov

    def _domainSetPath(self, grid_tag):
        """Return the element path of ``grid_tag`` below CoverageDescription/domainSet."""
        return nsWCS2("CoverageDescription/") + self._GML32 + "domainSet/" + grid_tag

    # grid is either a gml:Grid or a gml:RectifiedGrid if supplied as part of the DescribeCoverage response.
    def _getGrid(self):
        """Return the coverage grid parsed from the DescribeCoverage response.

        A ReferenceableGridByVectors element is preferred; otherwise whatever
        the RectifiedGrid lookup returns (possibly ``None``) is handed to
        ``RectifiedGrid``.
        """
        description = self._describe()
        gridelem = description.find(
            self._domainSetPath(self._RGRID + "ReferenceableGridByVectors")
        )
        if gridelem is not None:
            return ReferenceableGridByVectors(gridelem)
        # no referenceable grid: look for a RectifiedGrid instead
        return RectifiedGrid(
            description.find(self._domainSetPath(self._GML32 + "RectifiedGrid"))
        )

    grid = property(_getGrid, None)

    # timelimits are the start/end times, timepositions are all timepoints. WCS servers can declare one or both
    # or neither of these.
    def _getTimeLimits(self):
        """Return ``[first, last]`` of ``timepositions``, or ``[]`` when there are none."""
        # Read the property once: every access re-parses the coverage description.
        positions = self.timepositions
        if not positions:
            return []
        return [positions[0], positions[-1]]

    timelimits = property(_getTimeLimits, None)

    def _getTimePositions(self):
        """Return the time positions of the coverage.

        Returns:
            A list of datetimes; ``None`` when a rectified grid has no third
            axis (no time axis).
        """
        gridelem = self._describe().find(
            self._domainSetPath(self._RGRID + "ReferenceableGridByVectors")
        )
        if gridelem is not None:
            # irregular time axis: positions are listed as axis coefficients
            axis_prefix = self._RGRID + "GeneralGridAxis/" + self._RGRID
            coefficients = []
            for axis in gridelem.findall(self._RGRID + "generalGridAxis"):
                spanned = axis.find(axis_prefix + "gridAxesSpanned")
                if spanned is None or spanned.text not in ("ansi", "unix"):
                    continue
                values = axis.find(axis_prefix + "coefficients")
                # split() tolerates repeated blanks/newlines; a missing or
                # empty coefficients element contributes no positions.
                coefficients = (values.text or "").split() if values is not None else []
            return [datetime_from_iso(x.replace('"', "")) for x in coefficients]

        # regular time
        t_grid = self.grid
        if len(t_grid.origin) <= 2:
            # no time axis
            return None

        step = float(t_grid.offsetvectors[2][2])
        # The step is applied as a number of days; the start is moved forward by
        # half a step (presumably to the centre of the first cell - TODO confirm).
        start_pos = parser.parse(t_grid.origin[2], fuzzy=True) + timedelta(days=(step / 2))
        no_steps = int(t_grid.highlimits[2])
        return [start_pos + timedelta(days=(step * x)) for x in range(no_steps)]

    timepositions = property(_getTimePositions, None)

    def _getOtherBoundingBoxes(self):
        """ incomplete, should return other bounding boxes not in WGS84
            #TODO: find any other bounding boxes. Need to check for gml:EnvelopeWithTimePeriod.

        Returns a list of ``{"nativeSrs": ..., "bbox": (x0, y0, x1, y1)}`` dicts,
        one per gml:Envelope found under CoverageDescription/boundedBy; the
        bbox holds the first two lower-corner then first two upper-corner values.
        """
        envelope_path = (
            nsWCS2("CoverageDescription/") + self._GML32 + "boundedBy/" + self._GML32 + "Envelope"
        )
        bboxes = []
        for envelope in self._describe().findall(envelope_path):
            lc = envelope.find(self._GML32 + "lowerCorner").text.split()
            uc = envelope.find(self._GML32 + "upperCorner").text.split()
            bboxes.append(
                {
                    "nativeSrs": envelope.attrib["srsName"],
                    "bbox": (float(lc[0]), float(lc[1]), float(uc[0]), float(uc[1])),
                }
            )
        return bboxes

    boundingboxes = property(_getOtherBoundingBoxes, None)

    def _getSupportedCRSProperty(self):
        """Return ``Crs`` objects for the response, request/response and native CRS lists.

        NOTE(review): the paths use the OWS namespace and ``CoverageOffering``
        element names; confirm they can match a WCS 2.0 DescribeCoverage response.
        """
        # One (cached) DescribeCoverage response serves all three lookups.
        description = self._describe()
        crss = []
        for tag in ("responseCRSs", "requestResponseCRSs", "nativeCRSs"):
            for elem in description.findall(
                ns("CoverageOffering/") + ns("supportedCRSs/") + ns(tag)
            ):
                crss.extend(Crs(crs) for crs in (elem.text or "").split())
        return crss

    supportedCRS = property(_getSupportedCRSProperty, None)

    def _getSupportedFormatsProperty(self):
        """Return the text of every ServiceMetadata/formatSupported element of the capabilities."""
        return [
            elem.text
            for elem in self._service._capabilities.findall(
                nsWCS2("ServiceMetadata/") + nsWCS2("formatSupported")
            )
        ]

    supportedFormats = property(_getSupportedFormatsProperty, None)

    def _getAxisDescriptionsProperty(self):
        """Return ``AxisDescription`` objects for the axis descriptions in the rangeset.

        Requires a DescribeCoverage response (fetched on first use).
        """
        return [
            AxisDescription(elem)
            for elem in self._describe().findall(
                ns("CoverageOffering/") + ns("rangeSet/") + ns("RangeSet/") + ns("axisDescription/") + ns("AxisDescription")
            )
        ]

    axisDescriptions = property(_getAxisDescriptionsProperty, None)
2✔
423

424

425
# Adding classes to represent gml:grid and gml:rectifiedgrid. One of these is used for the cvg.grid property
426
# (where cvg is a member of the contents dictionary)
427
# There is no simple way to convert the offset values in a rectifiedgrid grid to real values without CRS understanding,
428
# therefore this is beyond the current scope of owslib, so the representation here is purely to provide access
429
# to the information in the GML.
430

431

432
class Grid(object):
    """ Simple grid class to provide axis and value information for a gml grid

    Attributes:
        dimension: the ``dimension`` attribute as an int (``None`` without a grid).
        lowlimits / highlimits: whitespace-separated tokens (strings) of the
            GridEnvelope ``low`` / ``high`` elements.
        axislabels: axis names, from ``axisLabels`` or, failing that, from
            the ``axisName`` elements.
    """

    _GML = "{http://www.opengis.net/gml/3.2}"

    def __init__(self, grid):
        """Read the grid description from ``grid``; ``None`` leaves the defaults."""
        self.axislabels = []
        self.dimension = None
        self.lowlimits = []
        self.highlimits = []

        if grid is None:
            return

        gml = self._GML
        envelope = gml + "limits/" + gml + "GridEnvelope/" + gml
        self.dimension = int(grid.get("dimension"))
        # split() without an argument ignores repeated blanks and newlines,
        # which split(" ") would turn into empty tokens.
        self.lowlimits = grid.find(envelope + "low").text.split()
        self.highlimits = grid.find(envelope + "high").text.split()

        labels = grid.find(gml + "axisLabels")
        if labels is not None:
            self.axislabels = (labels.text or "").split()
        else:
            # GML 3.2 allows one axisName element per axis in place of axisLabels.
            self.axislabels = [axis.text for axis in grid.findall(gml + "axisName")]
×
453

454

455
class RectifiedGrid(Grid):
    """ RectifiedGrid class, extends Grid with additional offset vector information

    Attributes:
        origin: whitespace-separated tokens (strings) of the origin position.
        offsetvectors: one list of string tokens per ``offsetVector`` element.
    """

    def __init__(self, rectifiedgrid):
        """Read origin and offset vectors from ``rectifiedgrid``.

        ``None`` is accepted, as in ``Grid``: the result is then an empty grid
        instead of an ``AttributeError``.
        """
        super(RectifiedGrid, self).__init__(rectifiedgrid)
        self.origin = []
        self.offsetvectors = []
        if rectifiedgrid is None:
            return

        self.origin = rectifiedgrid.find(
            "{http://www.opengis.net/gml/3.2}origin/{http://www.opengis.net/gml/3.2}Point/{http://www.opengis.net/gml/3.2}pos"  # noqa
        ).text.split()
        self.offsetvectors = [
            offset.text.split()
            for offset in rectifiedgrid.findall(
                "{http://www.opengis.net/gml/3.2}offsetVector"
            )
        ]
×
468

469

470
class ReferenceableGridByVectors(Grid):
    """ ReferenceableGridByVectors class, extends Grid with additional vector information

    Attributes:
        origin: whitespace-separated tokens (strings) of the origin position.
        offsetvectors: one list of string tokens per ``offsetVector`` found
            under generalGridAxis/GeneralGridAxis.
    """

    def __init__(self, refereceablegridbyvectors):
        """Read origin and offset vectors from ``refereceablegridbyvectors``.

        ``None`` is accepted, as in ``Grid``: the result is then an empty grid
        instead of an ``AttributeError``.
        """
        super(ReferenceableGridByVectors, self).__init__(refereceablegridbyvectors)
        self.origin = []
        self.offsetvectors = []
        if refereceablegridbyvectors is None:
            return

        self.origin = refereceablegridbyvectors.find(
            "{http://www.opengis.net/gml/3.3/rgrid}origin/{http://www.opengis.net/gml/3.2}Point/{http://www.opengis.net/gml/3.2}pos"  # noqa
        ).text.split()
        self.offsetvectors = [
            offset.text.split()
            for offset in refereceablegridbyvectors.findall(
                "{http://www.opengis.net/gml/3.3/rgrid}generalGridAxis/{http://www.opengis.net/gml/3.3/rgrid}GeneralGridAxis/{http://www.opengis.net/gml/3.3/rgrid}offsetVector"  # noqa
            )
        ]
×
483

484

485
class AxisDescription(object):
    """ Class to represent the AxisDescription element optionally found as part of the RangeSet and used to
    define ordinates of additional dimensions such as wavelength bands or pressure levels

    Attributes:
        name: text of the ``name`` child, or ``None``.
        label: text of the ``label`` child, or ``None``.
        values: text of every child of each ``values`` element.
    """

    def __init__(self, axisdescElem):
        """Collect name, label and values from the children of ``axisdescElem``."""
        self.name = self.label = None
        self.values = []
        # Iterate the element directly: Element.getchildren() is deprecated in
        # lxml and no longer exists in xml.etree since Python 3.9.
        for elem in axisdescElem:
            if elem.tag == ns("name"):
                self.name = elem.text
            elif elem.tag == ns("label"):
                self.label = elem.text
            elif elem.tag == ns("values"):
                self.values.extend(child.text for child in elem)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc