• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / OWSLib / 3663701074

pending completion
3663701074

Pull #851

github

GitHub
Merge 6cd54d613 into 13b1443f7
Pull Request #851: Adding Python 3.10 in CI

7461 of 12701 relevant lines covered (58.74%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.77
/owslib/feature/wfs200.py
1
# =============================================================================
2
# OWSLib. Copyright (C) 2005 Sean C. Gillies
3
#
4
# Contact email: sgillies@frii.com
5
#
6
# $Id: wfs.py 503 2006-02-01 17:09:12Z dokai $
7
# =============================================================================
8

9
# owslib imports:
10
from owslib import util
1✔
11
from owslib.fgdc import Metadata
1✔
12
from owslib.iso import MD_Metadata
1✔
13
from owslib.ows import Constraint, ServiceIdentification, ServiceProvider, OperationsMetadata
1✔
14
from owslib.etree import etree
1✔
15
from owslib.util import nspath, testXMLValue, openURL, Authentication
1✔
16
from owslib.crs import Crs
1✔
17
from owslib.feature import WebFeatureService_
1✔
18
from owslib.feature.common import (
1✔
19
    WFSCapabilitiesReader,
20
    AbstractContentMetadata,
21
)
22
from owslib.namespaces import Namespaces
1✔
23

24
# other imports
25
from io import BytesIO
1✔
26
from urllib.parse import urlencode
1✔
27

28
import logging
1✔
29
from owslib.util import log
1✔
30

31
n = Namespaces()
1✔
32
WFS_NAMESPACE = n.get_namespace("wfs20")
1✔
33
OWS_NAMESPACE = n.get_namespace("ows110")
1✔
34
OGC_NAMESPACE = n.get_namespace("ogc")
1✔
35
GML_NAMESPACE = n.get_namespace("gml")
1✔
36
FES_NAMESPACE = n.get_namespace("fes")
1✔
37

38

39
class ServiceException(Exception):
    """Raised when the WFS answers a request with a service exception report."""
41

42

43
class WebFeatureService_2_0_0(WebFeatureService_):
1✔
44
    """Abstraction for OGC Web Feature Service (WFS).
45

46
    Implements IWebFeatureService.
47
    """
48

49
    def __new__(
        self,
        url,
        version,
        xml,
        parse_remote_metadata=False,
        timeout=30,
        headers=None,
        username=None,
        password=None,
        auth=None,
    ):
        """ overridden __new__ method

        Creates the instance and initializes it explicitly, so that code
        calling ``__new__`` directly receives a ready-to-use object.

        NOTE: when the class itself is called (``WebFeatureService_2_0_0(...)``)
        Python invokes ``__init__`` a second time on the returned instance,
        because the object returned here is an instance of the class.

        @type url: string
        @param url: url of WFS capabilities document
        @type version: string
        @param version: WFS version, handed to the capabilities reader
        @type xml: string
        @param xml: capabilities document content; when empty or None the
            document is read from ``url`` instead
        @type parse_remote_metadata: boolean
        @param parse_remote_metadata: whether to fully process MetadataURL elements
        @param timeout: time (in seconds) after which requests should timeout
        @param headers: HTTP headers to send with requests
        @param username: service authentication username
        @param password: service authentication password
        @param auth: instance of owslib.util.Authentication
        @return: initialized WebFeatureService_2_0_0 object
        """
        # ``self`` is the class here; the name is kept for interface stability.
        obj = object.__new__(self)
        obj.__init__(
            url,
            version,
            xml,
            parse_remote_metadata,
            timeout,
            headers=headers,
            username=username,
            password=password,
            auth=auth,
        )
        return obj
89

90
    def __getitem__(self, name):
        """Allow dict-like access to the service contents.

        @param name: key of the requested entry in ``self.contents``
        @return: the entry stored under ``name``
        @raise KeyError: if ``self.contents`` has no entry called ``name``
        """
        contents = self.contents
        # Test membership on the mapping itself instead of a copied key list.
        if name in contents:
            return contents[name]
        raise KeyError("No content named %s" % name)
96

97
    def __init__(
        self,
        url,
        version,
        xml=None,
        parse_remote_metadata=False,
        timeout=30,
        headers=None,
        username=None,
        password=None,
        auth=None,
    ):
        """Initialize the service from a capabilities document.

        @param url: url of WFS capabilities document
        @param version: WFS version, handed to the capabilities reader
        @param xml: capabilities document content; when empty or None the
            document is read from ``url``
        @param parse_remote_metadata: whether to fully process MetadataURL elements
        @param timeout: time (in seconds) after which requests should timeout
        @param headers: HTTP headers to send with requests
        @param username: service authentication username
        @param password: service authentication password
        @param auth: instance of owslib.util.Authentication; created when
            not supplied
        """
        if not auth:
            auth = Authentication()
        # Apply explicit credentials whether ``auth`` was supplied or just
        # created; previously they were dropped when no ``auth`` was passed.
        if username:
            auth.username = username
        if password:
            auth.password = password
        super(WebFeatureService_2_0_0, self).__init__(auth)
        log.debug("building WFS %s", url)
        self.url = url
        self.version = version
        self.timeout = timeout
        self.headers = headers
        self._capabilities = None
        reader = WFSCapabilitiesReader(self.version, headers=self.headers, auth=self.auth)
        if xml:
            # A document was handed in: parse it without touching the network.
            self._capabilities = reader.readString(xml)
        else:
            self._capabilities = reader.read(self.url, self.timeout)
        self._buildMetadata(parse_remote_metadata)
130

131
    def _buildMetadata(self, parse_remote_metadata=False):
        """Set up the capabilities metadata objects.

        Always sets ``updateSequence``, ``operations``, ``constraints``,
        ``parameters``, ``contents`` and ``exceptions``. ``identification``
        and ``provider`` are only set when the corresponding element exists
        in the capabilities document.

        @type parse_remote_metadata: boolean
        @param parse_remote_metadata: passed on to every ContentMetadata
        """
        caps = self._capabilities
        self.updateSequence = caps.attrib.get("updateSequence")

        featuretypelist = caps.find(nspath("FeatureTypeList", ns=WFS_NAMESPACE))

        # serviceIdentification metadata
        serviceidentelem = caps.find(nspath("ServiceIdentification"))
        if serviceidentelem is not None:
            self.identification = ServiceIdentification(serviceidentelem)
            # add the keywords of every feature type to the service keywords
            if featuretypelist is not None:
                keywords = self.identification.keywords
                for ftype in featuretypelist.findall(
                    nspath("FeatureType", ns=WFS_NAMESPACE)
                ):
                    for kwd in ftype.findall(
                        nspath("Keywords/Keyword", ns=OWS_NAMESPACE)
                    ):
                        if kwd.text not in keywords:
                            keywords.append(kwd.text)

        # TODO: update serviceProvider metadata, miss it out for now
        serviceproviderelem = caps.find(nspath("ServiceProvider"))
        if serviceproviderelem is not None:
            self.provider = ServiceProvider(serviceproviderelem)

        # serviceOperations metadata
        self.operations = []
        opsmetadataelem = caps.find(nspath("OperationsMetadata"))
        # A document without OperationsMetadata yields an empty operation
        # list instead of failing on ``None``.
        if opsmetadataelem is not None:
            for elem in opsmetadataelem:
                if elem.tag != nspath("ExtendedCapabilities"):
                    self.operations.append(OperationsMetadata(elem))

        self.constraints = {}
        for elem in caps.findall(
            nspath("OperationsMetadata/Constraint", ns=WFS_NAMESPACE)
        ):
            self.constraints[elem.attrib["name"]] = Constraint(
                elem, self.owscommon.namespace
            )

        # NOTE(review): ``Parameter`` resolves to the (name, type) class of
        # this module, yet it receives an element and a namespace here;
        # confirm the intended class before relying on these entries.
        self.parameters = {}
        for elem in caps.findall(
            nspath("OperationsMetadata/Parameter", ns=WFS_NAMESPACE)
        ):
            self.parameters[elem.attrib["name"]] = Parameter(
                elem, self.owscommon.namespace
            )

        # serviceContents metadata: our assumption is that services use a top-level
        # layer as a metadata organizer, nothing more.
        self.contents = {}
        for feature in caps.findall(
            nspath("FeatureTypeList/FeatureType", ns=WFS_NAMESPACE)
        ):
            cm = ContentMetadata(
                feature, featuretypelist, parse_remote_metadata, auth=self.auth
            )
            self.contents[cm.id] = cm

        # exceptions
        self.exceptions = [
            f.text for f in caps.findall("Capability/Exception/Format")
        ]
202

203
    def getcapabilities(self):
        """Request and return capabilities document from the WFS as a
        file-like object.

        The request URL is derived from ``self.url`` by a capabilities
        reader created for ``self.version``.

        NOTE: this is effectively redundant now

        @return: the object returned by ``openURL`` for the capabilities URL
        """
        reader = WFSCapabilitiesReader(self.version, auth=self.auth)
        capabilities_url = reader.capabilities_url(self.url)
        return openURL(
            capabilities_url,
            timeout=self.timeout,
            headers=self.headers,
            auth=self.auth,
        )
212

213
    def items(self):
        """Support dict-like items() access.

        @return: list of ``(name, content)`` tuples built from ``self.contents``
        """
        return list(self.contents.items())
219

220
    def getfeature(
        self,
        typename=None,
        filter=None,
        bbox=None,
        featureid=None,
        featureversion=None,
        propertyname=None,
        maxfeatures=None,
        storedQueryID=None,
        storedQueryParams=None,
        method="Get",
        outputFormat=None,
        startindex=None,
        sortby=None,
    ):
        """Request and return feature data as a file-like object.

        #TODO: NOTE: have changed property name from ['*'] to None - check the use of this in WFS 2.0

        Parameters
        ----------
        typename : list
            List of typenames (string). A single string is wrapped in a list.
        filter : string
            XML-encoded OGC filter expression.
        bbox : tuple
            (left, bottom, right, top) in the feature type's coordinates == (minx, miny, maxx, maxy)
        featureid : list
            List of unique feature ids (string)
        featureversion : string
            Default is most recent feature version.
        propertyname : list
            List of feature property names. For Get request, '*' matches all.
            For Post request, leave blank (None) to get all properties.
        maxfeatures : int
            Maximum number of features to be returned.
        storedQueryID : string
            A name identifying a prepared set available in WFS-service
        storedQueryParams : dict
            Variable amount of extra information sent to server related to
            storedQueryID to further define the requested data
            {'parameter_name': parameter_value}
        method : string
            Qualified name of the HTTP DCP method to use. Anything other
            than "Get" (case-insensitive) builds a Post request.
        outputFormat: string (optional)
            Requested response format of the request.
        startindex: int (optional)
            Start position to return feature set (paging in combination with maxfeatures)
        sortby: list (optional)
            List of property names whose values should be used to order
            (upon presentation) the set of feature instances that
            satisfy the query.

        There are 5 different modes of use

        1) typename and bbox (simple spatial query)
        2) typename and filter (==query) (more expressive)
        3) featureid (direct access to known features)
        4) storedQueryID and optional storedQueryParams
        5) filter only via Post method

        Raises:
            ServiceException: If there is an error during the request

        Returns:
            BytesIO -- Data returned from the service as a file-like object.
            When the response declares a Content-Length of 32000 or more,
            the unread response object from openURL is returned instead.
        """
        storedQueryParams = storedQueryParams or {}
        url = data = None
        if typename and isinstance(typename, str):
            typename = [typename]
        if method.upper() == "GET":
            url = self.getGETGetFeatureRequest(
                typename,
                filter,
                bbox,
                featureid,
                featureversion,
                propertyname,
                maxfeatures,
                storedQueryID,
                storedQueryParams,
                outputFormat,
                "Get",
                startindex,
                sortby,
            )
            log.debug("GetFeature WFS GET url %s", url)
        else:
            url, data = self.getPOSTGetFeatureRequest(
                typename,
                filter,
                bbox,
                featureid,
                featureversion,
                propertyname,
                maxfeatures,
                storedQueryID,
                storedQueryParams,
                outputFormat,
                "Post",
                startindex,
                sortby,
            )

        response = openURL(
            url, data, method, timeout=self.timeout, headers=self.headers, auth=self.auth
        )

        # check for service exceptions, rewrap, and return
        # We're going to assume that anything with a content-length > 32k
        # is data. We'll check anything smaller.
        if "Content-Length" in response.info():
            length = int(response.info()["Content-Length"])
            body = None  # not read yet
        else:
            # No declared length: the body has to be read to measure it.
            body = response.read()
            length = len(body)

        if length >= 32000:
            return response if body is None else BytesIO(body)

        if body is None:
            body = response.read()

        try:
            tree = etree.fromstring(body)
        except Exception:
            # Not XML
            return BytesIO(body)

        if tree.tag == "{%s}ServiceExceptionReport" % OGC_NAMESPACE:
            se = tree.find(nspath("ServiceException", OGC_NAMESPACE))
            if se is None:
                # Report without a ServiceException child: fall back to the
                # text content of the whole report.
                message = "".join(tree.itertext()).strip()
            else:
                message = str(se.text).strip()
            raise ServiceException(message)
        return BytesIO(body)
357

358
    def getpropertyvalue(
        self,
        query=None,
        storedquery_id=None,
        valuereference=None,
        typename=None,
        method=nspath("Get"),
        **kwargs
    ):
        """ the WFS GetPropertyValue method

        Builds a key-value GetPropertyValue request and returns the raw
        response body.

        @param query: sent as the ``query`` parameter when truthy
        @param storedquery_id: sent as ``storedQuery_id`` when truthy
        @param valuereference: sent as ``valueReference`` when truthy
        @param typename: sent as ``typename`` when truthy
        @param method: compared case-insensitively with the ``type`` of the
            methods advertised for GetPropertyValue to pick the request URL;
            ``self.url`` is used when none matches
        @param kwargs: extra request parameters, sent as strings
        @return: whatever ``read()`` of the response returns
        @raise KeyError: if the service advertises no GetPropertyValue operation
        """
        # NOTE(review): the default ``method`` is a namespace-qualified name;
        # confirm it can match the advertised method types.
        try:
            base_url = next(
                m.get("url")
                for m in self.getOperationByName("GetPropertyValue").methods
                if m.get("type").lower() == method.lower()
            )
        except StopIteration:
            base_url = self.url

        request = {
            "service": "WFS",
            "version": self.version,
            "request": "GetPropertyValue",
        }
        if query:
            request["query"] = str(query)
        if valuereference:
            request["valueReference"] = str(valuereference)
        if storedquery_id:
            request["storedQuery_id"] = str(storedquery_id)
        if typename:
            request["typename"] = str(typename)
        for key, value in kwargs.items():
            request[key] = str(value)

        # Hand the encoded query to openURL via ``data`` (as the stored-query
        # requests do) rather than gluing it to the URL without a separator.
        u = openURL(
            base_url,
            data=urlencode(request),
            timeout=self.timeout,
            headers=self.headers,
            auth=self.auth,
        )
        return u.read()
397

398
    def _getStoredQueries(self):
        """ gets descriptions of the stored queries available on the server

        This method makes two calls to the WFS - one ListStoredQueries, and
        one DescribeStoredQueries. The information is then aggregated in
        'StoredQuery' objects.

        @return: list of StoredQuery objects, one per listed stored query
        """
        # first make the ListStoredQueries request and save the results in a
        # dictionary of the form {storedqueryid: (title, returnfeaturetype)}
        listed = {}
        for sqelem in self._requestStoredQueriesDocument("ListStoredQueries"):
            title = rft = None
            for elem in sqelem:
                if elem.tag == nspath("Title", WFS_NAMESPACE):
                    title = elem.text
                elif elem.tag == nspath("ReturnFeatureType", WFS_NAMESPACE):
                    rft = elem.text
            listed[sqelem.get("id")] = (title, rft)

        # then make the DescribeStoredQueries request and get the rest of the
        # information about the stored queries
        described = {}
        for sqelem in self._requestStoredQueriesDocument("DescribeStoredQueries"):
            # Initialized once per stored query so that a later sibling
            # element cannot wipe out an abstract that was already found.
            abstract = ""
            params = []
            for elem in sqelem:
                if elem.tag == nspath("Abstract", WFS_NAMESPACE):
                    abstract = elem.text
                elif elem.tag == nspath("Parameter", WFS_NAMESPACE):
                    params.append(Parameter(elem.get("name"), elem.get("type")))
            described[sqelem.get("id")] = (abstract, params)

        # now group the results into StoredQuery objects:
        sqs = []
        for sq_id, (title, rft) in listed.items():
            # A query that was listed but not described gets empty details.
            abstract, params = described.get(sq_id, ("", []))
            sqs.append(StoredQuery(sq_id, title, rft, abstract, params))
        return sqs

    def _requestStoredQueriesDocument(self, request_name):
        """Send the stored-query request ``request_name`` and return the parsed XML root."""
        method = nspath("Get")
        try:
            base_url = next(
                m.get("url")
                for m in self.getOperationByName(request_name).methods
                if m.get("type").lower() == method.lower()
            )
        except StopIteration:
            # No advertised method matched: use the service URL.
            base_url = self.url

        request = {
            "service": "WFS",
            "version": self.version,
            "request": request_name,
        }
        u = openURL(
            base_url,
            data=urlencode(request),
            timeout=self.timeout,
            headers=self.headers,
            auth=self.auth,
        )
        return etree.fromstring(u.read())

    storedqueries = property(_getStoredQueries, None)
487

488
    def getOperationByName(self, name):
        """Return the first entry of ``self.operations`` whose ``name`` matches.

        @param name: operation name to look for
        @return: the matching operation object
        @raise KeyError: if no operation has that name
        """
        for operation in self.operations:
            if operation.name == name:
                return operation
        raise KeyError("No operation named %s" % name)
494

495

496
class StoredQuery(object):
    """Class to describe a stored query."""

    def __init__(self, id, title, returntype, abstract, parameters):
        """Store the description of one stored query.

        @param id: stored query identifier
        @param title: stored query title
        @param returntype: kept as ``returnfeaturetype``
        @param abstract: stored query abstract
        @param parameters: parameters of the stored query
        """
        self.id = id
        self.title = title
        self.returnfeaturetype = returntype
        self.abstract = abstract
        self.parameters = parameters
505

506

507
class Parameter(object):
    """Name and type of a single parameter."""

    def __init__(self, name, type):
        """Store ``name`` and ``type`` unchanged on the instance."""
        self.name = name
        self.type = type
511

512

513
class ContentMetadata(AbstractContentMetadata):
    """Abstraction for WFS metadata.

    Implements IMetadata.
    """

    def __init__(
        self, elem, parent, parse_remote_metadata=False, timeout=30, headers=None, auth=None
    ):
        """Read the metadata of one feature type.

        @param elem: FeatureType element of the capabilities document
        @param parent: element searched for ``Operations`` shared by all
            feature types (the FeatureTypeList when built by the service)
        @param parse_remote_metadata: whether to fetch and parse the
            documents referenced by MetadataURL elements
        @param timeout: time (in seconds) after which metadata requests
            should timeout
        @param headers: HTTP headers, passed to the base class
        @param auth: authentication object, passed to the base class
        """
        super(ContentMetadata, self).__init__(headers=headers, auth=auth)
        self.id = elem.find(nspath("Name", ns=WFS_NAMESPACE)).text
        self.title = elem.find(nspath("Title", ns=WFS_NAMESPACE)).text
        abstract = elem.find(nspath("Abstract", ns=WFS_NAMESPACE))
        self.abstract = abstract.text if abstract is not None else None
        # Keywords are read as OWS ``Keywords/Keyword`` children of the
        # feature type, the same path the service-level keyword scan uses.
        self.keywords = [
            kwd.text
            for kwd in elem.findall(nspath("Keywords/Keyword", ns=OWS_NAMESPACE))
        ]

        # bboxes
        # NOTE(review): ``boundingBox`` is only assigned below on success;
        # whether the base class supplies a default is not visible here.
        self.boundingBoxWGS84 = None
        b = elem.find(nspath("WGS84BoundingBox", ns=OWS_NAMESPACE))
        if b is not None:
            try:
                lc = b.find(nspath("LowerCorner", ns=OWS_NAMESPACE))
                uc = b.find(nspath("UpperCorner", ns=OWS_NAMESPACE))
                ll = [float(s) for s in lc.text.split()]
                ur = [float(s) for s in uc.text.split()]
                self.boundingBoxWGS84 = (ll[0], ll[1], ur[0], ur[1])

                # there is no such thing as bounding box
                # make copy of the WGS84BoundingBox
                self.boundingBox = self.boundingBoxWGS84 + (Crs("epsg:4326"),)
            except (AttributeError, ValueError, IndexError):
                # Missing, non-numeric or incomplete corners: no usable box.
                self.boundingBoxWGS84 = None

        # crs options
        self.crsOptions = [
            Crs(srs.text) for srs in elem.findall(nspath("OtherCRS", ns=WFS_NAMESPACE))
        ]
        defaultCrs = elem.findall(nspath("DefaultCRS", ns=WFS_NAMESPACE))
        if defaultCrs:
            # The default CRS goes first.
            self.crsOptions.insert(0, Crs(defaultCrs[0].text))

        # verbs
        self.verbOptions = [
            op.tag for op in parent.findall(nspath("Operations/*", ns=WFS_NAMESPACE))
        ]
        # Extend in place: a bare ``+`` would throw the combined list away.
        self.verbOptions += [
            op.tag
            for op in elem.findall(nspath("Operations/*", ns=WFS_NAMESPACE))
            if op.tag not in self.verbOptions
        ]

        # others not used but needed for iContentMetadata harmonisation
        self.styles = None
        self.timepositions = None
        self.defaulttimeposition = None

        # MetadataURLs
        self.metadataUrls = []
        for m in elem.findall(nspath("MetadataURL", ns=WFS_NAMESPACE)):
            metadataUrl = {
                "url": testXMLValue(
                    m.attrib["{http://www.w3.org/1999/xlink}href"], attrib=True
                )
            }
            self.metadataUrls.append(metadataUrl)

        if parse_remote_metadata:
            self.parse_remote_metadata(timeout)

    def parse_remote_metadata(self, timeout=30):
        """Parse remote metadata for MetadataURL and add it as metadataUrl['metadata']

        Each entry of ``self.metadataUrls`` with a URL is fetched. A
        ``metadata`` element is wrapped in ``Metadata``; otherwise a
        ``gmd:MD_Metadata`` or ``gmi:MI_Metadata`` element is wrapped in
        ``MD_Metadata``. Any error while fetching or parsing stores ``None``.

        @param timeout: time (in seconds) after which requests should timeout
        """
        for metadataUrl in self.metadataUrls:
            if metadataUrl["url"] is None:
                continue
            try:
                content = openURL(
                    metadataUrl["url"], timeout=timeout, headers=self.headers, auth=self.auth
                )
                doc = etree.fromstring(content.read())

                mdelem = doc.find(".//metadata")
                if mdelem is not None:
                    metadataUrl["metadata"] = Metadata(mdelem)
                    continue

                # Compare with None explicitly: an element without children
                # is falsy, so ``find(a) or find(b)`` could skip a match.
                mdelem = doc.find(
                    ".//" + util.nspath_eval("gmd:MD_Metadata", n.get_namespaces(["gmd"]))
                )
                if mdelem is None:
                    mdelem = doc.find(
                        ".//" + util.nspath_eval("gmi:MI_Metadata", n.get_namespaces(["gmi"]))
                    )
                if mdelem is not None:
                    metadataUrl["metadata"] = MD_Metadata(mdelem)
            except Exception:
                # Best effort: a failing document must not break the others.
                metadataUrl["metadata"] = None
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc