• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine-python / 16367912334

18 Jul 2025 10:06AM UTC coverage: 76.934% (+0.1%) from 76.806%
16367912334

push

github

web-flow
ci: use Ruff as new formatter and linter (#233)

* wip

* pycodestyle

* update dependencies

* skl2onnx

* use ruff

* apply formatter

* apply lint auto fixes

* manually apply lints

* change check

* ruff ci from branch

2805 of 3646 relevant lines covered (76.93%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.07
geoengine/tasks.py
1
"""
2
Module for encapsulating Geo Engine tasks API
3
"""
4

5
from __future__ import annotations
1✔
6

7
import asyncio
1✔
8
import datetime
1✔
9
import time
1✔
10
from enum import Enum
1✔
11
from uuid import UUID
1✔
12

13
import geoengine_openapi_client
1✔
14

15
from geoengine import backports
1✔
16
from geoengine.auth import get_session
1✔
17
from geoengine.error import GeoEngineException, TypeException
1✔
18
from geoengine.types import DEFAULT_ISO_TIME_FORMAT
1✔
19

20

21
class TaskId:
    """A wrapper for a task id"""

    def __init__(self, task_id: UUID) -> None:
        """Wrap the given UUID as a task id"""
        self.__task_id = task_id

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.TaskResponse) -> TaskId:
        """Parse a http response to a `TaskId`"""

        # Construct through `cls` so that a subclass receives an instance of its own type.
        return cls(UUID(response.task_id))

    def __eq__(self, other) -> bool:
        """Checks if two task ids are equal"""
        if not isinstance(other, self.__class__):
            return False

        return self.__task_id == other.__task_id  # pylint: disable=protected-access

    def __hash__(self) -> int:
        """
        Hash by the wrapped UUID

        Defining `__eq__` alone makes a class unhashable; this keeps equal ids
        usable as dictionary keys and set members.
        """
        return hash(self.__task_id)

    def __str__(self) -> str:
        """Return the string form of the wrapped UUID"""
        return str(self.__task_id)

    def __repr__(self) -> str:
        """Return the `repr` of the wrapped UUID"""
        return repr(self.__task_id)
×
45

46

47
class TaskStatus(Enum):
    """The states a task can be reported in"""

    # Every member's value is the lower-case status string, so a member can be
    # looked up from such a string, e.g. `TaskStatus("running")`.
    RUNNING = "running"
    COMPLETED = "completed"
    ABORTED = "aborted"
    FAILED = "failed"
1✔
54

55

56
class TaskStatusInfo:  # pylint: disable=too-few-public-methods
    """Base type for task status information: a status together with a start time"""

    status: TaskStatus
    time_started: datetime.datetime

    def __init__(self, status, time_started) -> None:
        """Keep the given status and start time as attributes"""
        self.status, self.time_started = status, time_started

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.TaskStatus) -> TaskStatusInfo:
        """
        Build the matching `TaskStatusInfo` subclass from a http response

        Depending on the instance wrapped by the response, the result is one of:
        RunningTaskStatusInfo, CompletedTaskStatusInfo, AbortedTaskStatusInfo or FailedTaskStatusInfo

        Raises a `TypeException` if the response wraps no instance and a
        `GeoEngineException` if the wrapped instance is none of the four known types.
        """

        payload = response.actual_instance
        if payload is None:
            raise TypeException("Unknown `TaskStatus` type")

        client = geoengine_openapi_client
        status = TaskStatus(payload.status)

        is_running = isinstance(payload, client.TaskStatusRunning)
        is_completed = isinstance(payload, client.TaskStatusCompleted)

        # The start time is only parsed for running and completed tasks;
        # aborted and failed tasks are always given `None`.
        started = None
        if (is_running or is_completed) and payload.time_started is not None:
            started = datetime.datetime.strptime(payload.time_started, DEFAULT_ISO_TIME_FORMAT)

        if is_running:
            return RunningTaskStatusInfo(
                status,
                started,
                payload.pct_complete,
                payload.estimated_time_remaining,
                payload.info,
                payload.task_type,
                payload.description,
            )

        if is_completed:
            return CompletedTaskStatusInfo(
                status,
                started,
                payload.info,
                payload.time_total,
                payload.task_type,
                payload.description,
            )

        if isinstance(payload, client.TaskStatusAborted):
            return AbortedTaskStatusInfo(status, started, payload.clean_up)

        if isinstance(payload, client.TaskStatusFailed):
            return FailedTaskStatusInfo(status, started, payload.error, payload.clean_up)

        raise GeoEngineException(response)
×
106

107

108
class RunningTaskStatusInfo(TaskStatusInfo):
    """A wrapper for a running task status with information about completion progress"""

    # pylint: disable=too-many-positional-arguments
    def __init__(
        self, status, start_time, pct_complete, estimated_time_remaining, info, task_type, description
    ) -> None:  # pylint: disable=too-many-arguments,line-too-long
        """
        Store the progress details of a running task

        `status` and `start_time` are handed to the base class initializer, which
        keeps the start time as `time_started`; the remaining arguments are stored
        under their own names.
        """
        super().__init__(status, start_time)
        self.pct_complete = pct_complete
        self.estimated_time_remaining = estimated_time_remaining
        self.info = info
        self.task_type = task_type
        self.description = description

    def __eq__(self, other):
        """
        Check if two task statuses are equal

        The start time is not part of the comparison.
        """
        if not isinstance(other, self.__class__):
            return False

        return (
            self.status == other.status
            and self.pct_complete == other.pct_complete
            and self.estimated_time_remaining == other.estimated_time_remaining
            and self.info == other.info
            and self.task_type == other.task_type
            and self.description == other.description
        )

    def __str__(self) -> str:
        """Return all fields as a comma separated `name=value` listing"""
        return (
            f"status={self.status.value}, time_started={self.time_started}, "
            f"pct_complete={self.pct_complete}, "
            f"estimated_time_remaining={self.estimated_time_remaining}, info={self.info}, "
            f"task_type={self.task_type}, description={self.description}"
        )

    def __repr__(self) -> str:
        """Return all fields, including the start time, in constructor-like notation"""
        # `time_started` is included so that this matches `__str__` and the
        # `__repr__` of the other status classes, which all report it.
        return (
            f"TaskStatusInfo(status={self.status.value!r}, time_started={self.time_started!r}, "
            f"pct_complete={self.pct_complete!r}, "
            f"estimated_time_remaining={self.estimated_time_remaining!r}, info={self.info!r}, "
            f"task_type={self.task_type!r}, description={self.description!r})"
        )
150

151

152
class CompletedTaskStatusInfo(TaskStatusInfo):
    """Status information of a completed task, including details about the completion"""

    # pylint: disable=too-many-arguments, too-many-positional-arguments
    def __init__(self, status, time_started, info, time_total, task_type, description) -> None:
        """Pass status and start time to the base class and keep the completion details"""
        super().__init__(status, time_started)
        self.info = info
        self.time_total = time_total
        self.task_type = task_type
        self.description = description

    def __eq__(self, other):
        """Compare status, info, total time, task type and description; the start time is ignored"""
        if isinstance(other, self.__class__):
            compared = ("status", "info", "time_total", "task_type", "description")
            return all(getattr(self, name) == getattr(other, name) for name in compared)

        return False

    def __str__(self) -> str:
        """Return all fields as a comma separated `name=value` listing"""
        parts = (
            f"status={self.status.value}",
            f"time_started={self.time_started}",
            f"info={self.info}",
            f"time_total={self.time_total}",
            f"task_type={self.task_type}",
            f"description={self.description}",
        )
        return ", ".join(parts)

    def __repr__(self) -> str:
        """Return all fields in constructor-like notation"""
        parts = (
            f"status={self.status.value!r}",
            f"time_started={self.time_started!r}",
            f"info = {self.info!r}",
            f"time_total = {self.time_total!r}",
            f"task_type={self.task_type!r}",
            f"description={self.description!r}",
        )
        return "TaskStatusInfo(" + ", ".join(parts) + ")"
188

189

190
class AbortedTaskStatusInfo(TaskStatusInfo):
    """Status information of an aborted task, including details about the termination"""

    def __init__(self, status, time_started, clean_up) -> None:
        """Pass status and start time to the base class and keep the clean-up information"""
        super().__init__(status, time_started)
        self.clean_up = clean_up

    def __eq__(self, other):
        """Compare status and clean-up information; the start time is ignored"""
        if isinstance(other, self.__class__):
            return all(getattr(self, name) == getattr(other, name) for name in ("status", "clean_up"))

        return False

    def __str__(self) -> str:
        """Return all fields as a comma separated `name=value` listing"""
        parts = (
            f"status={self.status.value}",
            f"time_started={self.time_started}",
            f"clean_up={self.clean_up}",
        )
        return ", ".join(parts)

    def __repr__(self) -> str:
        """Return all fields in constructor-like notation"""
        parts = (
            f"status={self.status.value!r}",
            f"time_started={self.time_started!r}",
            f"clean_up={self.clean_up!r}",
        )
        return "TaskStatusInfo(" + ", ".join(parts) + ")"
212

213

214
class FailedTaskStatusInfo(TaskStatusInfo):
    """Status information of a failed task, including details about the failure"""

    def __init__(self, status, time_started, error, clean_up) -> None:
        """Pass status and start time to the base class and keep error and clean-up information"""
        super().__init__(status, time_started)
        self.error = error
        self.clean_up = clean_up

    def __eq__(self, other):
        """Compare status, error and clean-up information; the start time is ignored"""
        if isinstance(other, self.__class__):
            compared = ("status", "error", "clean_up")
            return all(getattr(self, name) == getattr(other, name) for name in compared)

        return False

    def __str__(self) -> str:
        """Return all fields as a comma separated `name=value` listing"""
        parts = (
            f"status={self.status.value}",
            f"time_started={self.time_started}",
            f"error={self.error}",
            f"clean_up={self.clean_up}",
        )
        return ", ".join(parts)

    def __repr__(self) -> str:
        """Return all fields in constructor-like notation"""
        parts = (
            f"status={self.status.value!r}",
            f"time_started={self.time_started!r}",
            f"error={self.error!r}",
            f"clean_up={self.clean_up!r}",
        )
        return "TaskStatusInfo(" + ", ".join(parts) + ")"
240

241

242
class Task:
    """
    Holds a task id, allows querying and manipulating the task status
    """

    def __init__(self, task_id: TaskId):
        """Keep the id of the task this object refers to"""
        self.__task_id = task_id

    def __eq__(self, other):
        """Check if two task representations are equal"""
        if not isinstance(other, self.__class__):
            return False

        return self.__task_id == other.__task_id  # pylint: disable=protected-access

    def __hash__(self) -> int:
        """
        Hash by the string form of the task id

        Defining `__eq__` alone makes a class unhashable. The string form is
        hashed because the wrapped id object is not required to be hashable itself.
        """
        return hash(str(self.__task_id))

    def get_status(self, timeout: int = 3600) -> TaskStatusInfo:
        """
        Returns the status of a task in a Geo Engine instance

        `timeout` is forwarded to the request as `_request_timeout`.
        """
        session = get_session()

        task_id_str = str(self.__task_id)

        with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
            tasks_api = geoengine_openapi_client.TasksApi(api_client)
            response = tasks_api.status_handler(task_id_str, _request_timeout=timeout)

        return TaskStatusInfo.from_response(response)

    def abort(self, force: bool = False, timeout: int = 3600) -> None:
        """
        Abort a running task in a Geo Engine instance

        `timeout` is forwarded to the request as `_request_timeout`.
        """
        session = get_session()

        task_id_str = str(self.__task_id)

        with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
            tasks_api = geoengine_openapi_client.TasksApi(api_client)
            # The handler receives `None` rather than `False` when no forced abort is requested.
            tasks_api.abort_handler(task_id_str, None if force is False else True, _request_timeout=timeout)

    def wait_for_finish(
        self, check_interval_seconds: float = 5, request_timeout: int = 3600, print_status: bool = True
    ) -> TaskStatusInfo:
        """
        Wait for the given task in a Geo Engine instance to finish (status either complete, aborted or failed).

        The status is queried once up front. While it is running, it is queried again, printed if
        `print_status` is set, and followed by a pause of `check_interval_seconds` seconds as long
        as the task is still running. The last queried status is returned.
        """
        current_status = self.get_status(request_timeout)

        while current_status.status == TaskStatus.RUNNING:
            current_status = self.get_status(request_timeout)

            if print_status:
                print(current_status)
            if current_status.status == TaskStatus.RUNNING:
                time.sleep(check_interval_seconds)

        return current_status

    def __str__(self) -> str:
        """Return the string form of the task id"""
        return str(self.__task_id)

    def __repr__(self) -> str:
        """Return the `repr` of the task id"""
        return repr(self.__task_id)

    async def as_future(self, request_interval: int = 5, print_status=False) -> TaskStatusInfo:
        """
        Returns a future that will be resolved when the task is finished in the backend.

        The status is requested through `backports.to_thread` in a loop; between two requests the
        coroutine sleeps for `request_interval` seconds. Each status is printed if `print_status`
        is set. The first status that is not running is returned.
        """

        def get_status_inner(tasks_api: geoengine_openapi_client.TasksApi, task_id_str: str, timeout: int = 3600):
            return tasks_api.status_handler(task_id_str, _request_timeout=timeout)

        session = get_session()
        task_id_str = str(self.__task_id)

        with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
            tasks_api = geoengine_openapi_client.TasksApi(api_client)
            while True:
                response = await backports.to_thread(get_status_inner, tasks_api, task_id_str)

                last_status = TaskStatusInfo.from_response(response)

                if print_status:
                    print(last_status)

                if last_status.status != TaskStatus.RUNNING:
                    return last_status

                await asyncio.sleep(request_interval)
×
335

336

337
def get_task_list(timeout: int = 3600) -> list[tuple[Task, TaskStatusInfo]]:
    """
    Fetch tasks of a Geo Engine instance, each paired with its status

    `timeout` is forwarded to the request as `_request_timeout`.

    NOTE(review): the list request is made with the fixed arguments `None, 0, 10`,
    presumably filter, offset and limit, which would cap the result at ten tasks - confirm.
    """
    configuration = get_session().configuration

    with geoengine_openapi_client.ApiClient(configuration) as api_client:
        listed = geoengine_openapi_client.TasksApi(api_client).list_handler(None, 0, 10, _request_timeout=timeout)

    return [(Task(TaskId(UUID(entry.task_id))), TaskStatusInfo.from_response(entry)) for entry in listed]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc