• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / pywps / 16372983163

18 Jul 2025 02:24PM UTC coverage: 84.089% (+0.03%) from 84.061%
16372983163

push

github

web-flow
Merge pull request #694 from geopython/fix-lint

fixed linting issues and test failures

5 of 5 new or added lines in 2 files covered. (100.0%)

2 existing lines in 2 files now uncovered.

5380 of 6398 relevant lines covered (84.09%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.87
/pywps/validator/complexvalidator.py
1
##################################################################
2
# Copyright 2018 Open Source Geospatial Foundation and others    #
3
# licensed under MIT, Please consult LICENSE.txt for details     #
4
##################################################################
5

6
"""Validator classes are used for ComplexInputs, to validate the content"""
1✔
7

8

9
import logging
1✔
10
import mimetypes
1✔
11
import os
1✔
12
from urllib.request import urlopen
1✔
13

14
from lxml.etree import XMLSchema
1✔
15

16
from pywps import xml_util as etree
1✔
17
from pywps.inout.formats import FORMATS
1✔
18
from pywps.validator.mode import MODE
1✔
19

20
LOGGER = logging.getLogger('PYWPS')
1✔
21

22

23
def validategml(data_input, mode):
    """GML validation function.

    :param data_input: :class:`ComplexInput`; its ``file``, ``stream`` and
        ``data_format`` (``mime_type``, ``schema``) attributes are read.
    :param pywps.validator.mode.MODE mode: requested validation level.
    :return: outcome of the strictest check that was executed.

    The mode is compared with ``>=`` against every level, so all checks up to
    the requested level run in turn and each one overwrites the result of the
    previous one:

    `MODE.NONE`
        the result is set to `True`
    `MODE.SIMPLE`
        the mimetype will be checked
    `MODE.STRICT`
        `Fiona` is used for getting the proper format. If `Fiona` cannot be
        imported the check fails; any other error raised while opening the
        file is propagated to the caller.
    `MODE.VERYSTRICT`
        the :class:`lxml.etree` is used along with given input `schema` and the
        GML file is properly validated against given schema. Any error is
        logged as a warning and the check fails.
    """

    LOGGER.info('validating GML; Mode: {}'.format(mode))
    passed = False

    if mode >= MODE.NONE:
        passed = True

    if mode >= MODE.SIMPLE:
        name = data_input.file
        mtype, _encoding = mimetypes.guess_type(name, strict=False)
        passed = data_input.data_format.mime_type in {mtype, FORMATS.GML.mime_type}

    if mode >= MODE.STRICT:
        try:
            import fiona

            # Context manager closes the collection once the driver is read.
            with fiona.open(data_input.file) as data_source:
                passed = (data_source.driver == "GML")
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError.
            passed = False

    if mode >= MODE.VERYSTRICT:
        try:
            schema_url = data_input.data_format.schema
            # Close the HTTP response as soon as the schema has been parsed.
            with urlopen(schema_url) as response:
                gmlschema_doc = etree.parse(response)
            gmlschema = XMLSchema(gmlschema_doc)
            passed = gmlschema.validate(etree.parse(data_input.stream))
        except Exception as e:
            LOGGER.warning(e)
            passed = False

    return passed
1✔
75

76

77
def validategpx(data_input, mode):
    """GPX validation function.

    :param data_input: :class:`ComplexInput`; its ``file``, ``stream`` and
        ``data_format`` (``mime_type``, ``schema``) attributes are read.
    :param pywps.validator.mode.MODE mode: requested validation level.
    :return: outcome of the strictest check that was executed.

    The mode is compared with ``>=`` against every level, so all checks up to
    the requested level run in turn and each one overwrites the result of the
    previous one:

    `MODE.NONE`
        the result is set to `True`
    `MODE.SIMPLE`
        the mimetype will be checked
    `MODE.STRICT`
        `Fiona` is used for getting the proper format. If `Fiona` cannot be
        imported the check fails; any other error raised while opening the
        file is propagated to the caller.
    `MODE.VERYSTRICT`
        the :class:`lxml.etree` is used along with given input `schema` and the
        GPX file is properly validated against given schema. Any error is
        logged as a warning and the check fails.
    """

    LOGGER.info('validating GPX; Mode: {}'.format(mode))
    passed = False

    if mode >= MODE.NONE:
        passed = True

    if mode >= MODE.SIMPLE:
        name = data_input.file
        mtype, _encoding = mimetypes.guess_type(name, strict=False)
        passed = data_input.data_format.mime_type in {mtype, FORMATS.GPX.mime_type}

    if mode >= MODE.STRICT:
        try:
            import fiona

            # Context manager closes the collection once the driver is read.
            with fiona.open(data_input.file) as data_source:
                passed = (data_source.driver == "GPX")
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError.
            passed = False

    if mode >= MODE.VERYSTRICT:
        try:
            schema_url = data_input.data_format.schema
            # Close the HTTP response as soon as the schema has been parsed.
            with urlopen(schema_url) as response:
                gpxschema_doc = etree.parse(response)
            gpxschema = XMLSchema(gpxschema_doc)
            passed = gpxschema.validate(etree.parse(data_input.stream))
        except Exception as e:
            LOGGER.warning(e)
            passed = False

    return passed
×
129

130

131
def validatexml(data_input, mode):
    """Validate an XML input at the requested validation level.

    :param data_input: :class:`ComplexInput`; its ``file`` and ``data_format``
        (``mime_type``, ``schema``) attributes are read.
    :param pywps.validator.mode.MODE mode: requested validation level.
    :return: outcome of the strictest check that was executed.

    Every level up to the requested one is evaluated in order, and each level
    replaces the verdict of the one before it:

    `MODE.NONE`
        the verdict is set to `True`
    `MODE.SIMPLE`
        the mimetype will be checked
    `MODE.STRICT` and `MODE.VERYSTRICT`
        the input file is validated with :class:`lxml.etree` against the
        schema file named by ``data_format.schema``, resolved relative to the
        schemas directory. Any error is logged as a warning and the check
        fails.
    """

    LOGGER.info('validating XML; Mode: {}'.format(mode))

    verdict = True if mode >= MODE.NONE else False

    if mode >= MODE.SIMPLE:
        guessed_type, _ = mimetypes.guess_type(data_input.file, strict=False)
        # NOTE(review): compares against the GML mime type, not an XML one;
        # looks like a copy-paste from validategml -- confirm before changing.
        verdict = data_input.data_format.mime_type in {guessed_type, FORMATS.GML.mime_type}

    if mode >= MODE.STRICT:
        # TODO: Raise the actual validation exception to make it easier to spot the error.
        #  xml = etree.parse(data_input.file)
        #  schema.assertValid(xml)
        try:
            schema_path = os.path.join(_get_schemas_home(), data_input.data_format.schema)
            xsd = XMLSchema(etree.parse(schema_path))
            document = etree.parse(data_input.file)
            verdict = xsd.validate(document)
        except Exception as err:
            LOGGER.warning(err)
            verdict = False

    return verdict
1✔
175

176

177
def validatejson(data_input, mode):
    """Validate a JSON input at the requested validation level.

    :param data_input: :class:`ComplexInput`; its ``file`` and
        ``data_format.mime_type`` attributes are read.
    :param pywps.validator.mode.MODE mode: requested validation level.
    :return: outcome of the strictest check that was executed.

    Every level up to the requested one is evaluated in order, and each level
    replaces the verdict of the one before it:

    `MODE.NONE`
        No validation, the verdict is `True`.
    `MODE.SIMPLE`
        `True` if the declared mime type matches the guessed one or the JSON
        format's mime type.
    `MODE.STRICT`
        `True` if the file content can be loaded by :mod:`json`; a
        `ValueError` while loading yields `False`.
    """

    LOGGER.info('validating JSON; Mode: {}'.format(mode))

    ok = True if mode >= MODE.NONE else False

    if mode >= MODE.SIMPLE:
        guessed_type, _ = mimetypes.guess_type(data_input.file, strict=False)
        ok = data_input.data_format.mime_type in {guessed_type, FORMATS.JSON.mime_type}

    if mode >= MODE.STRICT:
        import json

        try:
            with open(data_input.file) as handle:
                json.load(handle)
        except ValueError:
            ok = False
        else:
            # Parsing succeeded, whatever the mime type check said.
            ok = True

    return ok
1✔
218

219

220
def validategeojson(data_input, mode):
    """GeoJSON validation function.

    :param data_input: :class:`ComplexInput`; its ``file``, ``stream`` and
        ``data_format.mime_type`` attributes are read.
    :param pywps.validator.mode.MODE mode: requested validation level.
    :return: outcome of the strictest check that was executed.

    Every level up to the requested one is evaluated in order, and each level
    replaces the result of the one before it:

    `MODE.NONE`
        the result is set to `True`
    `MODE.SIMPLE`
        the mimetype will be checked
    `MODE.STRICT`
        `Fiona` is used for getting the proper format. If `Fiona` cannot be
        imported the check fails; any other error raised while opening the
        file is propagated to the caller.
    `MODE.VERYSTRICT`
        the content of ``data_input.stream`` is validated with `jsonschema`
        against the GeoJSON schema files found in the schemas directory.

    Example (illustrative):

    >>> class FakeInput(object):
    ...     json = open('point.geojson','w')
    ...     json.write('''{"type":"Feature", "properties":{}, "geometry":{"type":"Point", "coordinates":[8.5781228542328, 22.87500500679]}, "crs":{"type":"name", "properties":{"name":"urn:ogc:def:crs:OGC:1.3:CRS84"}}}''')  # noqa
    ...     json.close()
    ...     file = 'point.geojson'
    >>> class FakeDataFormat(object):
    ...     mime_type = FORMATS.GEOJSON.mime_type
    >>> fake_input = FakeInput()
    >>> fake_input.data_format = FakeDataFormat()
    >>> validategeojson(fake_input, MODE.SIMPLE)
    True
    """

    LOGGER.info('validating GeoJSON; Mode: {}'.format(mode))
    passed = False

    if mode >= MODE.NONE:
        passed = True

    if mode >= MODE.SIMPLE:
        name = data_input.file
        mtype, _encoding = mimetypes.guess_type(name, strict=False)
        passed = data_input.data_format.mime_type in {mtype, FORMATS.GEOJSON.mime_type}

    if mode >= MODE.STRICT:
        try:
            import fiona

            # Context manager closes the collection once the driver is read.
            with fiona.open(data_input.file) as data_source:
                passed = (data_source.driver == "GeoJSON")
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError.
            passed = False

    if mode >= MODE.VERYSTRICT:

        import json

        import jsonschema
        import referencing

        # this code comes from
        # https://github.com/om-henners/GeoJSON_Validation/blob/master/geojsonvalidation/geojson_validation.py
        schema_uri = "http://json-schema.org/geojson"
        schema_home = os.path.join(_get_schemas_home(), "geojson")

        # Register the local schema files under the URIs they reference each
        # other by, so "$ref" resolution uses the registry.
        registry = referencing.Registry()

        for fn in ["geojson.json", "crs.json", "bbox.json", "geometry.json"]:
            with open(os.path.join(schema_home, fn)) as fh:
                schema_data = json.load(fh)
                registry = registry.with_resource(f"{schema_uri}/{fn}",
                                                  referencing.Resource(schema_data, referencing.jsonschema.DRAFT4))

        validator = jsonschema.Draft4Validator({"$ref": f"{schema_uri}/geojson.json"}, registry=registry)
        try:
            validator.validate(json.loads(data_input.stream.read()))
            passed = True
        except jsonschema.ValidationError:
            passed = False

    return passed
1✔
286

287

288
def validateshapefile(data_input, mode):
    """ESRI Shapefile validation function.

    :param data_input: :class:`ComplexInput`; its ``file`` and
        ``data_format.mime_type`` attributes are read.
    :param pywps.validator.mode.MODE mode: requested validation level.
    :return: outcome of the strictest check that was executed.

    Every level up to the requested one is evaluated in order, and each level
    replaces the result of the one before it:

    `MODE.NONE`
        the result is set to `True`
    `MODE.SIMPLE`
        the mimetype will be checked
    `MODE.STRICT`
        `Fiona` is used to check that the file is read with the
        ``ESRI Shapefile`` driver. If `Fiona` cannot be imported the check
        fails; any other error raised while opening the file is propagated
        to the caller.
    """

    LOGGER.info('validating Shapefile; Mode: {}'.format(mode))
    passed = False

    if mode >= MODE.NONE:
        passed = True

    if mode >= MODE.SIMPLE:
        name = data_input.file
        mtype, _encoding = mimetypes.guess_type(name, strict=False)
        passed = data_input.data_format.mime_type in {mtype, FORMATS.SHP.mime_type}

    if mode >= MODE.STRICT:
        try:
            import fiona

            # Context manager closes the collection once the driver is read.
            with fiona.open(data_input.file) as sf:
                passed = (sf.driver == "ESRI Shapefile")
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError.
            passed = False

    return passed
1✔
313

314

315
def validategeotiff(data_input, mode):
    """GeoTIFF validation function.

    :param data_input: :class:`ComplexInput`; its ``file`` and
        ``data_format.mime_type`` attributes are read.
    :param pywps.validator.mode.MODE mode: requested validation level.
    :return: outcome of the strictest check that was executed.

    Every level up to the requested one is evaluated in order, and each level
    replaces the result of the one before it:

    `MODE.NONE`
        the result is set to `True`
    `MODE.SIMPLE`
        the mimetype will be checked
    `MODE.STRICT`
        the file is opened with the `geotiff` package and passes when its
        ``crs_code`` is greater than zero. If the package cannot be imported
        the check fails; any other error is propagated to the caller.
    """

    # The message previously said "Shapefile" (copy-paste slip), which made
    # the log misleading when tracing GeoTIFF validation.
    LOGGER.info('Validating GeoTIFF; Mode: {}'.format(mode))
    passed = False

    if mode >= MODE.NONE:
        passed = True

    if mode >= MODE.SIMPLE:
        name = data_input.file
        mtype, _encoding = mimetypes.guess_type(name, strict=False)
        passed = data_input.data_format.mime_type in {mtype, FORMATS.GEOTIFF.mime_type}

    if mode >= MODE.STRICT:
        try:
            from geotiff import GeoTiff

            data_source = GeoTiff(data_input.file)
            passed = (data_source.crs_code > 0)
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError.
            passed = False

    return passed
1✔
341

342

343
def validatenetcdf(data_input, mode):
    """NetCDF validation function.

    :param data_input: :class:`ComplexInput`; its ``file`` and
        ``data_format.mime_type`` attributes are read.
    :param pywps.validator.mode.MODE mode: requested validation level.
    :return: outcome of the strictest check that was executed.

    Every level up to the requested one is evaluated in order, and each level
    replaces the result of the one before it:

    `MODE.NONE`
        the result is set to `True`
    `MODE.SIMPLE`
        the mimetype will be checked
    `MODE.STRICT`
        the file is opened as a netCDF4 dataset; an `ImportError` or
        `IOError` is logged and makes the check fail.
    """

    LOGGER.info('Validating netCDF; Mode: {}'.format(mode))
    passed = False

    if mode >= MODE.NONE:
        passed = True

    if mode >= MODE.SIMPLE:
        name = data_input.file
        mtype, _encoding = mimetypes.guess_type(name, strict=False)
        passed = data_input.data_format.mime_type in {mtype, FORMATS.NETCDF.mime_type}

    if mode >= MODE.STRICT:
        try:
            from pywps.dependencies import netCDF4 as nc

            # Opening is the whole test; close right away so the file handle
            # is not left dangling.
            nc.Dataset(data_input.file).close()
            passed = True
        except ImportError as e:
            passed = False
            LOGGER.exception("ImportError while validating netCDF4 file {}:\n {}".format(data_input.file, e))
        except IOError as e:
            passed = False
            LOGGER.exception("IOError while validating netCDF4 file {}:\n {}".format(data_input.file, e))

    return passed
1✔
372

373

374
def validatedods(data_input, mode):
    """OPeNDAP validation function.

    :param data_input: :class:`ComplexInput`; its ``url`` and
        ``data_format.mime_type`` attributes are read.
    :param pywps.validator.mode.MODE mode: requested validation level.
    :return: outcome of the strictest check that was executed.

    Every level up to the requested one is evaluated in order, and each level
    replaces the result of the one before it:

    `MODE.NONE`
        the result is set to `True`
    `MODE.SIMPLE`
        the mimetype will be checked
    `MODE.STRICT`
        the URL is opened as a netCDF4 dataset; an `ImportError` or
        `IOError` is logged and makes the check fail.
    """

    LOGGER.info('Validating OPeNDAP; Mode: {}'.format(mode))
    passed = False

    if mode >= MODE.NONE:
        passed = True

    if mode >= MODE.SIMPLE:
        name = data_input.url
        mtype, _encoding = mimetypes.guess_type(name, strict=False)
        passed = data_input.data_format.mime_type in {mtype, FORMATS.DODS.mime_type}

    if mode >= MODE.STRICT:
        try:
            from pywps.dependencies import netCDF4 as nc

            # Opening is the whole test; close right away so the remote
            # connection is not left dangling.
            nc.Dataset(data_input.url).close()
            passed = True
        except ImportError as e:
            passed = False
            LOGGER.exception("ImportError while validating OPeNDAP link {}:\n {}".format(data_input.url, e))
        except IOError as e:
            passed = False
            LOGGER.exception("IOError while validating OPeNDAP link {}:\n {}".format(data_input.url, e))

    return passed
1✔
402

403

404
def _get_schemas_home():
    """Return the path to the ``schemas`` directory.

    The path is built as ``<directory of this module>/../schemas``; the
    parent-directory component is left in place rather than normalised. The
    resulting path is also written to the debug log.
    """
    module_dir = os.path.abspath(os.path.dirname(__file__))
    schema_dir = os.path.join(module_dir, os.path.pardir, "schemas")
    LOGGER.debug('Schemas directory: {}'.format(schema_dir))
    return schema_dir
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc