• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geopython / pywps / 5487252648

pending completion
5487252648

push

github

web-flow
Support Python3.10/3.11, documentation fixes, and general codebase cleanup (#677)

* docstring adjustments, removal of unused imports, renaming of internal variables according to PEP8 conventions, update URL targets

* remove references to Travis-CI, formatting adjustments

* standardize requirements for readability

* Mention support for Python3.10 and Python3.11

* Add Zeitsperre to CONTRIBUTORS.md

* update installation instructions to reflect modern python approaches and changes to GitHub repository

* Add CI builds for Python3.10 and Python3.11

* remove unused imports

* undo regression

* silence DeprecationWarning for newer sqlalchemy

* fix bad installation command

* cleanup docstrings, remove unused imports

* update actions versions

* ran "isort --py 37 --profile black"

---------

Co-authored-by: MacPingu <cehbrecht@users.noreply.github.com>

160 of 160 new or added lines in 31 files covered. (100.0%)

5106 of 6278 relevant lines covered (81.33%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.74
/pywps/app/Process.py
1
##################################################################
2
# Copyright 2018 Open Source Geospatial Foundation and others    #
3
# licensed under MIT, Please consult LICENSE.txt for details     #
4
##################################################################
5

6
import importlib
1✔
7
import json
1✔
8
import logging
1✔
9
import os
1✔
10
import shutil
1✔
11
import sys
1✔
12
import traceback
1✔
13

14
import pywps.configuration as config
1✔
15
from pywps import dblog
1✔
16
from pywps.app.exceptions import ProcessError
1✔
17
from pywps.app.WPSRequest import WPSRequest
1✔
18
from pywps.exceptions import (
1✔
19
    InvalidParameterValue,
20
    NoApplicableCode,
21
    OperationNotSupported,
22
    ServerBusy,
23
    StorageNotSupported,
24
)
25
from pywps.inout.outputs import ComplexOutput
1✔
26
from pywps.inout.storage.builder import StorageBuilder
1✔
27
from pywps.response import get_response
1✔
28
from pywps.response.execute import ExecuteResponse
1✔
29
from pywps.response.status import WPS_STATUS
1✔
30
from pywps.translations import lower_case_dict
1✔
31

32
# Module-level logger; every message of this module is emitted under the "PYWPS" logger name.
LOGGER = logging.getLogger("PYWPS")
1✔
33

34

35
class Process(object):
1✔
36
    """
37
    :param handler: A callable that gets invoked for each incoming
38
                    request. It should accept a single
39
                    :class:`pywps.app.WPSRequest` argument and return a
40
                    :class:`pywps.app.WPSResponse` object.
41
    :param string identifier: Name of this process.
42
    :param string title: Human-readable title of process.
43
    :param string abstract: Brief narrative description of the process.
44
    :param list keywords: Keywords that characterize a process.
45
    :param inputs: List of inputs accepted by this process. They
46
                   should be :class:`~LiteralInput` and :class:`~ComplexInput`
47
                   and :class:`~BoundingBoxInput`
48
                   objects.
49
    :param outputs: List of outputs returned by this process. They
50
                   should be :class:`~LiteralOutput` and :class:`~ComplexOutput`
51
                   and :class:`~BoundingBoxOutput`
52
                   objects.
53
    :param metadata: List of metadata advertised by this process. They
54
                     should be :class:`pywps.app.Common.Metadata` objects.
55
    :param dict[str,dict[str,str]] translations: The first key is the RFC 4646 language code,
56
        and the nested mapping contains translated strings accessible by a string property.
57
        e.g. {"fr-CA": {"title": "Mon titre", "abstract": "Une description"}}
58
    """
59

60
    def __init__(self, handler, identifier, title, abstract='', keywords=None, profile=None,
                 metadata=None, inputs=None, outputs=None, version='None', store_supported=False,
                 status_supported=False, grass_location=None, translations=None):
        """Store the process description and reset all per-execution state.

        List-like arguments that are left as ``None`` are replaced by a new
        empty list, so instances never share a default list.

        :param profile: profile entries of the process (empty list if ``None``).
        :param string version: version of the process, stored as given.
        :param store_supported: truthy if the execute response may be stored;
            kept on the instance as the string ``'true'`` or ``'false'``.
        :param status_supported: truthy if status updates are supported;
            kept on the instance as the string ``'true'`` or ``'false'``.
        :param string grass_location: GRASS location handled by ``_set_grass``
            when the process runs; nothing GRASS related happens if it is empty.
        """
        # Static description of the process.
        self.identifier = identifier
        self.handler = handler
        self.title = title
        self.abstract = abstract
        self.keywords = [] if keywords is None else keywords
        self.metadata = [] if metadata is None else metadata
        self.profile = [] if profile is None else profile
        self.version = version
        self.inputs = [] if inputs is None else inputs
        self.outputs = [] if outputs is None else outputs

        # Per-execution state, filled in once a request is being executed.
        self.uuid = None
        self._status_store = None
        # self.status_location = ''
        # self.status_url = ''
        self.workdir = None
        self._grass_mapset = None
        self.grass_location = grass_location
        self.service = None
        self.translations = lower_case_dict(translations)

        # Capabilities are kept as the strings 'true' / 'false', not as booleans.
        self.store_supported = 'true' if store_supported else 'false'
        self.status_supported = 'true' if status_supported else 'false'
92

93
    @property
1✔
94
    def json(self):
×
95

96
        return {
1✔
97
            'class': '{}:{}'.format(self.__module__, self.__class__.__name__),
98
            'uuid': str(self.uuid),
99
            'workdir': self.workdir,
100
            'version': self.version,
101
            'identifier': self.identifier,
102
            'title': self.title,
103
            'abstract': self.abstract,
104
            'keywords': self.keywords,
105
            'metadata': [m.json for m in self.metadata],
106
            'inputs': [i.json for i in self.inputs],
107
            'outputs': [o.json for o in self.outputs],
108
            'store_supported': self.store_supported,
109
            'status_supported': self.status_supported,
110
            'profile': [p for p in self.profile],
111
            'translations': self.translations,
112
        }
113

114
    @classmethod
1✔
115
    def from_json(cls, value):
×
116
        """init this process from json back again
117

118
        :param value: the json (not string) representation
119
        """
120
        module, classname = value['class'].split(':')
1✔
121
        # instantiate subclass of Process
122
        new_process = getattr(importlib.import_module(module), classname)()
1✔
123
        new_process._set_uuid(value['uuid'])
1✔
124
        new_process.set_workdir(value['workdir'])
1✔
125
        return new_process
1✔
126

127
    def execute(self, wps_request, uuid):
        """Build the execute response for *wps_request* and run the process.

        The process runs asynchronously only when the request asks for both
        storing of the response and status updates.

        :param wps_request: the incoming execute request
        :param uuid: identifier assigned to this execution
        :return: the response returned by ``_execute_process``
        :raises StorageNotSupported: storing was requested but is not supported
        :raises OperationNotSupported: status updates were requested but are not supported
        """
        self._set_uuid(uuid)
        self._setup_status_storage()
        self.async_ = False
        wps_response = get_response("execute")(wps_request, process=self, uuid=self.uuid)

        LOGGER.debug('Check if status storage and updating are supported by this process')
        if wps_request.store_execute == 'true':
            if self.store_supported != 'true':
                raise StorageNotSupported('Process does not support the storing of the execute response')

            if wps_request.status != 'true':
                wps_response.store_status_file = False
            elif self.status_supported != 'true':
                raise OperationNotSupported('Process does not support the updating of status')
            else:
                wps_response.store_status_file = True
                self.async_ = True

        LOGGER.debug('Check if updating of status is not required then no need to spawn a process')

        return self._execute_process(self.async_, wps_request, wps_response)
153

154
    def _set_uuid(self, uuid):
1✔
155
        """Set uuid and status location path and url
156
        """
157

158
        self.uuid = uuid
1✔
159
        for inpt in self.inputs:
1✔
160
            inpt.uuid = uuid
1✔
161

162
        for outpt in self.outputs:
1✔
163
            outpt.uuid = uuid
1✔
164

165
    def _setup_status_storage(self):
        """Build a storage object with ``StorageBuilder`` and keep it as the status store.
        """
        storage = StorageBuilder.buildStorage()
        self._status_store = storage
167

168
    @property
1✔
169
    def status_store(self):
×
170
        if self._status_store is None:
1✔
171
            self._setup_status_storage()
×
172
        return self._status_store
1✔
173

174
    @property
1✔
175
    def status_location(self):
×
176
        return self.status_store.location(self.status_filename)
1✔
177

178
    @property
1✔
179
    def status_filename(self):
×
180
        return str(self.uuid) + '.xml'
1✔
181

182
    @property
1✔
183
    def status_url(self):
×
184
        return self.status_store.url(self.status_filename)
1✔
185

186
    def _execute_process(self, async_, wps_request, wps_response):
        """Run the process now, put it into the job queue, or refuse it.

        The decision depends on the ``parallelprocesses`` and ``maxprocesses``
        values of the ``server`` configuration section; ``-1`` disables the
        respective limit. If the parallel limit appears to be reached, crashed
        processes are cleaned up first and the counts are read again.

        :param async_: run in asynchronous mode
        :param wps_request: the request to execute
        :param wps_response: the response object to update
        :return: wps_response (in synchronous mode the one returned by ``_run_process``)
        :raises ServerBusy: no free slot in synchronous mode, or the job queue
            is full in asynchronous mode
        """
        maxparallel = int(config.get_config_value('server', 'parallelprocesses'))
        unlimited = maxparallel == -1

        running, stored = dblog.get_process_counts()

        if not unlimited and running >= maxparallel:
            # Try to check for crashed process
            dblog.cleanup_crashed_process()
            running, stored = dblog.get_process_counts()

        has_free_slot = unlimited or running < maxparallel

        # not async: either run right here or refuse
        if not async_:
            if not has_free_slot:
                raise ServerBusy('Maximum number of parallel running processes reached. Please try later.')
            wps_response._update_status(WPS_STATUS.ACCEPTED, "PyWPS Request accepted", 0)
            return self._run_process(wps_request, wps_response)

        # async
        LOGGER.debug("Running processes: {} of {} allowed parallelprocesses".format(running, maxparallel))
        LOGGER.debug("Stored processes: {}".format(stored))

        if has_free_slot:
            # run immediately
            wps_response._update_status(WPS_STATUS.ACCEPTED, "PyWPS Request accepted", 0)
            LOGGER.debug("Accepted request {}".format(self.uuid))
            self._run_async(wps_request, wps_response)
            return wps_response

        # try to store for later usage
        maxprocesses = int(config.get_config_value('server', 'maxprocesses'))
        if maxprocesses != -1 and stored >= maxprocesses:
            raise ServerBusy('Maximum number of processes in queue reached. Please try later.')
        LOGGER.debug("Store process in job queue, uuid={}".format(self.uuid))
        dblog.store_process(self.uuid, wps_request)
        wps_response._update_status(WPS_STATUS.ACCEPTED, 'PyWPS Process stored in job queue', 0)
        return wps_response
234

235
    # This function must not raise an exception; it returns nothing itself.
    # Failure must be reported as wps_response.status = WPS_STATUS.FAILED
237
    def _run_async(self, wps_request, wps_response):
        """Hand the request over to ``pywps.processing`` and start it.

        :param wps_request: the request to execute
        :param wps_response: the response object passed on to the started job
        """
        # imported here and not at module level, as in the original code
        import pywps.processing

        background_job = pywps.processing.Process(process=self,
                                                  wps_request=wps_request,
                                                  wps_response=wps_response)
        LOGGER.debug("Starting process for request: {}".format(self.uuid))
        background_job.start()
245

246
    # This function must not raise an exception and must return a valid wps_response.
    # Failure must be reported as wps_response.status = WPS_STATUS.FAILED
248
    def _run_process(self, wps_request, wps_response):
        """Call the handler in the current OS process and record the outcome.

        Any exception raised while preparing or running the handler is turned
        into a ``WPS_STATUS.FAILED`` status. Whatever the outcome, the next
        stored request (if any) is launched afterwards.

        :param wps_request: the request to execute
        :param wps_response: the response object the handler has to update
        :return: wps_response
        """
        pid = os.getpid()
        LOGGER.debug("Started processing request: {} with pid: {}".format(self.uuid, pid))
        # Update the actual pid of current process to check if failed later
        dblog.update_pid(self.uuid, pid)
        try:
            self._set_grass(wps_request)
            # if required set HOME to the current working directory.
            if config.get_config_value('server', 'sethomedir') is True:
                os.environ['HOME'] = self.workdir
                LOGGER.info('Setting HOME to current working directory: {}'.format(os.environ['HOME']))
            LOGGER.debug('ProcessID={}, HOME={}'.format(self.uuid, os.environ.get('HOME')))
            wps_response._update_status(WPS_STATUS.STARTED, 'PyWPS Process started', 0)
            self.handler(wps_request, wps_response)  # the user must update the wps_response.
            # Ensure process termination
            if wps_response.status not in (WPS_STATUS.SUCCEEDED, WPS_STATUS.FAILED):
                # if (not wps_response.status_percentage) or (wps_response.status_percentage != 100):
                LOGGER.debug('Updating process status to 100% if everything went correctly')
                wps_response._update_status(WPS_STATUS.SUCCEEDED, f'PyWPS Process {self.title} finished', 100)
        except Exception as e:
            traceback.print_exc()
            LOGGER.debug('Retrieving file and line number where exception occurred')
            outermost_tb = sys.exc_info()[2]
            # search for the frame of the _handler method ...
            exc_tb = outermost_tb
            while exc_tb is not None and exc_tb.tb_frame.f_code.co_name != '_handler':
                exc_tb = exc_tb.tb_next
            if exc_tb is None:
                # ... if not found then take the first
                exc_tb = outermost_tb
            failing_code = exc_tb.tb_frame.f_code
            fname = os.path.split(failing_code.co_filename)[1]
            method_name = failing_code.co_name

            # update the process status to display process failed
            msg = 'Process error: method={}.{}, line={}, msg={}'.format(fname, method_name, exc_tb.tb_lineno, e)
            LOGGER.error(msg)
            # In case of a ProcessError use the validated exception message.
            if isinstance(e, ProcessError):
                msg = "Process error: {}".format(e)
            # Only in debug mode we use the log message including the traceback ...
            elif config.get_config_value("logging", "level") != "DEBUG":
                # ... otherwise we use a sparse common error message.
                msg = 'Process failed, please check server error log'
            wps_response._update_status(WPS_STATUS.FAILED, msg, 100)

        finally:
            # This run is finished here, whether or not it was successful:
            # launch the next pending request
            self.launch_next_process()

        return wps_response
304

305
    def launch_next_process(self):
        """Launch the first stored (queued) request, if there is one.

        Errors are logged and never propagated to the caller.
        """
        try:
            LOGGER.debug("Checking for stored requests")

            stored_request = dblog.pop_first_stored()
            if not stored_request:
                LOGGER.debug("No stored request found")
                return

            uuid = stored_request.uuid
            # the stored request is decoded as UTF-8 text before it is parsed as JSON
            request_json = stored_request.request.decode('utf-8')
            LOGGER.debug("Launching the stored request {}".format(str(uuid)))

            queued_request = WPSRequest()
            queued_request.json = json.loads(request_json)

            process = self.service.prepare_process_for_execution(queued_request.identifier)
            process._set_uuid(uuid)
            process._setup_status_storage()
            process.async_ = True
            process.setup_outputs_from_wps_request(queued_request)

            queued_response = ExecuteResponse(queued_request, process=process, uuid=uuid)
            queued_response.store_status_file = True
            process._run_async(queued_request, queued_response)
        except Exception as e:
            LOGGER.exception("Could not run stored process. {}".format(e))
332

333
    def clean(self):
        """Remove the working directory and the temporary GRASS mapset.

        Nothing is removed (only a warning is logged) unless the
        ``cleantempdir`` value of the ``server`` configuration section is set.
        Failures while removing are logged and not raised.
        """
        if not config.get_config_value('server', 'cleantempdir'):
            LOGGER.warning('Temporary working directory is not removed: {}'.format(self.workdir))
            return

        LOGGER.info("Removing temporary working directory: {}".format(self.workdir))
        try:
            if os.path.isdir(self.workdir):
                shutil.rmtree(self.workdir)
            mapset = self._grass_mapset
            if mapset and os.path.isdir(mapset):
                LOGGER.info("Removing temporary GRASS GIS mapset: {}".format(mapset))
                shutil.rmtree(mapset)
        except Exception as err:
            LOGGER.error('Unable to remove directory: {}'.format(err))
348

349
    def set_workdir(self, workdir):
1✔
350
        """Set working dir for all inputs and outputs
351

352
        this is the directory, where all the data are being stored to
353
        """
354

355
        self.workdir = workdir
1✔
356
        for inpt in self.inputs:
1✔
357
            inpt.workdir = workdir
1✔
358

359
        for outpt in self.outputs:
1✔
360
            outpt.workdir = workdir
1✔
361

362
    def _set_grass(self, wps_request):
1✔
363
        """Handle given grass_location parameter of the constructor.
364

365
        location is either directory name, 'epsg:1234' form or a geo-referenced file.
366

367
        In the first case, new temporary mapset within the location will be created.
368

369
        In the second case, location will be created in self.workdir.
370

371
        The mapset should be deleted automatically using self.clean() method
372
        """
373
        if self.grass_location:
1✔
374

375
            import random
×
376
            import string
×
377

378
            from grass.script import core as grass
×
379
            from grass.script import setup as gsetup
×
380

381
            # HOME needs to be set - and that is usually not the case for httpd server
382
            os.environ['HOME'] = self.workdir
×
383

384
            # GISRC envvariable needs to be set
385
            gisrc = open(os.path.join(self.workdir, 'GISRC'), 'w')
×
386
            gisrc.write("GISDBASE: {}\n".format(self.workdir))
×
387
            gisrc.write("GUI: txt\n")
×
388
            gisrc.close()
×
389
            os.environ['GISRC'] = gisrc.name
×
390

391
            new_loc_args = dict()
×
392
            mapset_name = 'pywps_ms_{}'.format(
×
393
                ''.join(random.sample(string.ascii_letters, 5)))
394

395
            if self.grass_location.startswith('complexinput:'):
×
396
                # create new location from a georeferenced file
397
                ref_file_parameter = self.grass_location.split(':')[1]
×
398
                ref_file = wps_request.inputs[ref_file_parameter][0].file
×
399
                new_loc_args.update({'filename': ref_file})
×
400
            elif self.grass_location.lower().startswith('epsg:'):
×
401
                # create new location from epsg code
402
                epsg = self.grass_location.lower().replace('epsg:', '')
×
403
                new_loc_args.update({'epsg': epsg})
×
404

405
            if new_loc_args:
×
406
                dbase = self.workdir
×
407
                location = str()
×
408
                while os.path.isdir(os.path.join(dbase, location)):
×
409
                    location = 'pywps_loc_{}'.format(
×
410
                        ''.join(random.sample(string.ascii_letters, 5)))
411

412
                gsetup.init(os.environ['GISBASE'], dbase,
×
413
                            location, 'PERMANENT')
414

415
                grass.create_location(dbase=dbase,
×
416
                                      location=location,
417
                                      **new_loc_args)
418
                LOGGER.debug('GRASS location based on {} created'.format(
×
419
                    list(new_loc_args.keys())[0]))
420
                grass.run_command('g.mapset',
×
421
                                  mapset=mapset_name,
422
                                  flags='c',
423
                                  dbase=dbase,
424
                                  location=location,
425
                                  quiet=True)
426

427
            # create temporary mapset within existing location
428
            elif os.path.isdir(self.grass_location):
×
429
                from grass.pygrass.gis import make_mapset
×
430

431
                LOGGER.debug('Temporary mapset will be created')
×
432

433
                dbase = os.path.dirname(self.grass_location)
×
434
                location = os.path.basename(self.grass_location)
×
435

436
                grass.run_command('g.gisenv', set="GISDBASE={}".format(dbase))
×
437
                grass.run_command('g.gisenv', set="LOCATION_NAME=%s" % location)
×
438

439
                while os.path.isdir(os.path.join(dbase, location, mapset_name)):
×
440
                    mapset_name = 'pywps_ms_{}'.format(
×
441
                        ''.join(random.sample(string.ascii_letters, 5)))
442
                make_mapset(mapset=mapset_name, location=location,
×
443
                            gisdbase=dbase)
444
                grass.run_command('g.gisenv', set="MAPSET=%s" % mapset_name)
×
445

446
            else:
447
                # FIXME: This will fail as location is not set.
448
                raise NoApplicableCode('Location does exists or does not seem '
×
449
                                       'to be in "EPSG:XXXX" form nor is it existing directory: '
450
                                       '{}'.format(location))
451

452
            # set _grass_mapset attribute - will be deleted once handler ends
453
            self._grass_mapset = mapset_name
×
454

455
            # final initialization
456
            LOGGER.debug('GRASS Mapset set to {}'.format(mapset_name))
×
457

458
            LOGGER.debug('GRASS environment initialised')
×
459
            LOGGER.debug('GISRC {}, GISBASE {}, GISDBASE {}, LOCATION {}, MAPSET {}'.format(
×
460
                         os.environ.get('GISRC'), os.environ.get('GISBASE'),
461
                         dbase, location, os.path.basename(mapset_name)))
462

463
    def setup_outputs_from_wps_request(self, wps_request):
1✔
464
        # set as_reference to True for all the outputs specified as reference
465
        # if the output is not required to be raw
466
        if not wps_request.raw:
1✔
467
            for wps_outpt in wps_request.outputs:
1✔
468

469
                is_reference = wps_request.outputs[wps_outpt].get('asReference', 'false')
1✔
470
                mimetype = wps_request.outputs[wps_outpt].get('mimetype', '')
1✔
471
                if not isinstance(mimetype, str):
1✔
472
                    mimetype = ''
1✔
473

474
                if is_reference.lower() == 'true':
1✔
475
                    # check if store is supported
476
                    if self.store_supported == 'false':
×
477
                        raise StorageNotSupported(
×
478
                            'The storage of data is not supported for this process.')
479

480
                    is_reference = True
×
481
                else:
482
                    is_reference = False
1✔
483

484
                for outpt in self.outputs:
1✔
485
                    if outpt.identifier == wps_outpt:
1✔
486
                        outpt.as_reference = is_reference
1✔
487
                        if isinstance(outpt, ComplexOutput) and mimetype:
1✔
488
                            data_format = [f for f in outpt.supported_formats if f.mime_type == mimetype]
1✔
489
                            if len(data_format) == 0:
1✔
490
                                raise InvalidParameterValue(
1✔
491
                                    f"MimeType {mimetype} not valid")
492
                            outpt.data_format = data_format[0]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc