• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / OWSLib / 3663701074

pending completion
3663701074

Pull #851

github

GitHub
Merge 6cd54d613 into 13b1443f7
Pull Request #851: Adding Python 3.10 in CI

7461 of 12701 relevant lines covered (58.74%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.2
/owslib/feature/wfs110.py
1
# -*- coding: utf-8 -*-
2
# =============================================================================
3
# Copyright (c) 2011 Tom Kralidis
4
#
5
# Authors : Tom Kralidis <tomkralidis@gmail.com>
6
#
7
# Contact email: tomkralidis@gmail.com
8
# =============================================================================
9

10
from io import BytesIO
1✔
11
from urllib.parse import urlencode
1✔
12
from owslib.util import (
1✔
13
    testXMLValue,
14
    nspath_eval,
15
    ServiceException,
16
    Authentication,
17
    # openURL,
18
)
19
from owslib.etree import etree
1✔
20
from owslib.fgdc import Metadata
1✔
21
from owslib.iso import MD_Metadata
1✔
22
from owslib.ows import (
1✔
23
    OwsCommon,
24
    ServiceIdentification,
25
    ServiceProvider,
26
    Constraint,
27
    Parameter,
28
    OperationsMetadata,
29
    BoundingBox
30
)
31
from owslib.fes import FilterCapabilities
1✔
32
from owslib.crs import Crs
1✔
33
from owslib.feature import WebFeatureService_
1✔
34
from owslib.feature.common import (
1✔
35
    WFSCapabilitiesReader,
36
    AbstractContentMetadata,
37
)
38
from owslib.namespaces import Namespaces
1✔
39
from owslib.util import log, openURL
1✔
40

41

42
def get_namespaces():
1✔
43
    n = Namespaces()
1✔
44
    return n.get_namespaces(["gmd", "gml", "gmi", "ogc", "ows", "wfs"])
1✔
45

46

47
namespaces = get_namespaces()
1✔
48

49

50
class WebFeatureService_1_1_0(WebFeatureService_):
1✔
51
    """Abstraction for OGC Web Feature Service (WFS).
52

53
    Implements IWebFeatureService.
54
    """
55

56
    def __new__(
1✔
57
        self,
58
        url,
59
        version,
60
        xml,
61
        parse_remote_metadata=False,
62
        timeout=30,
63
        headers=None,
64
        username=None,
65
        password=None,
66
        auth=None,
67
    ):
68
        """ overridden __new__ method
69

70
        @type url: string
71
        @param url: url of WFS capabilities document
72
        @type xml: string
73
        @param xml: elementtree object
74
        @type parse_remote_metadata: boolean
75
        @param parse_remote_metadata: whether to fully process MetadataURL elements
76
        @param headers: HTTP headers to send with requests
77
        @param timeout: time (in seconds) after which requests should timeout
78
        @param username: service authentication username
79
        @param password: service authentication password
80
        @param auth: instance of owslib.util.Authentication
81
        @return: initialized WebFeatureService_1_1_0 object
82
        """
83
        obj = object.__new__(self)
1✔
84
        obj.__init__(
1✔
85
            url,
86
            version,
87
            xml,
88
            parse_remote_metadata,
89
            timeout,
90
            headers=headers,
91
            username=username,
92
            password=password,
93
            auth=auth,
94
        )
95
        return obj
1✔
96

97
    def __getitem__(self, name):
1✔
98
        """ check contents dictionary to allow dict like access to service layers"""
99
        if name in list(self.__getattribute__("contents").keys()):
×
100
            return self.__getattribute__("contents")[name]
×
101
        else:
102
            raise KeyError("No content named %s" % name)
×
103

104
    def __init__(
1✔
105
        self,
106
        url,
107
        version,
108
        xml=None,
109
        parse_remote_metadata=False,
110
        timeout=30,
111
        headers=None,
112
        username=None,
113
        password=None,
114
        auth=None,
115
    ):
116
        """Initialize."""
117
        if auth:
1✔
118
            if username:
1✔
119
                auth.username = username
×
120
            if password:
1✔
121
                auth.password = password
×
122
        else:
123
            auth = Authentication(username, password)
×
124
        super(WebFeatureService_1_1_0, self).__init__(auth)
1✔
125
        self.url = url
1✔
126
        self.version = version
1✔
127
        self.headers = headers
1✔
128
        self.timeout = timeout
1✔
129
        self._capabilities = None
1✔
130
        self.owscommon = OwsCommon("1.0.0")
1✔
131
        reader = WFSCapabilitiesReader(self.version, headers=self.headers, auth=self.auth)
1✔
132
        if xml:
1✔
133
            self._capabilities = reader.readString(xml)
1✔
134
        else:
135
            self._capabilities = reader.read(self.url, self.timeout)
1✔
136
        self._buildMetadata(parse_remote_metadata)
1✔
137

138
    def _buildMetadata(self, parse_remote_metadata=False):
1✔
139
        """set up capabilities metadata objects: """
140

141
        self.updateSequence = self._capabilities.attrib.get("updateSequence")
1✔
142

143
        # ServiceIdentification
144
        val = self._capabilities.find(
1✔
145
            nspath_eval("ows:ServiceIdentification", namespaces)
146
        )
147
        if val is not None:
1✔
148
            self.identification = ServiceIdentification(val, self.owscommon.namespace)
1✔
149
        # ServiceProvider
150
        val = self._capabilities.find(
1✔
151
            nspath_eval("ows:ServiceProvider", namespaces)
152
        )
153
        if val is not None:
1✔
154
            self.provider = ServiceProvider(val, self.owscommon.namespace)
1✔
155
        # ServiceOperations metadata
156
        self.operations = []
1✔
157
        for elem in self._capabilities.findall(
1✔
158
            nspath_eval("ows:OperationsMetadata/ows:Operation", namespaces)
159
        ):
160
            self.operations.append(OperationsMetadata(elem, self.owscommon.namespace))
1✔
161
        self.constraints = {}
1✔
162
        for elem in self._capabilities.findall(
1✔
163
            nspath_eval("ows:OperationsMetadata/ows:Constraint", namespaces)
164
        ):
165
            self.constraints[elem.attrib["name"]] = Constraint(
1✔
166
                elem, self.owscommon.namespace
167
            )
168
        self.parameters = {}
1✔
169
        for elem in self._capabilities.findall(
1✔
170
            nspath_eval("ows:OperationsMetadata/ows:Parameter", namespaces)
171
        ):
172
            self.parameters[elem.attrib["name"]] = Parameter(
×
173
                elem, self.owscommon.namespace
174
            )
175

176
        # FilterCapabilities
177
        val = self._capabilities.find(
1✔
178
            nspath_eval("ogc:Filter_Capabilities", namespaces)
179
        )
180
        self.filters = FilterCapabilities(val)
1✔
181

182
        # serviceContents metadata: our assumption is that services use a top-level
183
        # layer as a metadata organizer, nothing more.
184

185
        self.contents = {}
1✔
186
        features = self._capabilities.findall(
1✔
187
            nspath_eval("wfs:FeatureTypeList/wfs:FeatureType", namespaces)
188
        )
189
        if features is not None:
1✔
190
            for feature in features:
1✔
191
                cm = ContentMetadata(feature, parse_remote_metadata, headers=self.headers, auth=self.auth)
1✔
192
                self.contents[cm.id] = cm
1✔
193

194
        # exceptions
195
        self.exceptions = [
1✔
196
            f.text for f in self._capabilities.findall("Capability/Exception/Format")
197
        ]
198

199
    def getcapabilities(self):
1✔
200
        """Request and return capabilities document from the WFS as a
201
        file-like object.
202
        NOTE: this is effectively redundant now"""
203
        reader = WFSCapabilitiesReader(self.version, auth=self.auth)
×
204
        return openURL(
×
205
            reader.capabilities_url(self.url), timeout=self.timeout,
206
            headers=self.headers, auth=self.auth
207
        )
208

209
    def items(self):
1✔
210
        """supports dict-like items() access"""
211
        items = []
×
212
        for item in self.contents:
×
213
            items.append((item, self.contents[item]))
×
214
        return items
×
215

216
    def getfeature(
1✔
217
        self,
218
        typename=None,
219
        filter=None,
220
        bbox=None,
221
        featureid=None,
222
        featureversion=None,
223
        propertyname=None,
224
        maxfeatures=None,
225
        srsname=None,
226
        outputFormat=None,
227
        method="Get",
228
        startindex=None,
229
        sortby=None,
230
    ):
231
        """Request and return feature data as a file-like object.
232

233
        Parameters
234
        ----------
235
        typename : list
236
            List of typenames (string)
237
        filter : string
238
            XML-encoded OGC filter expression.
239
        bbox : tuple
240
            (left, bottom, right, top) in the feature type's coordinates.
241
        featureid : list
242
            List of unique feature ids (string)
243
        featureversion : string
244
            Default is most recent feature version.
245
        propertyname : list
246
            List of feature property names. For Get request, '*' matches all.
247
            For Post request, leave blank (None) to get all properties.
248
        maxfeatures : int
249
            Maximum number of features to be returned.
250
        method : string
251
            Qualified name of the HTTP DCP method to use.
252
        srsname: string
253
            EPSG code to request the data in
254
        outputFormat: string (optional)
255
            Requested response format of the request.
256
        startindex: int (optional)
257
            Start position to return feature set (paging in combination with maxfeatures)
258
        sortby: list (optional)
259
            List of property names whose values should be used to order
260
            (upon presentation) the set of feature instances that
261
            satify the query.
262

263
        There are 3 different modes of use
264

265
        1) typename and bbox (simple spatial query). It is assumed, that
266
            bbox coordinates are given *always* in the east,north order
267
        2) typename and filter (more expressive)
268
        3) featureid (direct access to known features)
269
        """
270
        try:
1✔
271
            base_url = next(
1✔
272
                (
273
                    m.get("url")
274
                    for m in self.getOperationByName("GetFeature").methods
275
                    if m.get("type").lower() == method.lower()
276
                )
277
            )
278
        except StopIteration:
×
279
            base_url = self.url
×
280
        request = {"service": "WFS", "version": self.version, "request": "GetFeature"}
1✔
281

282
        if method.lower() == "get":
1✔
283
            if not isinstance(typename, list):
1✔
284
                typename = [typename]
1✔
285

286
            if srsname is not None:
1✔
287
                request["srsname"] = str(srsname)
×
288

289
                # Check, if desired SRS is supported by the service for each
290
                # typename. Warning will be thrown if that SRS is not allowed."
291
                for name in typename:
×
292
                    _ = self.getSRS(srsname, name)
×
293

294
            # check featureid
295
            if featureid:
1✔
296
                request["featureid"] = ",".join(featureid)
×
297

298
            # bbox
299
            elif bbox and typename:
1✔
300
                request["bbox"] = self.getBBOXKVP(bbox, typename)
×
301

302
            # or filter
303
            elif filter and typename:
1✔
304
                request["filter"] = str(filter)
1✔
305

306
            assert len(typename) > 0
1✔
307
            request["typename"] = ",".join(typename)
1✔
308

309
            if propertyname is None:
1✔
310
                propertyname = "*"
1✔
311

312
            if not isinstance(propertyname, list):
1✔
313
                propertyname = [propertyname]
1✔
314
            request["propertyname"] = ",".join(propertyname)
1✔
315

316
            if sortby is not None:
1✔
317
                if not isinstance(sortby, list):
×
318
                    sortby = [sortby]
×
319
                request["sortby"] = ",".join(sortby)
×
320

321
            if featureversion is not None:
1✔
322
                request["featureversion"] = str(featureversion)
×
323
            if maxfeatures is not None:
1✔
324
                request["maxfeatures"] = str(maxfeatures)
×
325
            if startindex is not None:
1✔
326
                request["startindex"] = str(startindex)
×
327
            if outputFormat is not None:
1✔
328
                request["outputFormat"] = outputFormat
×
329

330
            data = urlencode(request)
1✔
331
            log.debug("Making request: %s?%s" % (base_url, data))
1✔
332

333
        elif method.lower() == "post":
×
334
            base_url, data = self.getPOSTGetFeatureRequest(
×
335
                typename=typename,
336
                filter=filter,
337
                bbox=bbox,
338
                featureid=featureid,
339
                featureversion=featureversion,
340
                propertyname=propertyname,
341
                maxfeatures=maxfeatures,
342
                outputFormat=outputFormat,
343
                method='Post',
344
                startindex=startindex,
345
                sortby=sortby,
346
            )
347

348
        u = openURL(base_url, data, method, timeout=self.timeout,
1✔
349
                    headers=self.headers, auth=self.auth)
350

351
        # check for service exceptions, rewrap, and return
352
        # We're going to assume that anything with a content-length > 32k
353
        # is data. We'll check anything smaller.
354
        if "Content-Length" in u.info():
1✔
355
            length = int(u.info()["Content-Length"])
×
356
            have_read = False
×
357
        else:
358
            data = u.read()
1✔
359
            have_read = True
1✔
360
            length = len(data)
1✔
361

362
        if length < 32000:
1✔
363
            if not have_read:
×
364
                data = u.read()
×
365

366
            try:
×
367
                tree = etree.fromstring(data)
×
368
            except BaseException:
×
369
                # Not XML
370
                return BytesIO(data)
×
371
            else:
372
                if tree.tag == "{%s}ServiceExceptionReport" % namespaces["ogc"]:
×
373
                    se = tree.find(nspath_eval("ServiceException", namespaces["ogc"]))
×
374
                    raise ServiceException(str(se.text).strip())
×
375
                else:
376
                    return BytesIO(data)
×
377
        else:
378
            if have_read:
1✔
379
                return BytesIO(data)
1✔
380
            return u
×
381

382
    def getOperationByName(self, name):
1✔
383
        """Return a named content item."""
384
        for item in self.operations:
1✔
385
            if item.name == name:
1✔
386
                return item
1✔
387
        raise KeyError("No operation named %s" % name)
×
388

389

390
class ContentMetadata(AbstractContentMetadata):
1✔
391
    """Abstraction for WFS metadata.
392

393
    Implements IMetadata.
394
    """
395

396
    def __init__(self, elem, parse_remote_metadata=False, timeout=30, headers=None, auth=None):
1✔
397
        """."""
398
        super(ContentMetadata, self).__init__(headers=headers, auth=auth)
1✔
399
        self.id = testXMLValue(elem.find(nspath_eval("wfs:Name", namespaces)))
1✔
400
        self.title = testXMLValue(elem.find(nspath_eval("wfs:Title", namespaces)))
1✔
401
        self.abstract = testXMLValue(elem.find(nspath_eval("wfs:Abstract", namespaces)))
1✔
402
        self.keywords = [
1✔
403
            f.text
404
            for f in elem.findall(nspath_eval("ows:Keywords/ows:Keyword", namespaces))
405
        ]
406

407
        # bbox
408
        self.boundingBoxWGS84 = None
1✔
409
        b = BoundingBox(
1✔
410
            elem.find(nspath_eval("ows:WGS84BoundingBox", namespaces)),
411
            namespaces["ows"],
412
        )
413
        if b is not None:
1✔
414
            try:
1✔
415
                self.boundingBoxWGS84 = (
1✔
416
                    float(b.minx),
417
                    float(b.miny),
418
                    float(b.maxx),
419
                    float(b.maxy),
420
                )
421
            except TypeError:
×
422
                self.boundingBoxWGS84 = None
×
423
        # crs options
424
        self.crsOptions = [
1✔
425
            Crs(srs.text)
426
            for srs in elem.findall(nspath_eval("wfs:OtherSRS", namespaces))
427
        ]
428
        dsrs = testXMLValue(elem.find(nspath_eval("wfs:DefaultSRS", namespaces)))
1✔
429
        if dsrs is not None:  # first element is default srs
1✔
430
            self.crsOptions.insert(0, Crs(dsrs))
1✔
431

432
        # verbs
433
        self.verbOptions = [
1✔
434
            op.text
435
            for op in elem.findall(
436
                nspath_eval("wfs:Operations/wfs:Operation", namespaces)
437
            )
438
        ]
439

440
        # output formats
441
        self.outputFormats = [
1✔
442
            op.text
443
            for op in elem.findall(
444
                nspath_eval("wfs:OutputFormats/wfs:Format", namespaces)
445
            )
446
        ]
447

448
        # MetadataURLs
449
        self.metadataUrls = []
1✔
450
        for m in elem.findall(nspath_eval("wfs:MetadataURL", namespaces)):
1✔
451
            metadataUrl = {
1✔
452
                "type": testXMLValue(m.attrib["type"], attrib=True),
453
                "format": testXMLValue(m.attrib["format"], attrib=True),
454
                "url": testXMLValue(m),
455
            }
456
            self.metadataUrls.append(metadataUrl)
1✔
457

458
        if parse_remote_metadata:
1✔
459
            self.parse_remote_metadata(timeout)
1✔
460

461
        # others not used but needed for iContentMetadata harmonisation
462
        self.styles = None
1✔
463
        self.timepositions = None
1✔
464
        self.defaulttimeposition = None
1✔
465

466
    def parse_remote_metadata(self, timeout=30):
1✔
467
        """Parse remote metadata for MetadataURL of format 'text/xml' and add it as metadataUrl['metadata']"""
468
        for metadataUrl in self.metadataUrls:
1✔
469
            if (
1✔
470
                metadataUrl["url"] is not None and metadataUrl["format"].lower() == "text/xml"
471
            ):
472
                try:
1✔
473
                    content = openURL(metadataUrl["url"], timeout=timeout, headers=self.headers, auth=self.auth)
1✔
474
                    doc = etree.fromstring(content.read())
1✔
475

476
                    if metadataUrl["type"] == "FGDC":
1✔
477
                        mdelem = doc.find(".//metadata")
×
478
                        if mdelem is not None:
×
479
                            metadataUrl["metadata"] = Metadata(mdelem)
×
480
                        else:
481
                            metadataUrl["metadata"] = None
×
482
                    elif metadataUrl["type"] in ["TC211", "19115", "19139"]:
1✔
483
                        mdelem = doc.find(
1✔
484
                            ".//" + nspath_eval("gmd:MD_Metadata", namespaces)
485
                        ) or doc.find(
486
                            ".//" + nspath_eval("gmi:MI_Metadata", namespaces)
487
                        )
488
                        if mdelem is not None:
1✔
489
                            metadataUrl["metadata"] = MD_Metadata(mdelem)
1✔
490
                        else:
491
                            metadataUrl["metadata"] = None
×
492
                except Exception:
×
493
                    metadataUrl["metadata"] = None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc