• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / OWSLib / 3663701074

pending completion
3663701074

Pull #851

github

GitHub
Merge 6cd54d613 into 13b1443f7
Pull Request #851: Adding Python 3.10 in CI

7461 of 12701 relevant lines covered (58.74%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.44
/owslib/catalogue/csw3.py
1
# -*- coding: ISO-8859-15 -*-
2
# =============================================================================
3
# Copyright (c) 2021 Tom Kralidis
4
#
5
# Authors : Tom Kralidis <tomkralidis@gmail.com>
6
#
7
# Contact email: tomkralidis@gmail.com
8
# =============================================================================
9

10
""" CSW 3.0.0 request and response processor """
1✔
11

12
import inspect
1✔
13
import warnings
1✔
14
from io import BytesIO
1✔
15
import random
1✔
16
from urllib.parse import urlencode
1✔
17

18
from owslib.etree import etree
1✔
19
from owslib import fes2
1✔
20
from owslib import util
1✔
21
from owslib import ows
1✔
22
from owslib.iso import MD_Metadata, FC_FeatureCatalogue
1✔
23
from owslib.fgdc import Metadata
1✔
24
from owslib.dif import DIF
1✔
25
from owslib.gm03 import GM03
1✔
26
from owslib.namespaces import Namespaces
1✔
27
from owslib.util import cleanup_namespaces, bind_url, add_namespaces, OrderedDict, Authentication, openURL, http_post
1✔
28

29
# default variables
30
outputformat = 'application/xml'
1✔
31

32

33
def get_namespaces():
1✔
34
    n = Namespaces()
1✔
35
    return n.get_namespaces()
1✔
36

37

38
namespaces = get_namespaces()
1✔
39
schema = 'http://schemas.opengis.net/cat/csw/3.0/cswAll.xsd'
1✔
40
schema_location = '%s %s' % (namespaces['csw30'], schema)
1✔
41

42

43
class CatalogueServiceWeb(object):
1✔
44
    """ csw request class """
45
    def __init__(self, url, lang='en-US', version='3.0.0', timeout=10, skip_caps=False,
1✔
46
                 username=None, password=None, auth=None, headers=None):
47
        """
48

49
        Construct and process a GetCapabilities request
50

51
        Parameters
52
        ----------
53

54
        - url: the URL of the CSW
55
        - lang: the language (default is 'en-US')
56
        - version: version (default is '3.0.0')
57
        - timeout: timeout in seconds
58
        - skip_caps: whether to skip GetCapabilities processing on init (default is False)
59
        - username: username for HTTP basic authentication
60
        - password: password for HTTP basic authentication
61
        - auth: instance of owslib.util.Authentication
62
        - headers: HTTP headers to send with requests
63

64
        """
65
        if auth:
1✔
66
            if username:
1✔
67
                auth.username = username
×
68
            if password:
1✔
69
                auth.password = password
×
70
        self.url = util.clean_ows_url(url)
1✔
71
        self.lang = lang
1✔
72
        self.version = version
1✔
73
        self.timeout = timeout
1✔
74
        self.auth = auth or Authentication(username, password)
1✔
75
        self.headers = headers
1✔
76
        self.service = 'CSW'
1✔
77
        self.exceptionreport = None
1✔
78
        self.owscommon = ows.OwsCommon('2.0.0')
1✔
79

80
        if not skip_caps:  # process GetCapabilities
1✔
81
            # construct request
82

83
            data = {'service': self.service, 'version': self.version, 'request': 'GetCapabilities'}
1✔
84

85
            self.request = urlencode(data)
1✔
86

87
            self._invoke()
1✔
88

89
            if self.exceptionreport is None:
1✔
90
                self.updateSequence = self._exml.getroot().attrib.get('updateSequence')
1✔
91

92
                # ServiceIdentification
93
                val = self._exml.find(util.nspath_eval('ows200:ServiceIdentification', namespaces))
1✔
94
                if val is not None:
1✔
95
                    self.identification = ows.ServiceIdentification(val, self.owscommon.namespace)
1✔
96
                else:
97
                    self.identification = None
×
98
                # ServiceProvider
99
                val = self._exml.find(util.nspath_eval('ows200:ServiceProvider', namespaces))
1✔
100
                if val is not None:
1✔
101
                    self.provider = ows.ServiceProvider(val, self.owscommon.namespace)
1✔
102
                else:
103
                    self.provider = None
×
104
                # ServiceOperations metadata
105
                self.operations = []
1✔
106
                for elem in self._exml.findall(util.nspath_eval('ows200:OperationsMetadata/ows200:Operation', namespaces)):  # noqa
1✔
107
                    self.operations.append(ows.OperationsMetadata(elem, self.owscommon.namespace))
1✔
108
                self.constraints = {}
1✔
109
                for elem in self._exml.findall(util.nspath_eval('ows200:OperationsMetadata/ows200:Constraint', namespaces)):  # noqa
1✔
110
                    self.constraints[elem.attrib['name']] = ows.Constraint(elem, self.owscommon.namespace)
1✔
111
                self.parameters = {}
1✔
112
                for elem in self._exml.findall(util.nspath_eval('ows200:OperationsMetadata/ows200:Parameter', namespaces)):  # noqa
1✔
113
                    self.parameters[elem.attrib['name']] = ows.Parameter(elem, self.owscommon.namespace)
1✔
114

115
                # FilterCapabilities
116
                val = self._exml.find(util.nspath_eval('fes:Filter_Capabilities', namespaces))
1✔
117
                self.filters = fes2.FilterCapabilities(val)
1✔
118

119
    def getdomain(self, dname, dtype='parameter'):
1✔
120
        """
121

122
        Construct and process a GetDomain request
123

124
        Parameters
125
        ----------
126

127
        - dname: the value of the Parameter or Property to query
128
        - dtype: whether to query a parameter (parameter) or property (property)
129

130
        """
131

132
        # construct request
133
        dtypename = 'ParameterName'
×
134
        node0 = self._setrootelement('csw30:GetDomain')
×
135
        node0.set('service', self.service)
×
136
        node0.set('version', self.version)
×
137
        node0.set(util.nspath_eval('xsi:schemaLocation', namespaces), schema_location)
×
138
        if dtype == 'property':
×
139
            dtypename = 'ValueReference'
×
140
        else:
141
            dtypename = 'ParameterName'
×
142

143
        etree.SubElement(node0, util.nspath_eval('csw30:%s' % dtypename, namespaces)).text = dname
×
144

145
        self.request = node0
×
146

147
        self._invoke()
×
148

149
        if self.exceptionreport is None:
×
150
            self.results = {}
×
151

152
            val = self._exml.find(util.nspath_eval('csw30:DomainValues', namespaces)).attrib.get('type')
×
153
            self.results['type'] = util.testXMLValue(val, True)
×
154

155
            val = self._exml.find(util.nspath_eval('csw30:DomainValues/csw30:%s' % dtypename, namespaces))
×
156
            self.results[dtype] = util.testXMLValue(val)
×
157

158
            # get the list of values associated with the Domain
159
            self.results['values'] = []
×
160

161
            for f in self._exml.findall(util.nspath_eval('csw30:DomainValues/csw30:ListOfValues/csw30:Value', namespaces)):  # noqa
×
162
                self.results['values'].append(util.testXMLValue(f))
×
163

164
    def getrecordbyid(self, id=[], esn='full', outputschema=namespaces['csw30'], format=outputformat):
1✔
165
        """
166

167
        Construct and process a GetRecordById request
168

169
        Parameters
170
        ----------
171

172
        - id: the list of Ids
173
        - esn: the ElementSetName 'full', 'brief' or 'summary' (default is 'full')
174
        - outputschema: the outputSchema (default is the CSW 3.0 namespace URI, namespaces['csw30'])
175
        - format: the outputFormat (default is 'application/xml')
176

177
        """
178

179
        # construct request
180
        data = {
×
181
            'service': self.service,
182
            'version': self.version,
183
            'request': 'GetRecordById',
184
            'outputFormat': format,
185
            'outputSchema': outputschema,
186
            'elementsetname': esn,
187
            'id': ','.join(id),
188
        }
189

190
        self.request = urlencode(data)
×
191

192
        self._invoke()
×
193

194
        if self.exceptionreport is None:
×
195
            self.results = {}
×
196
            self.records = OrderedDict()
×
197
            self._parserecords(outputschema, esn)
×
198

199
    def getrecords(self, constraints=[], sortby=None, typenames='csw30:Record', esn='summary',
1✔
200
                   outputschema=namespaces['csw30'], format=outputformat, startposition=0,
201
                   maxrecords=10, cql=None, xml=None, distributedsearch=False, hopcount=2,
202
                   federatedcatalogues=[]):
203
        """
204

205
        Construct and process a GetRecords request
206

207
        Parameters
208
        ----------
209

210
        - constraints: the list of constraints (OgcExpression from owslib.fes2 module)
211
        - sortby: an OGC SortBy object (SortBy from owslib.fes2 module)
212
        - typenames: the typeNames to query against (default is csw30:Record)
213
        - esn: the ElementSetName 'full', 'brief' or 'summary' (default is 'summary')
214
        - outputschema: the outputSchema (default is the CSW 3.0 namespace URI, namespaces['csw30'])
215
        - format: the outputFormat (default is 'application/xml')
216
        - startposition: requests a slice of the result set, starting at this position (default is 0)
217
        - maxrecords: the maximum number of records to return. No records are returned if 0 (default is 10)
218
        - cql: common query language text.  Note this is only used when no constraints are given
219
        - xml: raw XML request.  Note this overrides all other options
220
        - distributedsearch: `bool` of whether to trigger distributed search
221
        - hopcount: number of message hops before search is terminated (default is 2)
222
        - federatedcatalogues: list of CSW 3 URLs
223

224
        """
225

226
        if xml is not None:
1✔
227
            if isinstance(xml, bytes):
×
228
                startswith_xml = xml.startswith(b'<')
×
229
            else:  # str
230
                startswith_xml = xml.startswith('<')
×
231

232
            if startswith_xml:
×
233
                self.request = etree.fromstring(xml)
×
234
                val = self.request.find(util.nspath_eval('csw30:Query/csw30:ElementSetName', namespaces))
×
235
                if val is not None:
×
236
                    esn = util.testXMLValue(val)
×
237
                val = self.request.attrib.get('outputSchema')
×
238
                if val is not None:
×
239
                    outputschema = util.testXMLValue(val, True)
×
240
            else:
241
                self.request = xml
×
242
        else:
243
            # construct request
244
            node0 = self._setrootelement('csw30:GetRecords')
1✔
245
            if etree.__name__ != 'lxml.etree':  # apply nsmap manually
1✔
246
                node0.set('xmlns:ows110', namespaces['ows110'])
1✔
247
                node0.set('xmlns:gmd', namespaces['gmd'])
1✔
248
                node0.set('xmlns:dif', namespaces['dif'])
1✔
249
                node0.set('xmlns:fgdc', namespaces['fgdc'])
1✔
250
            node0.set('outputSchema', outputschema)
1✔
251
            node0.set('outputFormat', format)
1✔
252
            node0.set('version', self.version)
1✔
253
            node0.set('service', self.service)
1✔
254
            if startposition > 0:
1✔
255
                node0.set('startPosition', str(startposition))
×
256
            node0.set('maxRecords', str(maxrecords))
1✔
257
            node0.set(util.nspath_eval('xsi:schemaLocation', namespaces), schema_location)
1✔
258

259
            if distributedsearch:
1✔
260
                node00 = etree.SubElement(node0, util.nspath_eval('csw30:DistributedSearch', namespaces),
×
261
                                          hopCount=str(hopcount), clientId='owslib',
262
                                          distributedSearchId='owslib-request')
263

264
                if federatedcatalogues:
×
265
                    for fc in federatedcatalogues:
×
266
                        etree.SubElement(node00, util.nspath_eval('csw30:federatedCatalogues', namespaces),
×
267
                                         catalogueURL=fc)
268

269
            node1 = etree.SubElement(node0, util.nspath_eval('csw30:Query', namespaces))
1✔
270
            node1.set('typeNames', typenames)
1✔
271

272
            etree.SubElement(node1, util.nspath_eval('csw30:ElementSetName', namespaces)).text = esn
1✔
273

274
            if any([len(constraints) > 0, cql is not None]):
1✔
275
                node2 = etree.SubElement(node1, util.nspath_eval('csw30:Constraint', namespaces))
×
276
                node2.set('version', '1.1.0')
×
277
                flt = fes2.FilterRequest()
×
278
                if len(constraints) > 0:
×
279
                    node2.append(flt.setConstraintList(constraints))
×
280
                # Now add a CQL filter if passed in
281
                elif cql is not None:
×
282
                    etree.SubElement(node2, util.nspath_eval('csw30:CqlText', namespaces)).text = cql
×
283

284
            if sortby is not None and isinstance(sortby, fes2.SortBy):
1✔
285
                node1.append(sortby.toXML())
×
286

287
            self.request = node0
1✔
288

289
        self._invoke()
1✔
290

291
        if self.exceptionreport is None:
1✔
292
            self.results = {}
1✔
293

294
            # process search results attributes
295
            val = self._exml.find(
1✔
296
                util.nspath_eval('csw30:SearchResults', namespaces)).attrib.get('numberOfRecordsMatched')
297
            self.results['matches'] = int(util.testXMLValue(val, True))
1✔
298
            val = self._exml.find(
1✔
299
                util.nspath_eval('csw30:SearchResults', namespaces)).attrib.get('numberOfRecordsReturned')
300
            self.results['returned'] = int(util.testXMLValue(val, True))
1✔
301
            val = self._exml.find(util.nspath_eval('csw30:SearchResults', namespaces)).attrib.get('nextRecord')
1✔
302
            if val is not None:
1✔
303
                self.results['nextrecord'] = int(util.testXMLValue(val, True))
1✔
304
            else:
305
                warnings.warn("""CSW Server did not supply a nextRecord value (it is optional), so the client
×
306
                should page through the results in another way.""")
307
                # For more info, see:
308
                # https://github.com/geopython/OWSLib/issues/100
309
                self.results['nextrecord'] = None
×
310

311
            # process list of matching records
312
            self.records = OrderedDict()
1✔
313

314
            self._parserecords(outputschema, esn)
1✔
315

316
    def transaction(self, ttype=None, typename='csw30:Record', record=None, propertyname=None, propertyvalue=None,
1✔
317
                    bbox=None, keywords=[], cql=None, identifier=None):
318
        """
319

320
        Construct and process a Transaction request
321

322
        Parameters
323
        ----------
324

325
        - ttype: the type of transaction: 'insert', 'update' or 'delete'
326
        - typename: the typename to describe (default is 'csw30:Record')
327
        - record: the XML record to insert
328
        - propertyname: the RecordProperty/PropertyName to Filter against
329
        - propertyvalue: the RecordProperty Value to Filter against (for updates)
330
        - bbox: the bounding box of the spatial query in the form [minx,miny,maxx,maxy]
331
        - keywords: list of keywords
332
        - cql: common query language text.  Note this overrides bbox, qtype, keywords
333
        - identifier: record identifier.  Note this overrides bbox, qtype, keywords, cql
334

335
        """
336

337
        # construct request
338
        node0 = self._setrootelement('csw30:Transaction')
×
339
        node0.set('version', self.version)
×
340
        node0.set('service', self.service)
×
341
        node0.set(util.nspath_eval('xsi:schemaLocation', namespaces), schema_location)
×
342

343
        validtransactions = ['insert', 'update', 'delete']
×
344

345
        if ttype not in validtransactions:  # invalid transaction
×
346
            raise RuntimeError('Invalid transaction \'%s\'.' % ttype)
×
347

348
        node1 = etree.SubElement(node0, util.nspath_eval('csw30:%s' % ttype.capitalize(), namespaces))
×
349

350
        if ttype != 'update':
×
351
            node1.set('typeName', typename)
×
352

353
        if ttype == 'insert':
×
354
            if record is None:
×
355
                raise RuntimeError('Nothing to insert.')
×
356
            node1.append(etree.fromstring(record))
×
357

358
        if ttype == 'update':
×
359
            if record is not None:
×
360
                node1.append(etree.fromstring(record))
×
361
            else:
362
                if propertyname is not None and propertyvalue is not None:
×
363
                    node2 = etree.SubElement(node1, util.nspath_eval('csw30:RecordProperty', namespaces))
×
364
                    etree.SubElement(node2, util.nspath_eval('csw30:Name', namespaces)).text = propertyname
×
365
                    etree.SubElement(node2, util.nspath_eval('csw30:Value', namespaces)).text = propertyvalue
×
366
                    self._setconstraint(node1, None, propertyname, keywords, bbox, cql, identifier)
×
367

368
        if ttype == 'delete':
×
369
            self._setconstraint(node1, None, propertyname, keywords, bbox, cql, identifier)
×
370

371
        self.request = node0
×
372

373
        self._invoke()
×
374
        self.results = {}
×
375

376
        if self.exceptionreport is None:
×
377
            self._parsetransactionsummary()
×
378
            self._parseinsertresult()
×
379

380
    def harvest(self, source, resourcetype, resourceformat=None, harvestinterval=None, responsehandler=None):
1✔
381
        """
382

383
        Construct and process a Harvest request
384

385
        Parameters
386
        ----------
387

388
        - source: a URI to harvest
389
        - resourcetype: namespace identifying the type of resource
390
        - resourceformat: MIME type of the resource
391
        - harvestinterval: frequency of harvesting, in ISO8601
392
        - responsehandler: endpoint that the CSW should respond to with the response
393

394
        """
395

396
        # construct request
397
        node0 = self._setrootelement('csw30:Harvest')
×
398
        node0.set('version', self.version)
×
399
        node0.set('service', self.service)
×
400
        node0.set(util.nspath_eval('xsi:schemaLocation', namespaces), schema_location)
×
401
        etree.SubElement(node0, util.nspath_eval('csw30:Source', namespaces)).text = source
×
402
        etree.SubElement(node0, util.nspath_eval('csw30:ResourceType', namespaces)).text = resourcetype
×
403
        if resourceformat is not None:
×
404
            etree.SubElement(node0, util.nspath_eval('csw30:ResourceFormat', namespaces)).text = resourceformat
×
405
        if harvestinterval is not None:
×
406
            etree.SubElement(node0, util.nspath_eval('csw30:HarvestInterval', namespaces)).text = harvestinterval
×
407
        if responsehandler is not None:
×
408
            etree.SubElement(node0, util.nspath_eval('csw30:ResponseHandler', namespaces)).text = responsehandler
×
409

410
        self.request = node0
×
411

412
        self._invoke()
×
413
        self.results = {}
×
414

415
        if self.exceptionreport is None:
×
416
            val = self._exml.find(util.nspath_eval('csw30:Acknowledgement', namespaces))
×
417
            if util.testXMLValue(val) is not None:
×
418
                ts = val.attrib.get('timeStamp')
×
419
                self.timestamp = util.testXMLValue(ts, True)
×
420
                id = val.find(util.nspath_eval('csw30:RequestId', namespaces))
×
421
                self.id = util.testXMLValue(id)
×
422
            else:
423
                self._parsetransactionsummary()
×
424
                self._parseinsertresult()
×
425

426
    def get_operation_by_name(self, name):
1✔
427
        """Return a named operation"""
428
        for item in self.operations:
1✔
429
            if item.name.lower() == name.lower():
1✔
430
                return item
1✔
431
        raise KeyError("No operation named %s" % name)
×
432

433
    def getService_urls(self, service_string=None):
1✔
434
        """
435

436
        Return easily identifiable URLs for all service types
437

438
        Parameters
439
        ----------
440

441
        - service_string: a URI to lookup
442

443
        """
444

445
        urls = []
×
446
        for key, rec in list(self.records.items()):
×
447
            # create a generator object, and iterate through it until the match is found
448
            # if not found, falls back to the default value (here None)
449
            url = next((d['url'] for d in rec.references if d['scheme'] == service_string), None)
×
450
            if url is not None:
×
451
                urls.append(url)
×
452
        return urls
×
453

454
    def _parseinsertresult(self):
1✔
455
        self.results['insertresults'] = []
×
456
        for i in self._exml.findall('.//' + util.nspath_eval('csw30:InsertResult', namespaces)):
×
457
            for j in i.findall(util.nspath_eval('csw30:BriefRecord/dc:identifier', namespaces)):
×
458
                self.results['insertresults'].append(util.testXMLValue(j))
×
459

460
    def _parserecords(self, outputschema, esn):
1✔
461
        if outputschema == namespaces['gmd']:  # iso 19139
1✔
462
            for i in self._exml.findall('.//' + util.nspath_eval('gmd:MD_Metadata', namespaces)) or \
×
463
                    self._exml.findall('.//' + util.nspath_eval('gmi:MI_Metadata', namespaces)):
464
                val = i.find(util.nspath_eval('gmd:fileIdentifier/gco:CharacterString', namespaces))
×
465
                identifier = self._setidentifierkey(util.testXMLValue(val))
×
466
                self.records[identifier] = MD_Metadata(i)
×
467
            for i in self._exml.findall('.//' + util.nspath_eval('gfc:FC_FeatureCatalogue', namespaces)):
×
468
                identifier = self._setidentifierkey(util.testXMLValue(i.attrib['uuid'], attrib=True))
×
469
                self.records[identifier] = FC_FeatureCatalogue(i)
×
470
        elif outputschema == namespaces['fgdc']:  # fgdc csdgm
1✔
471
            for i in self._exml.findall('.//metadata'):
×
472
                val = i.find('idinfo/datasetid')
×
473
                identifier = self._setidentifierkey(util.testXMLValue(val))
×
474
                self.records[identifier] = Metadata(i)
×
475
        elif outputschema == namespaces['dif']:  # nasa dif
1✔
476
            for i in self._exml.findall('.//' + util.nspath_eval('dif:DIF', namespaces)):
×
477
                val = i.find(util.nspath_eval('dif:Entry_ID', namespaces))
×
478
                identifier = self._setidentifierkey(util.testXMLValue(val))
×
479
                self.records[identifier] = DIF(i)
×
480
        elif outputschema == namespaces['gm03']:  # GM03
1✔
481
            for i in self._exml.findall('.//' + util.nspath_eval('gm03:TRANSFER', namespaces)):
×
482
                val = i.find(util.nspath_eval('gm03:fileIdentifier', namespaces))
×
483
                identifier = self._setidentifierkey(util.testXMLValue(val))
×
484
                self.records[identifier] = GM03(i)
×
485
        else:  # process default
486
            for i in self._exml.findall('.//' + util.nspath_eval('csw30:%s' % self._setesnel(esn), namespaces)):
1✔
487
                val = i.find(util.nspath_eval('dc:identifier', namespaces))
1✔
488
                identifier = self._setidentifierkey(util.testXMLValue(val))
1✔
489
                self.records[identifier] = Csw30Record(i)
1✔
490

491
    def _parsetransactionsummary(self):
1✔
492
        val = self._exml.find(util.nspath_eval('csw30:TransactionResponse/csw30:TransactionSummary', namespaces))
×
493
        if val is not None:
×
494
            rid = val.attrib.get('requestId')
×
495
            self.results['requestid'] = util.testXMLValue(rid, True)
×
496
            ts = val.find(util.nspath_eval('csw30:totalInserted', namespaces))
×
497
            self.results['inserted'] = int(util.testXMLValue(ts))
×
498
            ts = val.find(util.nspath_eval('csw30:totalUpdated', namespaces))
×
499
            self.results['updated'] = int(util.testXMLValue(ts))
×
500
            ts = val.find(util.nspath_eval('csw30:totalDeleted', namespaces))
×
501
            self.results['deleted'] = int(util.testXMLValue(ts))
×
502

503
    def _setesnel(self, esn):
1✔
504
        """ Set the element name to parse depending on the ElementSetName requested """
505
        el = 'Record'
1✔
506
        if esn == 'brief':
1✔
507
            el = 'BriefRecord'
×
508
        if esn == 'summary':
1✔
509
            el = 'SummaryRecord'
1✔
510
        return el
1✔
511

512
    def _setidentifierkey(self, el):
1✔
513
        if el is None:
1✔
514
            return 'owslib_random_%i' % random.randint(1, 65536)
×
515
        else:
516
            return el
1✔
517

518
    def _setrootelement(self, el):
1✔
519
        if etree.__name__ == 'lxml.etree':  # apply nsmap
1✔
520
            return etree.Element(util.nspath_eval(el, namespaces), nsmap=namespaces)
×
521
        else:
522
            return etree.Element(util.nspath_eval(el, namespaces))
1✔
523

524
    def _setconstraint(self, parent, qtype=None, propertyname='csw30:AnyText', keywords=[], bbox=None, cql=None,
1✔
525
                       identifier=None):
526
        if keywords or bbox is not None or qtype is not None or cql is not None or identifier is not None:
×
527
            node0 = etree.SubElement(parent, util.nspath_eval('csw30:Constraint', namespaces))
×
528
            node0.set('version', '1.1.0')
×
529

530
            if identifier is not None:  # set identifier filter, overrides all other parameters
×
531
                flt = fes2.FilterRequest()
×
532
                node0.append(flt.set(identifier=identifier))
×
533
            elif cql is not None:  # send raw CQL query
×
534
                # CQL passed, overrides all other parameters
535
                node1 = etree.SubElement(node0, util.nspath_eval('csw30:CqlText', namespaces))
×
536
                node1.text = cql
×
537
            else:  # construct a Filter request
538
                flt = fes2.FilterRequest()
×
539
                node0.append(flt.set(qtype=qtype, keywords=keywords, propertyname=propertyname, bbox=bbox))
×
540

541
    def _invoke(self):
1✔
542
        # do HTTP request
543

544
        request_url = self.url
1✔
545

546
        # Get correct URL based on Operation list.
547

548
        # If skip_caps=True, then self.operations has not been set, so use
549
        # default URL.
550
        if hasattr(self, 'operations'):
1✔
551
            try:
1✔
552
                op = self.get_operation_by_name('getrecords')
1✔
553
                if isinstance(self.request, str):  # GET KVP
1✔
554
                    get_verbs = [x for x in op.methods if x.get('type').lower() == 'get']
×
555
                    request_url = get_verbs[0].get('url')
×
556
                else:
557
                    post_verbs = [x for x in op.methods if x.get('type').lower() == 'post']
1✔
558
                    if len(post_verbs) > 1:
1✔
559
                        # Filter by constraints.  We must match a PostEncoding of "XML"
560
                        for pv in post_verbs:
×
561
                            for const in pv.get('constraints'):
×
562
                                if const.name.lower() == 'postencoding':
×
563
                                    values = [v.lower() for v in const.values]
×
564
                                    if 'xml' in values:
×
565
                                        request_url = pv.get('url')
×
566
                                        break
×
567
                        else:
568
                            # Well, just use the first one.
569
                            request_url = post_verbs[0].get('url')
×
570
                    elif len(post_verbs) == 1:
1✔
571
                        request_url = post_verbs[0].get('url')
1✔
572
            except Exception:  # no such luck, just go with request_url
×
573
                pass
×
574

575
        if isinstance(self.request, str):  # GET KVP
1✔
576
            self.request = '%s%s' % (bind_url(request_url), self.request)
1✔
577
            headers_ = {'Accept': outputformat}
1✔
578
            if self.headers:
1✔
579
                headers_.update(self.headers)
1✔
580
            self.response = openURL(
1✔
581
                self.request, None, 'Get', timeout=self.timeout, auth=self.auth, headers=headers_
582
            ).read()
583
        else:
584
            self.request = cleanup_namespaces(self.request)
1✔
585
            # Add any namespaces used in the "typeNames" attribute of the
586
            # csw30:Query element to the query's xml namespaces.
587
            for query in self.request.findall(util.nspath_eval('csw30:Query', namespaces)):
1✔
588
                ns = query.get("typeNames", None)
1✔
589
                if ns is not None:
1✔
590
                    # Pull out "gmd" from something like "gmd:MD_Metadata" from the list
591
                    # of typenames
592
                    ns_keys = [x.split(':')[0] for x in ns.split(' ')]
1✔
593
                    self.request = add_namespaces(self.request, ns_keys)
1✔
594
            self.request = add_namespaces(self.request, 'fes')
1✔
595

596
            self.request = util.element_to_string(self.request, encoding='utf-8')
1✔
597

598
            self.response = http_post(request_url, self.request, self.lang, self.timeout,
1✔
599
                                      auth=self.auth, headers=self.headers).content
600

601
        # parse result see if it's XML
602
        self._exml = etree.parse(BytesIO(self.response))
1✔
603

604
        # it's XML.  Attempt to decipher whether the XML response is CSW-ish
605
        valid_xpaths = [
1✔
606
            util.nspath_eval('ows200:ExceptionReport', namespaces),
607
            util.nspath_eval('csw30:Capabilities', namespaces),
608
            util.nspath_eval('csw30:DescribeRecordResponse', namespaces),
609
            util.nspath_eval('csw30:GetDomainResponse', namespaces),
610
            util.nspath_eval('csw30:GetRecordsResponse', namespaces),
611
            util.nspath_eval('csw30:GetRecordByIdResponse', namespaces),
612
            util.nspath_eval('csw30:HarvestResponse', namespaces),
613
            util.nspath_eval('csw30:TransactionResponse', namespaces),
614
            util.nspath_eval('csw30:Record', namespaces)
615
        ]
616

617
        if self._exml.getroot().tag not in valid_xpaths:
1✔
618
            raise RuntimeError('Document is XML, but not CSW-ish')
×
619

620
        # check if it's an OGC Exception
621
        val = self._exml.find(util.nspath_eval('ows200:Exception', namespaces))
1✔
622
        if val is not None:
1✔
623
            raise ows.ExceptionReport(self._exml, self.owscommon.namespace)
×
624
        else:
625
            self.exceptionreport = None
1✔
626

627

628
class Csw30Record(object):
    """ Process csw30:Record, csw30:BriefRecord, csw30:SummaryRecord """

    def __init__(self, record):
        """
        Parse a Dublin Core style catalogue record into plain attributes.

        Parameters
        ----------

        - record: parsed XML of a single record; either a standalone
          document (any object exposing ``getroot()``) or an element that
          is part of a larger response document

        Attributes set
        --------------

        - xml: serialized form of the record as passed in
        - rdf: True when the content was read from an rdf:Description child
        - single valued properties (identifier, type, title, alternative,
          ispartof, abstract, date, created, issued, relation, temporal,
          modified, creator, publisher, coverage, contributor, language,
          source, rightsholder, accessrights, license, format, spatial):
          result of util.testXMLValue on the first matching child element
        - identifiers, uris, references: lists of dicts
        - subjects, rights: lists of values
        - bbox, bbox_wgs84: bounding box objects, or None when the record
          carries no such element
        """

        if hasattr(record, 'getroot'):  # standalone document
            self.xml = etree.tostring(record.getroot())
        else:  # part of a larger document
            self.xml = etree.tostring(record)

        # check to see if Dublin Core record comes from
        # rdf:RDF/rdf:Description container
        # (child content model is identical)
        rdf = record.find(util.nspath_eval('rdf:Description', namespaces))
        self.rdf = rdf is not None
        if self.rdf:
            record = rdf

        def first(qname):
            """ first child element matching the prefixed name, or None """
            return record.find(util.nspath_eval(qname, namespaces))

        def every(qname):
            """ all child elements matching the prefixed name """
            return record.findall(util.nspath_eval(qname, namespaces))

        def text(qname):
            """ value of the first matching child, via util.testXMLValue """
            return util.testXMLValue(first(qname))

        # some CSWs return records with multiple identifiers based on
        # different schemes.  Use the first dc:identifier value to set
        # self.identifier, and set self.identifiers as a list of dicts
        self.identifier = text('dc:identifier')
        self.identifiers = [
            {'scheme': i.attrib.get('scheme'), 'identifier': i.text}
            for i in every('dc:identifier')
        ]

        self.type = text('dc:type')
        self.title = text('dc:title')
        self.alternative = text('dct:alternative')
        self.ispartof = text('dct:isPartOf')
        self.abstract = text('dct:abstract')
        self.date = text('dc:date')
        self.created = text('dct:created')
        self.issued = text('dct:issued')
        self.relation = text('dc:relation')
        self.temporal = text('dct:temporal')

        self.uris = [  # list of dicts
            {
                'protocol': util.testXMLValue(i.attrib.get('protocol'), True),
                'name': util.testXMLValue(i.attrib.get('name'), True),
                'description': util.testXMLValue(i.attrib.get('description'), True),
                'url': util.testXMLValue(i),
            }
            for i in every('dc:URI')
        ]

        self.references = [  # list of dicts
            {
                'scheme': util.testXMLValue(i.attrib.get('scheme'), True),
                'url': util.testXMLValue(i),
            }
            for i in every('dct:references')
        ]

        self.modified = text('dct:modified')
        self.creator = text('dc:creator')
        self.publisher = text('dc:publisher')
        self.coverage = text('dc:coverage')
        self.contributor = text('dc:contributor')
        self.language = text('dc:language')
        self.source = text('dc:source')
        self.rightsholder = text('dct:rightsHolder')
        self.accessrights = text('dct:accessRights')
        self.license = text('dct:license')
        self.format = text('dc:format')

        self.subjects = [util.testXMLValue(i) for i in every('dc:subject')]
        self.rights = [util.testXMLValue(i) for i in every('dc:rights')]

        self.spatial = text('dct:spatial')

        # The bounding box elements are located under the 'ows200' prefix, so
        # the same namespace is handed to the parsers.  Passing
        # namespaces['ows'] here (as was done previously) does not match the
        # element that was just found.
        # NOTE(review): assumes the second argument of ows.BoundingBox /
        # ows.WGS84BoundingBox is the namespace used to resolve the element's
        # children -- confirm against owslib.ows
        ows_namespace = namespaces['ows200']

        val = first('ows200:BoundingBox')
        if val is not None:
            self.bbox = ows.BoundingBox(val, ows_namespace)
        else:
            self.bbox = None

        val = first('ows200:WGS84BoundingBox')
        if val is not None:
            self.bbox_wgs84 = ows.WGS84BoundingBox(val, ows_namespace)
        else:
            self.bbox_wgs84 = None
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc