• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / OWSLib / 3663701074

pending completion
3663701074

Pull #851

github

GitHub
Merge 6cd54d613 into 13b1443f7
Pull Request #851: Adding Python 3.10 in CI

7461 of 12701 relevant lines covered (58.74%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.38
/owslib/feature/wfs100.py
1
# =============================================================================
2
# OWSLib. Copyright (C) 2005 Sean C. Gillies
3
#
4
# Contact email: sgillies@frii.com
5
#
6
# $Id: wfs.py 503 2006-02-01 17:09:12Z dokai $
7
# =============================================================================
8

9
import itertools
1✔
10
import logging
1✔
11

12
from owslib import util
1✔
13

14
from io import BytesIO
1✔
15
from urllib.parse import urlencode
1✔
16
from owslib.util import (
1✔
17
    testXMLValue,
18
    extract_xml_list,
19
    ServiceException,
20
    xmltag_split,
21
    Authentication,
22
    openURL,
23
    log,
24
)
25
from owslib.etree import etree
1✔
26
from owslib.fgdc import Metadata
1✔
27
from owslib.iso import MD_Metadata
1✔
28
from owslib.crs import Crs
1✔
29
from owslib.namespaces import Namespaces
1✔
30
from owslib.feature.schema import get_schema
1✔
31
from owslib.feature.common import (
1✔
32
    WFSCapabilitiesReader,
33
    AbstractContentMetadata,
34
)
35

36
LOGGER = logging.getLogger(__name__)
1✔
37

38
has_pyproj = False
1✔
39

40
try:
1✔
41
    import pyproj
1✔
42
    has_pyproj = True
×
43
except ImportError:
1✔
44
    LOGGER.warning('pyproj not installed')
1✔
45

46
n = Namespaces()
1✔
47
WFS_NAMESPACE = n.get_namespace("wfs")
1✔
48
OGC_NAMESPACE = n.get_namespace("ogc")
1✔
49

50

51
# TODO: use nspath in util.py
52
def nspath(path, ns=WFS_NAMESPACE):
1✔
53
    """
54
    Prefix the given path with the given namespace identifier.
55

56
    Parameters
57
    ----------
58
    path : string
59
        ElementTree API Compatible path expression
60

61
    ns : string
62
        The XML namespace. Defaults to WFS namespace.
63
    """
64
    components = []
1✔
65
    for component in path.split("/"):
1✔
66
        if component != "*":
1✔
67
            component = "{%s}%s" % (ns, component)
1✔
68
        components.append(component)
1✔
69
    return "/".join(components)
1✔
70

71

72
class WebFeatureService_1_0_0(object):
1✔
73
    """Abstraction for OGC Web Feature Service (WFS).
74

75
    Implements IWebFeatureService.
76
    """
77

78
    def __new__(
1✔
79
        self,
80
        url,
81
        version,
82
        xml,
83
        parse_remote_metadata=False,
84
        timeout=30,
85
        headers=None,
86
        username=None,
87
        password=None,
88
        auth=None,
89
    ):
90
        """ overridden __new__ method
91

92
        @type url: string
93
        @param url: url of WFS capabilities document
94
        @type xml: string
95
        @param xml: elementtree object
96
        @type parse_remote_metadata: boolean
97
        @param parse_remote_metadata: whether to fully process MetadataURL elements
98
        @param headers: HTTP headers to send with requests
99
        @param timeout: time (in seconds) after which requests should timeout
100
        @param username: service authentication username
101
        @param password: service authentication password
102
        @param auth: instance of owslib.util.Authentication
103
        @return: initialized WebFeatureService_1_0_0 object
104
        """
105
        obj = object.__new__(self)
1✔
106
        obj.__init__(
1✔
107
            url,
108
            version,
109
            xml,
110
            parse_remote_metadata,
111
            timeout,
112
            headers=headers,
113
            username=username,
114
            password=password,
115
            auth=auth,
116
        )
117
        return obj
1✔
118

119
    def __getitem__(self, name):
1✔
120
        """ check contents dictionary to allow dict like access to service layers"""
121
        if name in list(self.__getattribute__("contents").keys()):
1✔
122
            return self.__getattribute__("contents")[name]
1✔
123
        else:
124
            raise KeyError("No content named %s" % name)
1✔
125

126
    def __init__(
1✔
127
        self,
128
        url,
129
        version,
130
        xml=None,
131
        parse_remote_metadata=False,
132
        timeout=30,
133
        headers=None,
134
        username=None,
135
        password=None,
136
        auth=None,
137
    ):
138
        """Initialize."""
139
        if auth:
1✔
140
            if username:
1✔
141
                auth.username = username
×
142
            if password:
1✔
143
                auth.password = password
×
144
        self.url = url
1✔
145
        self.version = version
1✔
146
        self.timeout = timeout
1✔
147
        self.headers = headers
1✔
148
        self.auth = auth or Authentication(username, password)
1✔
149
        self._capabilities = None
1✔
150
        reader = WFSCapabilitiesReader(self.version, headers=self.headers, auth=self.auth)
1✔
151
        if xml:
1✔
152
            self._capabilities = reader.readString(xml)
1✔
153
        else:
154
            self._capabilities = reader.read(self.url, self.timeout)
1✔
155
        self._buildMetadata(parse_remote_metadata)
1✔
156

157
    def _buildMetadata(self, parse_remote_metadata=False):
1✔
158
        """set up capabilities metadata objects: """
159

160
        self.updateSequence = self._capabilities.attrib.get("updateSequence")
1✔
161

162
        # serviceIdentification metadata
163
        serviceelem = self._capabilities.find(nspath("Service"))
1✔
164
        self.identification = ServiceIdentification(serviceelem, self.version)
1✔
165

166
        # serviceProvider metadata
167
        self.provider = ServiceProvider(serviceelem)
1✔
168

169
        # serviceOperations metadata
170
        self.operations = []
1✔
171
        for elem in self._capabilities.find(nspath("Capability/Request"))[:]:
1✔
172
            self.operations.append(OperationMetadata(elem))
1✔
173

174
        # serviceContents metadata: our assumption is that services use a top-level
175
        # layer as a metadata organizer, nothing more.
176

177
        self.contents = {}
1✔
178
        featuretypelist = self._capabilities.find(nspath("FeatureTypeList"))
1✔
179
        features = self._capabilities.findall(nspath("FeatureTypeList/FeatureType"))
1✔
180
        for feature in features:
1✔
181
            cm = ContentMetadata(
1✔
182
                feature, featuretypelist, parse_remote_metadata, auth=self.auth
183
            )
184
            self.contents[cm.id] = cm
1✔
185

186
        # exceptions
187
        self.exceptions = [
1✔
188
            f.text for f in self._capabilities.findall("Capability/Exception/Format")
189
        ]
190

191
    def getcapabilities(self):
1✔
192
        """Request and return capabilities document from the WFS as a
193
        file-like object.
194
        NOTE: this is effectively redundant now"""
195
        reader = WFSCapabilitiesReader(self.version, auth=self.auth)
1✔
196
        return openURL(
1✔
197
            reader.capabilities_url(self.url), timeout=self.timeout,
198
            headers=self.headers, auth=self.auth
199
        )
200

201
    def items(self):
1✔
202
        """supports dict-like items() access"""
203
        items = []
×
204
        for item in self.contents:
×
205
            items.append((item, self.contents[item]))
×
206
        return items
×
207

208
    def getfeature(
1✔
209
        self,
210
        typename=None,
211
        filter=None,
212
        bbox=None,
213
        featureid=None,
214
        featureversion=None,
215
        propertyname="*",
216
        maxfeatures=None,
217
        srsname=None,
218
        outputFormat=None,
219
        method="{http://www.opengis.net/wfs}Get",
220
        startindex=None,
221
    ):
222
        """Request and return feature data as a file-like object.
223

224
        Parameters
225
        ----------
226
        typename : list
227
            List of typenames (string)
228
        filter : string
229
            XML-encoded OGC filter expression.
230
        bbox : tuple
231
            (left, bottom, right, top) in the feature type's coordinates.
232
        featureid : list
233
            List of unique feature ids (string)
234
        featureversion : string
235
            Default is most recent feature version.
236
        propertyname : list
237
            List of feature property names. '*' matches all.
238
        maxfeatures : int
239
            Maximum number of features to be returned.
240
        method : string
241
            Qualified name of the HTTP DCP method to use.
242
        srsname: string
243
            EPSG code to request the data in
244
        outputFormat: string (optional)
245
            Requested response format of the request.
246
        startindex: int (optional)
247
            Start position to return feature set (paging in combination with maxfeatures)
248

249

250
        There are 3 different modes of use
251

252
        1) typename and bbox (simple spatial query)
253
        2) typename and filter (more expressive)
254
        3) featureid (direct access to known features)
255
        """
256
        try:
×
257
            base_url = next(
×
258
                (
259
                    m.get("url")
260
                    for m in self.getOperationByName("GetFeature").methods
261
                    if m.get("type").lower() == method.lower()
262
                )
263
            )
264
        except StopIteration:
×
265
            base_url = self.url
×
266
        request = {"service": "WFS", "version": self.version, "request": "GetFeature"}
×
267

268
        # check featureid
269
        if featureid:
×
270
            request["featureid"] = ",".join(featureid)
×
271
        elif bbox and typename:
×
272
            request["bbox"] = ",".join([repr(x) for x in bbox])
×
273
        elif filter and typename:
×
274
            request["filter"] = str(filter)
×
275

276
        if srsname:
×
277
            request["srsname"] = str(srsname)
×
278

279
        assert len(typename) > 0
×
280
        request["typename"] = ",".join(typename)
×
281

282
        if propertyname is not None:
×
283
            if not isinstance(propertyname, list):
×
284
                propertyname = [propertyname]
×
285
            request["propertyname"] = ",".join(propertyname)
×
286

287
        if featureversion:
×
288
            request["featureversion"] = str(featureversion)
×
289
        if maxfeatures:
×
290
            request["maxfeatures"] = str(maxfeatures)
×
291
        if startindex:
×
292
            request["startindex"] = str(startindex)
×
293

294
        if outputFormat is not None:
×
295
            request["outputFormat"] = outputFormat
×
296

297
        data = urlencode(request)
×
298
        log.debug("Making request: %s?%s" % (base_url, data))
×
299
        u = openURL(base_url, data, method, timeout=self.timeout,
×
300
                    headers=self.headers, auth=self.auth)
301

302
        # check for service exceptions, rewrap, and return
303
        # We're going to assume that anything with a content-length > 32k
304
        # is data. We'll check anything smaller.
305

306
        if "Content-Length" in u.info():
×
307
            length = int(u.info()["Content-Length"])
×
308
            have_read = False
×
309
        else:
310
            data = u.read()
×
311
            have_read = True
×
312
            length = len(data)
×
313

314
        if length < 32000:
×
315
            if not have_read:
×
316
                data = u.read()
×
317

318
            try:
×
319
                tree = etree.fromstring(data)
×
320
            except BaseException:
×
321
                # Not XML
322
                return BytesIO(data)
×
323
            else:
324
                if tree.tag == "{%s}ServiceExceptionReport" % OGC_NAMESPACE:
×
325
                    se = tree.find(nspath("ServiceException", OGC_NAMESPACE))
×
326
                    raise ServiceException(str(se.text).strip())
×
327
                return BytesIO(data)
×
328
        else:
329
            if have_read:
×
330
                return BytesIO(data)
×
331
            return u
×
332

333
    def getOperationByName(self, name):
1✔
334
        """Return a named content item."""
335
        for item in self.operations:
1✔
336
            if item.name == name:
1✔
337
                return item
1✔
338
        raise KeyError("No operation named %s" % name)
×
339

340
    def get_schema(self, typename):
1✔
341
        """
342
        Get layer schema compatible with :class:`fiona` schema object
343
        """
344

345
        return get_schema(self.url, typename, self.version, auth=self.auth)
×
346

347

348
class ServiceIdentification(object):
1✔
349
    """ Implements IServiceIdentificationMetadata """
350

351
    def __init__(self, infoset, version):
1✔
352
        self._root = infoset
1✔
353
        self.type = testXMLValue(self._root.find(nspath("Name")))
1✔
354
        self.version = version
1✔
355
        self.title = testXMLValue(self._root.find(nspath("Title")))
1✔
356
        self.abstract = testXMLValue(self._root.find(nspath("Abstract")))
1✔
357
        self.keywords = [f.text for f in self._root.findall(nspath("Keywords"))]
1✔
358
        self.fees = testXMLValue(self._root.find(nspath("Fees")))
1✔
359
        self.accessconstraints = testXMLValue(
1✔
360
            self._root.find(nspath("AccessConstraints"))
361
        )
362

363

364
class ServiceProvider(object):
1✔
365
    """ Implements IServiceProviderMetatdata """
366

367
    def __init__(self, infoset):
1✔
368
        self._root = infoset
1✔
369
        self.name = testXMLValue(self._root.find(nspath("Name")))
1✔
370
        self.url = testXMLValue(self._root.find(nspath("OnlineResource")))
1✔
371
        self.keywords = extract_xml_list(self._root.find(nspath("Keywords")))
1✔
372

373

374
class ContentMetadata(AbstractContentMetadata):
1✔
375
    """Abstraction for WFS metadata.
376

377
    Implements IMetadata.
378
    """
379

380
    def __init__(
1✔
381
        self, elem, parent, parse_remote_metadata=False, timeout=30, auth=None
382
    ):
383
        """."""
384
        super(ContentMetadata, self).__init__(auth)
1✔
385
        self.id = testXMLValue(elem.find(nspath("Name")))
1✔
386
        self.title = testXMLValue(elem.find(nspath("Title")))
1✔
387
        self.abstract = testXMLValue(elem.find(nspath("Abstract")))
1✔
388
        self.keywords = [f.text for f in elem.findall(nspath("Keywords"))]
1✔
389

390
        # bboxes
391
        self.boundingBox = None
1✔
392
        b = elem.find(nspath("LatLongBoundingBox"))
1✔
393
        srs = elem.find(nspath("SRS"))
1✔
394

395
        if b is not None:
1✔
396
            self.boundingBox = (
1✔
397
                float(b.attrib["minx"]),
398
                float(b.attrib["miny"]),
399
                float(b.attrib["maxx"]),
400
                float(b.attrib["maxy"]),
401
                Crs(srs.text),
402
            )
403

404
        # transform wgs84 bbox from given default bboxt
405
        self.boundingBoxWGS84 = None
1✔
406

407
        if has_pyproj and b is not None and srs is not None:
1✔
408
            wgs84 = pyproj.CRS.from_epsg(4326)
×
409
            try:
×
410
                src_srs = pyproj.CRS.from_string(srs.text)
×
411
                transformer = pyproj.Transformer.from_crs(src_srs, wgs84, always_xy=True)
×
412
                mincorner = transformer.transform(b.attrib["minx"], b.attrib["miny"])
×
413
                maxcorner = transformer.transform(b.attrib["maxx"], b.attrib["maxy"])
×
414

415
                self.boundingBoxWGS84 = (
×
416
                    mincorner[0],
417
                    mincorner[1],
418
                    maxcorner[0],
419
                    maxcorner[1],
420
                )
421
            except RuntimeError:
×
422
                pass
×
423
        # crs options
424
        self.crsOptions = [Crs(srs.text) for srs in elem.findall(nspath("SRS"))]
1✔
425

426
        # verbs
427
        ops = itertools.chain(
1✔
428
            parent.findall(nspath("Operations/*")), elem.findall(nspath("Operations/*"))
429
        )
430
        self.verbOptions = list({op.tag for op in ops})
1✔
431

432
        # others not used but needed for iContentMetadata harmonisation
433
        self.styles = None
1✔
434
        self.timepositions = None
1✔
435
        self.defaulttimeposition = None
1✔
436

437
        # MetadataURLs
438
        self.metadataUrls = []
1✔
439
        for m in elem.findall(nspath("MetadataURL")):
1✔
440
            metadataUrl = {
1✔
441
                "type": testXMLValue(m.attrib["type"], attrib=True),
442
                "format": testXMLValue(m.attrib["format"], attrib=True),
443
                "url": testXMLValue(m),
444
            }
445
            self.metadataUrls.append(metadataUrl)
1✔
446

447
    def parse_remote_metadata(self, timeout=30):
1✔
448
        """Parse remote metadata for MetadataURL of format 'XML' and add it as metadataUrl['metadata']"""
449
        for metadataUrl in self.metadataUrls:
1✔
450
            if (
×
451
                metadataUrl["url"] is not None and metadataUrl["format"].lower() == "xml"
452
            ):
453
                try:
×
454
                    content = openURL(
×
455
                        metadataUrl["url"], timeout=timeout, headers=self.headers, auth=self.auth
456
                    )
457
                    doc = etree.fromstring(content.read())
×
458
                    if metadataUrl["type"] == "FGDC":
×
459
                        mdelem = doc.find(".//metadata")
×
460
                        if mdelem is not None:
×
461
                            metadataUrl["metadata"] = Metadata(mdelem)
×
462
                        else:
463
                            metadataUrl["metadata"] = None
×
464
                    elif metadataUrl["type"] == "TC211":
×
465
                        mdelem = doc.find(
×
466
                            ".//" + util.nspath_eval("gmd:MD_Metadata", n.get_namespaces(["gmd"]))
467
                        ) or doc.find(
468
                            ".//" + util.nspath_eval("gmi:MI_Metadata", n.get_namespaces(["gmi"]))
469
                        )
470
                        if mdelem is not None:
×
471
                            metadataUrl["metadata"] = MD_Metadata(mdelem)
×
472
                        else:
473
                            metadataUrl["metadata"] = None
×
474
                except Exception:
×
475
                    metadataUrl["metadata"] = None
×
476

477

478
class OperationMetadata:
1✔
479
    """Abstraction for WFS metadata.
480

481
    Implements IMetadata.
482
    """
483

484
    def __init__(self, elem):
1✔
485
        """."""
486
        self.name = xmltag_split(elem.tag)
1✔
487
        # formatOptions
488
        self.formatOptions = [f.tag for f in elem.findall(nspath("ResultFormat/*"))]
1✔
489
        self.methods = []
1✔
490
        for verb in elem.findall(nspath("DCPType/HTTP/*")):
1✔
491
            url = verb.attrib["onlineResource"]
1✔
492
            self.methods.append({"type": xmltag_split(verb.tag), "url": url})
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc