• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nens / ThreeDiToolbox / #2521

22 May 2025 08:14AM UTC coverage: 34.891% (-0.1%) from 35.003%
#2521

push

coveralls-python

web-flow
Merge 9dc01b18d into 67c8900c8

5 of 70 new or added lines in 1 file covered. (7.14%)

4 existing lines in 1 file now uncovered.

4742 of 13591 relevant lines covered (34.89%)

0.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

25.19
/processing/schematisation_algorithms.py
1
# -*- coding: utf-8 -*-
2

3
"""
1✔
4
***************************************************************************
5
*                                                                         *
6
*   This program is free software; you can redistribute it and/or modify  *
7
*   it under the terms of the GNU General Public License as published by  *
8
*   the Free Software Foundation; either version 2 of the License, or     *
9
*   (at your option) any later version.                                   *
10
*                                                                         *
11
***************************************************************************
12
"""
13

14
import os
1✔
15
import shutil
1✔
16
import warnings
1✔
17

18
from pathlib import Path
1✔
19

20
from hydxlib.scripts import run_import_export
1✔
21
from hydxlib.scripts import write_logging_to_file
1✔
22

23
from osgeo import ogr, osr
1✔
24
from osgeo import gdal
1✔
25
from qgis.core import QgsProcessingAlgorithm
1✔
26
from qgis.core import QgsProcessingException
1✔
27
from qgis.core import QgsProcessingParameterBoolean
1✔
28
from qgis.core import QgsProcessingParameterFile
1✔
29
from qgis.core import QgsProcessingParameterFileDestination
1✔
30
from qgis.core import QgsProcessingParameterFolderDestination
1✔
31
from qgis.core import QgsProcessingParameterString
1✔
32
from qgis.core import QgsProject
1✔
33
from qgis.core import QgsVectorLayer
1✔
34
from qgis.PyQt.QtCore import QCoreApplication
1✔
35
from shapely import wkb  # Import Shapely wkb for geometry handling
1✔
36
from sqlalchemy.exc import DatabaseError
1✔
37
from sqlalchemy.exc import OperationalError
1✔
38

39
from threedi_modelchecker import ThreediModelChecker
1✔
40
from threedi_modelchecker.exporters import export_with_geom
1✔
41
from threedi_results_analysis.processing.download_hydx import download_hydx
1✔
42
from threedi_results_analysis.utils.utils import backup_sqlite
1✔
43
from threedi_schema import errors
1✔
44
from threedi_schema import ThreediDatabase
1✔
45

46

47
def get_threedi_database(filename, feedback):
    """Open ``filename`` as a ``ThreediDatabase`` and verify the connection.

    Returns the database object when ``check_connection()`` succeeds. When
    SQLAlchemy raises ``OperationalError`` or ``DatabaseError`` while opening
    or checking the file, a warning is pushed to ``feedback`` and ``None`` is
    returned instead, so callers can test the result for truthiness.
    """
    try:
        database = ThreediDatabase(filename)
        database.check_connection()
    except (OperationalError, DatabaseError):
        feedback.pushWarning("Invalid schematisation file")
        return None
    return database
55

56

57
def feedback_callback_factory(feedback):
    """Build a progress callback that reports to ``feedback``.

    The returned callable takes a progress value and a message and forwards
    them to ``feedback.setProgress`` and ``feedback.setProgressText``, in that
    order. It is handed to the schema upgrade as its ``progress_func`` to
    track schematisation migration progress.
    """

    def feedback_callback(progres_value, message):
        # Update the progress value first, then the accompanying text.
        feedback.setProgress(progres_value)
        feedback.setProgressText(message)

    return feedback_callback
65

66

67
class MigrateAlgorithm(QgsProcessingAlgorithm):
    """
    Migrate 3Di model schema to the current version
    """

    INPUT = "INPUT"
    OUTPUT = "OUTPUT"

    def initAlgorithm(self, config):
        """Declare the single input parameter: the schematisation database file."""
        self.addParameter(
            QgsProcessingParameterFile(
                self.INPUT,
                "3Di schematisation database",
                fileFilter="3Di schematisation database (*.gpkg *.sqlite)"
            )
        )

    def processAlgorithm(self, parameters, context, feedback):
        """Validate the selected database and migrate it when a migration is missing.

        Returns ``{OUTPUT: True}`` when the schema validates (directly, or
        after a successful upgrade) and ``{OUTPUT: None}`` when the file cannot
        be opened, is a legacy editor geopackage, has no usable EPSG code, or
        when the upgrade fails. Problems are reported through
        ``feedback.pushWarning`` rather than raised.
        """
        filename = self.parameterAsFile(parameters, self.INPUT, context)
        threedi_db = get_threedi_database(filename=filename, feedback=feedback)
        if not threedi_db:
            return {self.OUTPUT: None}
        schema = threedi_db.schema

        # Check whether the file is an intermediate legacy geopackage created
        # by the schematisation editor. The extension is compared
        # case-insensitively so that e.g. "model.GPKG" is checked as well.
        if filename.lower().endswith(".gpkg") and schema.get_version() < 300:
            warn_msg = "The selected file is not a valid 3Di schematisation database.\n\nYou may have selected a geopackage that was created by an older version of the 3Di Schematisation Editor (before version 2.0). In that case, there will probably be a Spatialite (*.sqlite) in the same folder. Please use that file instead."
            feedback.pushWarning(warn_msg)
            return {self.OUTPUT: None}

        try:
            schema.validate_schema()
            schema.set_spatial_indexes()
        except errors.MigrationMissingError:
            # The upgrade itself runs with backup=False; this is the backup.
            backup_filepath = backup_sqlite(filename)

            # Prefer the EPSG code stored in the database, fall back to the DEM.
            srid, _ = schema._get_epsg_data()
            if srid is None:
                try:
                    srid = schema._get_dem_epsg()
                except errors.InvalidSRIDException:
                    srid = None
            if srid is None:
                feedback.pushWarning(
                    "Could not fetch valid EPSG code from database or DEM; aborting database migration."
                )
                return {self.OUTPUT: None}

            try:
                feedback_callback = feedback_callback_factory(feedback)
                # Capture UserWarnings raised during the upgrade so they can
                # be shown in the processing log.
                with warnings.catch_warnings(record=True) as caught_warnings:
                    warnings.simplefilter("always", UserWarning)
                    schema.upgrade(backup=False, epsg_code_override=srid, progress_func=feedback_callback)
                for warning in caught_warnings:
                    feedback.pushWarning(f'{warning.category.__name__}: {warning.message}')
                schema.set_spatial_indexes()
                # NOTE(review): removes the whole parent directory of the
                # backup file; assumes backup_sqlite() writes into a dedicated
                # directory - confirm against its implementation.
                shutil.rmtree(os.path.dirname(backup_filepath))
            except errors.UpgradeFailedError:
                # The backup is left in place when the upgrade fails.
                feedback.pushWarning(
                    "The schematisation database cannot be migrated to the current version. Please contact the service desk for assistance."
                )
                return {self.OUTPUT: None}
        return {self.OUTPUT: True}

    def name(self):
        """
        Returns the algorithm name, used for identifying the algorithm. This
        string should be fixed for the algorithm, and must not be localised.
        The name should be unique within each provider. Names should contain
        lowercase alphanumeric characters only and no spaces or other
        formatting characters.
        """
        return "migrate"

    def displayName(self):
        """
        Returns the translated algorithm name, which should be used for any
        user-visible display of the algorithm name.
        """
        return self.tr("Migrate schematisation database")

    def group(self):
        """
        Returns the name of the group this algorithm belongs to. This string
        should be localised.
        """
        return self.tr(self.groupId())

    def groupId(self):
        """
        Returns the unique ID of the group this algorithm belongs to. This
        string should be fixed for the algorithm, and must not be localised.
        The group id should be unique within each provider. Group id should
        contain lowercase alphanumeric characters only and no spaces or other
        formatting characters.
        """
        return "Schematisation"

    def tr(self, string):
        """Translate ``string`` in the "Processing" translation context."""
        return QCoreApplication.translate("Processing", string)

    def createInstance(self):
        """Return a new instance of this algorithm."""
        return MigrateAlgorithm()
174

175

176
class CheckSchematisationAlgorithm(QgsProcessingAlgorithm):
    """
    Run the schematisation checker
    """

    INPUT = "INPUT"
    OUTPUT = "OUTPUT"
    ADD_TO_PROJECT = "ADD_TO_PROJECT"

    # (output field name, attribute read from each exported error), in the
    # order the fields are created in every result layer.
    _ERROR_FIELDS = (
        ("level", "name"),
        ("error_code", "code"),
        ("id", "id"),
        ("table", "table"),
        ("column", "column"),
        ("value", "value"),
        ("description", "description"),
    )

    def initAlgorithm(self, config):
        """Declare the input schematisation, the output file and the add-to-project flag."""
        self.addParameter(
            QgsProcessingParameterFile(
                self.INPUT, self.tr("3Di Schematisation"), fileFilter="GeoPackage (*.gpkg)"
            )
        )

        # NOTE(review): the filter still says "csv", but processAlgorithm
        # replaces the extension of the chosen path with ".gpkg".
        self.addParameter(
            QgsProcessingParameterFileDestination(
                self.OUTPUT, self.tr("Output"), fileFilter="csv"
            )
        )

        self.addParameter(
            QgsProcessingParameterBoolean(
                self.ADD_TO_PROJECT, self.tr("Add result to project"), defaultValue=True
            )
        )

    def processAlgorithm(self, parameters, context, feedback):
        """Run all model checks and write the findings to a GeoPackage.

        The findings are split over four layers ("Point features",
        "Line features", "Polygon features" and "Table features"); findings
        without a geometry, or with a geometry type other than Point,
        LineString or Polygon, end up in the table layer.

        Returns ``{OUTPUT: <path to the .gpkg>}`` on success and
        ``{OUTPUT: None}`` when the schematisation cannot be opened, is not
        migrated, or the GeoPackage cannot be created.
        """
        self.add_to_project = self.parameterAsBoolean(
            parameters, self.ADD_TO_PROJECT, context
        )
        # Only set once the GeoPackage has been written, so that
        # postProcessAlgorithm never tries to load a file that was not created.
        self.output_file_path = None
        input_filename = self.parameterAsFile(parameters, self.INPUT, context)
        self.schema_name = Path(input_filename).stem
        threedi_db = get_threedi_database(filename=input_filename, feedback=feedback)
        if not threedi_db:
            return {self.OUTPUT: None}
        try:
            model_checker = ThreediModelChecker(threedi_db)
        except errors.MigrationMissingError:
            feedback.pushWarning(
                "The selected 3Di model does not have the latest migration. Please "
                "migrate your model to the latest version."
            )
            return {self.OUTPUT: None}
        schema = threedi_db.schema
        schema.set_spatial_indexes()
        srid, _ = schema._get_epsg_data()
        generated_output_file_path = self.parameterAsFileOutput(
            parameters, self.OUTPUT, context
        )
        output_file_path = f"{os.path.splitext(generated_output_file_path)[0]}.gpkg"

        session = model_checker.db.get_session()
        session.model_checker_context = model_checker.context
        total_checks = len(model_checker.config.checks)
        # Guard against a configuration without checks (division by zero).
        progress_per_check = 100.0 / total_checks if total_checks else 0.0
        error_list = []
        for checks_passed, check in enumerate(model_checker.checks(level="info"), start=1):
            error_list.extend(
                [check, error_row] for error_row in check.get_invalid(session)
            )
            feedback.setProgress(int(checks_passed * progress_per_check))
        error_details = export_with_geom(error_list)

        # Create an output GeoPackage. With GDAL exceptions enabled a failure
        # is raised as RuntimeError instead of being returned as None, so both
        # outcomes are handled.
        gdal.UseExceptions()
        driver = ogr.GetDriverByName("GPKG")
        try:
            data_source = driver.CreateDataSource(output_file_path)
        except RuntimeError:
            data_source = None
        if data_source is None:
            feedback.pushWarning(
                f"Unable to create the GeoPackage '{output_file_path}', check the directory permissions."
            )
            return {self.OUTPUT: None}

        # The spatial reference is the same for every geometry layer.
        spatial_ref = None
        if srid is not None:
            spatial_ref = osr.SpatialReference()
            spatial_ref.ImportFromEPSG(srid)
        else:
            feedback.pushWarning(
                "No EPSG code found in the schematisation; result layers are written without a coordinate reference system."
            )

        grouped_errors = self._group_by_geometry_type(error_details)
        group_name_map = {'LineString': 'Line'}
        for feature_type, entries in grouped_errors.items():
            group_name = f'{group_name_map.get(feature_type, feature_type)} features'
            self._write_layer(data_source, group_name, feature_type, spatial_ref, entries)

        # Release the data source so everything is flushed to disk before the
        # file is opened again in postProcessAlgorithm.
        data_source = None

        self.output_file_path = output_file_path
        feedback.pushInfo(f"GeoPackage successfully written to {self.output_file_path}")
        return {self.OUTPUT: self.output_file_path}

    @staticmethod
    def _group_by_geometry_type(error_details):
        """Group exported errors as ``(error, shapely geometry or None)`` per layer type."""
        grouped_errors = {'Point': [], 'LineString': [], 'Polygon': [], 'Table': []}
        for error in error_details:
            # Decode the WKB once; the geometry is reused when writing.
            geom = wkb.loads(error.geom.data) if error.geom is not None else None
            if geom is None or geom.geom_type not in grouped_errors:
                grouped_errors['Table'].append((error, None))
            else:
                grouped_errors[geom.geom_type].append((error, geom))
        return grouped_errors

    @staticmethod
    def _write_layer(data_source, layer_name, feature_type, spatial_ref, entries):
        """Create one result layer in ``data_source`` and write ``entries`` to it."""
        if feature_type == 'Table':
            # Create a table for non-geometry errors
            layer = data_source.CreateLayer(
                layer_name, None, ogr.wkbNone, options=["OVERWRITE=YES"]
            )
        else:
            # Create a layer for this type of geometry
            layer = data_source.CreateLayer(
                layer_name,
                srs=spatial_ref,
                geom_type=getattr(ogr, f"wkb{feature_type}"),
                options=["OVERWRITE=YES"],
            )

        # Add fields
        for field_name, _ in CheckSchematisationAlgorithm._ERROR_FIELDS:
            layer.CreateField(ogr.FieldDefn(field_name, ogr.OFTString))

        defn = layer.GetLayerDefn()
        for error, geom in entries:
            feat = ogr.Feature(defn)
            for field_name, attribute in CheckSchematisationAlgorithm._ERROR_FIELDS:
                feat.SetField(field_name, getattr(error, attribute))
            if geom is not None:
                # Re-serialise through Shapely to get OGR-compatible WKB
                feat.SetGeometry(ogr.CreateGeometryFromWkb(geom.wkb))
            layer.CreateFeature(feat)

    def postProcessAlgorithm(self, context, feedback):
        """Add the layers of the written GeoPackage to the project, if requested.

        Does nothing when "Add result to project" is off or when no GeoPackage
        was written. The layer-tree group is only created once the file has
        been opened successfully.
        """
        if self.add_to_project and self.output_file_path:
            # ogr.Open raises RuntimeError when GDAL exceptions are enabled
            # and returns None otherwise.
            try:
                conn = ogr.Open(self.output_file_path)
            except RuntimeError:
                conn = None
            if conn is None:
                feedback.reportError(f"Could not open GeoPackage file: {self.output_file_path}")
            else:
                layer_names = [
                    conn.GetLayerByIndex(i).GetName() for i in range(conn.GetLayerCount())
                ]
                conn = None  # Close the connection

                # Create a group for the GeoPackage layers
                group = QgsProject.instance().layerTreeRoot().insertGroup(0, f'Check results: {self.schema_name}')
                # Add all layers in the geopackage to the group
                for layer_name in layer_names:
                    layer_uri = f"{self.output_file_path}|layername={layer_name}"
                    layer = QgsVectorLayer(layer_uri, layer_name.replace('errors_', '', 1), "ogr")
                    if layer.isValid():
                        added_layer = QgsProject.instance().addMapLayer(layer, False)
                        group.addLayer(added_layer)
                    else:
                        feedback.reportError(f"Layer {layer_name} is not valid")
        return {self.OUTPUT: self.output_file_path}

    def name(self):
        """
        Returns the algorithm name, used for identifying the algorithm. This
        string should be fixed for the algorithm, and must not be localised.
        The name should be unique within each provider. Names should contain
        lowercase alphanumeric characters only and no spaces or other
        formatting characters.
        """
        return "check_schematisation"

    def displayName(self):
        """
        Returns the translated algorithm name, which should be used for any
        user-visible display of the algorithm name.
        """
        return self.tr("Check Schematisation")

    def group(self):
        """
        Returns the name of the group this algorithm belongs to. This string
        should be localised.
        """
        return self.tr(self.groupId())

    def groupId(self):
        """
        Returns the unique ID of the group this algorithm belongs to. This
        string should be fixed for the algorithm, and must not be localised.
        The group id should be unique within each provider. Group id should
        contain lowercase alphanumeric characters only and no spaces or other
        formatting characters.
        """
        return "Schematisation"

    def tr(self, string):
        """Translate ``string`` in the "Processing" translation context."""
        return QCoreApplication.translate("Processing", string)

    def createInstance(self):
        """Return a new instance of this algorithm."""
        return CheckSchematisationAlgorithm()
368

369

370
class ImportHydXAlgorithm(QgsProcessingAlgorithm):
    """
    Import data from GWSW HydX to a 3Di Schematisation
    """

    INPUT_DATASET_NAME = "INPUT_DATASET_NAME"
    HYDX_DOWNLOAD_DIRECTORY = "HYDX_DOWNLOAD_DIRECTORY"
    INPUT_HYDX_DIRECTORY = "INPUT_HYDX_DIRECTORY"
    TARGET_SCHEMATISATION = "TARGET_SCHEMATISATION"

    def initAlgorithm(self, config):
        """Declare the target schematisation and the (optional) HydX source parameters."""
        self.addParameter(
            QgsProcessingParameterFile(
                self.TARGET_SCHEMATISATION, "Target 3Di Schematisation", fileFilter="GeoPackage (*.gpkg)"
            )
        )

        self.addParameter(
            QgsProcessingParameterFile(
                self.INPUT_HYDX_DIRECTORY,
                "GWSW HydX directory (local)",
                behavior=QgsProcessingParameterFile.Folder,
                optional=True,
            )
        )

        self.addParameter(
            QgsProcessingParameterString(
                self.INPUT_DATASET_NAME, "GWSW dataset name (online)", optional=True
            )
        )

        self.addParameter(
            QgsProcessingParameterFolderDestination(
                self.HYDX_DOWNLOAD_DIRECTORY,
                "Destination directory for GWSW HydX dataset download",
                optional=True,
            )
        )

    def processAlgorithm(self, parameters, context, feedback):
        """Import a local or downloaded GWSW HydX dataset into the target schematisation.

        When a local HydX directory is given it is used as-is (a dataset name
        given alongside it is ignored with a warning); otherwise the named
        dataset is downloaded first. A log file ``import_hydx.log`` is written
        next to the target schematisation.

        Raises:
            QgsProcessingException: if the target cannot be opened or is not
                migrated, if neither source is given, if the download
                directory is invalid, or if the download is canceled or fails.
        """
        hydx_dataset_name = self.parameterAsString(
            parameters, self.INPUT_DATASET_NAME, context
        )
        hydx_download_dir = self.parameterAsString(
            parameters, self.HYDX_DOWNLOAD_DIRECTORY, context
        )
        hydx_path = self.parameterAsString(
            parameters, self.INPUT_HYDX_DIRECTORY, context
        )
        out_path = self.parameterAsFile(parameters, self.TARGET_SCHEMATISATION, context)
        threedi_db = get_threedi_database(filename=out_path, feedback=feedback)
        if not threedi_db:
            raise QgsProcessingException(
                f"Unable to connect to 3Di schematisation '{out_path}'"
            )
        try:
            schema = threedi_db.schema
            schema.validate_schema()

        except errors.MigrationMissingError:
            raise QgsProcessingException(
                "The selected 3Di schematisation does not have the latest database schema version. Please migrate this "
                "schematisation and try again: Processing > Toolbox > 3Di > Schematisation > Migrate schematisation database"
            )
        if not (hydx_dataset_name or hydx_path):
            raise QgsProcessingException(
                "Either 'GWSW HydX directory (local)' or 'GWSW dataset name (online)' must be filled in!"
            )
        if hydx_dataset_name and hydx_path:
            feedback.pushWarning(
                "Both 'GWSW dataset name (online)' and 'GWSW HydX directory (local)' are filled in. "
                "'GWSW dataset name (online)' will be ignored. This dataset will not be downloaded."
            )
        elif hydx_dataset_name:
            try:
                hydx_download_path = Path(hydx_download_dir)
                hydx_download_dir_is_valid = hydx_download_path.is_dir()
            except TypeError:
                # Keep the name bound so the error message below can be built.
                hydx_download_path = None
                hydx_download_dir_is_valid = False
            # The download directory is optional, so the key may be absent
            # from ``parameters`` when the algorithm is run from a script.
            if parameters.get(self.HYDX_DOWNLOAD_DIRECTORY) == "TEMPORARY_OUTPUT":
                hydx_download_dir_is_valid = True
            if not hydx_download_dir_is_valid:
                shown_dir = hydx_download_path if hydx_download_path is not None else hydx_download_dir
                raise QgsProcessingException(
                    f"'Destination directory for HydX dataset download' ({shown_dir}) is not a valid directory"
                )
            hydx_path = download_hydx(
                dataset_name=hydx_dataset_name,
                target_directory=hydx_download_path,
                wait_times=[0.1, 1, 2, 3, 4, 5, 10],
                feedback=feedback,
            )
            # hydx_path will be None if user has canceled the process during download
            if feedback.isCanceled():
                raise QgsProcessingException("Process canceled")
            if hydx_path is None:
                raise QgsProcessingException("Error in retrieving dataset (note case-sensitivity)")
        feedback.pushInfo(f"Starting import of {hydx_path} to {out_path}")
        log_path = Path(out_path).parent / "import_hydx.log"
        write_logging_to_file(log_path)
        feedback.pushInfo(f"Logging will be written to {log_path}")
        run_import_export(hydx_path=hydx_path, out_path=out_path)
        return {}

    def name(self):
        """
        Returns the algorithm name, used for identifying the algorithm. This
        string should be fixed for the algorithm, and must not be localised.
        The name should be unique within each provider. Names should contain
        lowercase alphanumeric characters only and no spaces or other
        formatting characters.
        """
        return "import_hydx"

    def displayName(self):
        """
        Returns the translated algorithm name, which should be used for any
        user-visible display of the algorithm name.
        """
        return self.tr("Import GWSW HydX")

    def shortHelpString(self):
        """Return the HTML help text shown alongside the algorithm dialog."""
        return """
        <h3>Introduction</h3>
        <p>Use this processing algorithm to import data in the format of the Dutch "Gegevenswoordenboek Stedelijk Water (GWSW)". Either select a previously downloaded local dataset, or download a dataset directly from the server.</p>
        <p>A log file will be created in the same directory as the Target 3Di schematisation. Please check this log file after the import has completed.&nbsp;&nbsp;</p>
        <h3>Parameters</h3>
        <h4>Target 3Di Schematisation</h4>
        <p>GeoPackage (.gpkg) file that contains the layers required by 3Di. Imported data will be added to any data already contained in the 3Di schematisation.</p>
        <h4>GWSW HydX directory (local)</h4>
        <p>Use this option if you have already downloaded a GWSW HydX dataset to a local directory.</p>
        <h4>GWSW dataset name (online)</h4>
        <p>Use this option if you want to download a GWSW HydX dataset.</p>
        <h4>Destination directory for GWSW HydX dataset download</h4>
        <p>If you have chosen to download a GWSW HydX dataset, this is the directory it will be downloaded to.</p>
        """

    def group(self):
        """
        Returns the name of the group this algorithm belongs to. This string
        should be localised.
        """
        return self.tr(self.groupId())

    def groupId(self):
        """
        Returns the unique ID of the group this algorithm belongs to. This
        string should be fixed for the algorithm, and must not be localised.
        The group id should be unique within each provider. Group id should
        contain lowercase alphanumeric characters only and no spaces or other
        formatting characters.
        """
        return "Schematisation"

    def tr(self, string):
        """Translate ``string`` in the "Processing" translation context."""
        return QCoreApplication.translate("Processing", string)

    def createInstance(self):
        """Return a new instance of this algorithm."""
        return ImportHydXAlgorithm()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc