• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine-python / 16367912334

18 Jul 2025 10:06AM UTC coverage: 76.934% (+0.1%) from 76.806%
16367912334

push

github

web-flow
ci: use Ruff as new formatter and linter (#233)

* wip

* pycodestyle

* update dependencies

* skl2onnx

* use ruff

* apply formatter

* apply lint auto fixes

* manually apply lints

* change check

* ruff ci from branch

2805 of 3646 relevant lines covered (76.93%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

20.9
geoengine/raster_workflow_rio_writer.py
1
"""A module that contains classes to write raster data from a Geo Engine raster workflow."""
2

3
from datetime import datetime
1✔
4
from typing import cast
1✔
5

6
import numpy as np
1✔
7
import rasterio as rio
1✔
8

9
from geoengine.raster import ge_type_to_np
1✔
10
from geoengine.types import RasterResultDescriptor, TimeInterval
1✔
11
from geoengine.workflow import QueryRectangle, Workflow
1✔
12

13

14
# pylint: disable=too-many-instance-attributes
15
class RasterWorkflowRioWriter:
    """
    A class to write raster data from a Geo Engine raster workflow to a GDAL dataset.
    It creates a new dataset for each time interval and writes the tiles to the dataset.
    Multiple bands are supported and the bands are written to the dataset in the order of the result descriptor.

    The attributes declared on the class are defaults. ``__init__`` and the methods below overwrite
    them on the instance; ``time_format``, ``gdal_driver`` and ``tile_size`` are only read.
    """

    # State of the dataset that is currently being written (one dataset per time interval).
    current_dataset: rio.io.DatasetWriter | None = None
    current_time: TimeInterval | None = None
    # GDAL-style geo transform [ul_x, res_x, 0.0, ul_y, 0.0, res_y], fixed by the first query.
    dataset_geo_transform = None
    dataset_width = None
    dataset_height = None
    # Placeholder only; `__init__` always replaces it with a concrete data type.
    dataset_data_type = np.dtype
    print_info = False

    dataset_prefix = None
    workflow: Workflow | None = None
    bands = None
    no_data_value = 0
    # `strftime` pattern used for the time part of the output file name.
    time_format = "%Y-%m-%d_%H-%M-%S"

    gdal_driver = "GTiff"
    # Default creation options. Instances work on their own copy (see `__init__`).
    rio_kwargs = {"tiled": True, "compress": "DEFLATE", "zlevel": 6}
    tile_size = 512

    # pylint: disable=too-many-arguments,too-many-positional-arguments
    def __init__(
        self, dataset_prefix, workflow: Workflow, no_data_value=0, data_type=None, print_info=False, rio_kwargs=None
    ):
        """
        Create a new RasterWorkflowRioWriter instance.

        :param dataset_prefix: Prefix of every output file; the formatted start time and ``.tif`` are appended
        :param workflow: The raster workflow to query
        :param no_data_value: Value written as nodata and used to fill masked pixels
        :param data_type: Data type of the output; defaults to the type derived from the result descriptor
        :param print_info: Print a message whenever a new dataset is created
        :param rio_kwargs: Additional keyword arguments for ``rasterio.open`` that override the class defaults
        """
        self.dataset_prefix = dataset_prefix
        self.workflow = workflow
        self.no_data_value = no_data_value
        self.print_info = print_info

        ras_res = cast(RasterResultDescriptor, self.workflow.get_result_descriptor())
        dt = ge_type_to_np(ras_res.data_type)
        self.dataset_data_type = dt if data_type is None else data_type
        self.bands = ras_res.bands

        # Merge into a fresh dict: `self.rio_kwargs` still resolves to the dict stored on the class
        # here, so writing into it directly would leak the overrides into every other writer.
        self.rio_kwargs = {**self.rio_kwargs, **(rio_kwargs or {})}

    def close_current_dataset(self):
        """Close the current dataset (if any) and reset ``current_dataset`` to ``None``."""
        dataset = self.current_dataset
        if dataset is None:
            return
        try:
            # Close explicitly instead of relying on garbage collection to flush the file.
            dataset.close()
        finally:
            self.current_dataset = None

    # pylint: disable=too-many-locals, too-many-statements
    def create_tiling_geo_transform_width_height(self, query: QueryRectangle):
        """
        Create the tiling geo transform, width and height for the current query.

        The query bounds are converted to pixel coordinates and extended to multiples of ``tile_size``.
        The first call stores the result in ``dataset_geo_transform``, ``dataset_width`` and
        ``dataset_height``; later calls assert that the new query yields the same values.

        :param query: The QueryRectangle providing the spatial bounds and the spatial resolution
        """

        ul_x = query.spatial_bounds.xmin
        ul_y = query.spatial_bounds.ymax
        lr_x = query.spatial_bounds.xmax
        lr_y = query.spatial_bounds.ymin
        res_x = query.spatial_resolution.x_resolution
        res_y = query.spatial_resolution.y_resolution * -1  # honor the fact that the y axis is flipped

        assert res_y < 0, "The y resolution must be negative"

        assert ul_x < lr_x, "The upper left x coordinate must be smaller than the lower right x coordinate"
        assert ul_y > lr_y, "The upper left y coordinate must be greater than the lower right y coordinate"

        ul_pixel_x = ul_x / res_x  # we can assume that the global origin is 0,0
        ul_pixel_y = ul_y / res_y
        lr_pixel_x = lr_x / res_x
        lr_pixel_y = lr_y / res_y

        assert ul_pixel_x < lr_pixel_x, "The upper left pixel x must be smaller than the lower right pixel x"
        assert ul_pixel_y < lr_pixel_y, "The upper left pixel y must be smaller than the lower right pixel y"

        # NOTE(review): `//` already rounds towards negative infinity, so the extra `- 1` starts the
        # dataset one full tile before the tile containing the upper left pixel whenever that pixel
        # is not tile-aligned. Kept as is; confirm whether this padding is intended.
        tiling_ul_pixel_x = (ul_pixel_x // self.tile_size) * self.tile_size
        if ul_pixel_x % self.tile_size != 0:
            tiling_ul_pixel_x = ((ul_pixel_x // self.tile_size) - 1) * self.tile_size

        tiling_ul_pixel_y = (ul_pixel_y // self.tile_size) * self.tile_size
        if ul_pixel_y % self.tile_size != 0:
            tiling_ul_pixel_y = ((ul_pixel_y // self.tile_size) - 1) * self.tile_size

        assert tiling_ul_pixel_x <= ul_pixel_x, "Tiling upper left x pixel must be smaller than upper left x coordinate"
        assert tiling_ul_pixel_y <= ul_pixel_y, "Tiling upper left y pixel must be smaller than upper left y coordinate"

        # Round the extent up to the next multiple of the tile size.
        width = int(lr_pixel_x - tiling_ul_pixel_x)
        if width % self.tile_size != 0:
            width = int((width // self.tile_size + 1) * self.tile_size)
        assert width > 0, "The width must be greater than 0"

        height = int(lr_pixel_y - tiling_ul_pixel_y)
        if height % self.tile_size != 0:
            height = int((height // self.tile_size + 1) * self.tile_size)
        assert height > 0, "The height must be greater than 0"

        assert width % self.tile_size == 0, "The width must be a multiple of the tile size"
        assert height % self.tile_size == 0, "The height must be a multiple of the tile size"

        tiling_ul_x_coord = tiling_ul_pixel_x * res_x
        tiling_ul_y_coord = tiling_ul_pixel_y * res_y
        assert tiling_ul_x_coord <= ul_x, "Tiling upper left x coordinate must be smaller than upper left x coordinate"
        assert tiling_ul_y_coord >= ul_y, "Tiling upper left y coordinate must be greater than upper left y coordinate"

        geo_transform = [tiling_ul_x_coord, res_x, 0.0, tiling_ul_y_coord, 0.0, res_y]

        if self.dataset_geo_transform is None:
            self.dataset_geo_transform = geo_transform
        else:
            assert self.dataset_geo_transform == geo_transform, "Can not change the geo transform of the dataset"

        if self.dataset_width is None:
            self.dataset_width = width
        else:
            assert self.dataset_width == width, "The width of the current dataset does not match the new one"

        if self.dataset_height is None:
            self.dataset_height = height
        else:
            assert self.dataset_height == height, "The height of the current dataset does not match the new one"

    def __create_new_dataset(self, query: QueryRectangle):
        """
        Create a new dataset for the current query.

        The file is named ``<dataset_prefix><start of current_time>.tif`` and is opened for writing with
        the stored width, height and geo transform. Each band is tagged with its name and measurement.
        The opened dataset is stored in ``current_dataset``.

        :param query: The QueryRectangle whose ``srs`` is used as the CRS of the dataset
        """
        assert self.current_time is not None, "The current time must be set"
        time_formated_start = self.current_time.start.astype(datetime).strftime(self.time_format)
        width = self.dataset_width
        height = self.dataset_height
        geo_transform = self.dataset_geo_transform
        assert geo_transform is not None
        affine_transform = rio.Affine.from_gdal(*geo_transform)

        file_path = f"{self.dataset_prefix}{time_formated_start}.tif"
        if self.print_info:
            print(
                f"Creating dataset {file_path}"
                f" with width {width}, height {height}, geo_transform {geo_transform}"
                f" rio kwargs: {self.rio_kwargs}"
            )
        assert self.bands is not None, "The bands must be set"

        rio_dataset = rio.open(
            file_path,
            "w",
            driver=self.gdal_driver,
            width=width,
            height=height,
            count=len(self.bands),
            crs=query.srs,
            transform=affine_transform,
            dtype=self.dataset_data_type,
            nodata=self.no_data_value,
            **self.rio_kwargs,
        )
        # Register the dataset before tagging so that `close_current_dataset` can still close it
        # if writing the tags fails.
        self.current_dataset = rio_dataset

        # rasterio band indexes are 1-based.
        for band_index, band in enumerate(self.bands, start=1):
            rio_dataset.update_tags(band_index, band_name=band.name, band_measurement=str(band.measurement))

    async def query_and_write(self, query: QueryRectangle, skip_empty_times=True):
        """
        Query the raster workflow and write the resulting tiles to a GDAL dataset per timeslice.

        :param query: The QueryRectangle to write to GDAL dataset(s)
        :param skip_empty_times: Skip timeslices where all pixels are empty/nodata
        :raises RuntimeError: If querying or writing fails; the original exception is attached as the cause
        """

        self.create_tiling_geo_transform_width_height(query)

        assert self.bands is not None, "The bands must be set"
        bands = list(range(0, len(self.bands)))

        assert self.workflow is not None, "The workflow must be set"

        # Stays `None` until the stream yields, so the error handler can tell whether a tile exists.
        tile = None
        try:
            async for tile in self.workflow.raster_stream(query, bands=bands):
                if self.current_time != tile.time:
                    self.close_current_dataset()
                    self.current_time = tile.time

                if tile.is_empty() and skip_empty_times:
                    continue

                # Created lazily, so a timeslice only gets a file once a tile is actually written.
                if self.current_dataset is None:
                    self.__create_new_dataset(query)

                assert self.current_time == tile.time, "The time of the current dataset does not match the tile"
                assert self.dataset_geo_transform is not None, "The geo transform must be set"

                # Pixel offset of the tile's upper left corner inside the dataset.
                # NOTE(review): `int()` truncates; if floating point error can put the quotient
                # slightly below a whole number the offset would be one pixel short. Verify.
                tile_ul_x = int(
                    (tile.geo_transform.x_min - self.dataset_geo_transform[0]) / self.dataset_geo_transform[1]
                )
                tile_ul_y = int(
                    (tile.geo_transform.y_max - self.dataset_geo_transform[3]) / self.dataset_geo_transform[5]
                )

                band_index = tile.band + 1  # rasterio band indexes are 1-based
                data = tile.to_numpy_data_array(self.no_data_value)

                assert self.tile_size == tile.size_x == tile.size_y, "Tile size does not match the expected size"
                window = rio.windows.Window(tile_ul_x, tile_ul_y, tile.size_x, tile.size_y)
                assert self.current_dataset is not None, "Dataset must be open."
                self.current_dataset.write(data, window=window, indexes=band_index)
        except Exception as inner_e:
            if tile is None:
                # The stream failed before yielding anything, so there is no tile to describe.
                raise RuntimeError(
                    "Failed to query the raster workflow before the first tile was received"
                ) from inner_e
            # `tile` is the most recently received tile.
            raise RuntimeError(f"Tile at {tile.spatial_partition().as_bbox_str()} with {tile.time}") from inner_e

        finally:
            self.close_current_dataset()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc