geo-engine / geoengine, build 12767614094 (push, github, web-flow)
14 Jan 2025 12:26PM UTC coverage: 90.64% (+0.06% from 90.576%)

Merge pull request #1006 from geo-engine/migrate-pro-api

1106 of 1152 new or added lines in 24 files covered (96.01%)
248 existing lines in 13 files now uncovered
133501 of 147287 relevant lines covered (90.64%)
54652.85 hits per line

Source file: /services/src/tasks/in_memory.rs (86.6% covered)
use super::{
    RunningTaskStatusInfo, Task, TaskCleanUpStatus, TaskContext, TaskError, TaskFilter, TaskId,
    TaskListOptions, TaskManager, TaskStatus, TaskStatusInfo, TaskStatusWithId,
};
use crate::{contexts::Db, error::Result};
use futures::channel::oneshot;
use futures::StreamExt;
use geoengine_datatypes::{
    error::ErrorSource,
    util::{helpers::ge_report, Identifier},
};
use log::warn;
use std::{
    collections::{HashMap, HashSet, VecDeque},
    sync::Arc,
};
use tokio::{
    sync::{RwLock, RwLockWriteGuard},
    task::JoinHandle,
};

type SharedTask = Arc<Box<dyn Task<SimpleTaskManagerContext>>>;

/// An in-memory implementation of the [`TaskManager`] trait.
#[derive(Default, Clone)]
pub struct SimpleTaskManagerBackend {
    tasks_by_id: Db<HashMap<TaskId, TaskHandle>>,
    unique_tasks: Db<HashSet<(&'static str, String)>>,
    // these two lists won't be cleaned-up
    status_by_id: Db<HashMap<TaskId, Db<TaskStatus>>>,
    status_list: Db<VecDeque<TaskUpdateStatusWithTaskId>>,
}

struct TaskHandle {
    task: SharedTask,
    handle: Option<JoinHandle<()>>,
    status: Db<TaskStatus>,
    unique_key: Option<(&'static str, String)>,
}

impl SimpleTaskManagerBackend {
    async fn write_lock_all(&self) -> WriteLockAll {
        let (tasks_by_id, status_by_id, task_list, unique_tasks) = tokio::join!(
            self.tasks_by_id.write(),
            self.status_by_id.write(),
            self.status_list.write(),
            self.unique_tasks.write(),
        );
        WriteLockAll {
            tasks_by_id,
            status_by_id,
            status_list: task_list,
            unique_tasks,
        }
    }

    async fn write_lock_for_update(&self) -> WriteLockForUpdate {
        let (tasks_by_id, unique_tasks) =
            tokio::join!(self.tasks_by_id.write(), self.unique_tasks.write());
        WriteLockForUpdate {
            tasks_by_id,
            unique_tasks,
        }
    }
}

#[derive(Debug)]
struct TaskUpdateStatusWithTaskId {
    pub task_id: TaskId,
    pub status: Db<TaskStatus>,
}

struct WriteLockAll<'a> {
    pub tasks_by_id: RwLockWriteGuard<'a, HashMap<TaskId, TaskHandle>>,
    pub status_by_id: RwLockWriteGuard<'a, HashMap<TaskId, Db<TaskStatus>>>,
    pub status_list: RwLockWriteGuard<'a, VecDeque<TaskUpdateStatusWithTaskId>>,
    pub unique_tasks: RwLockWriteGuard<'a, HashSet<(&'static str, String)>>,
}

struct WriteLockForUpdate<'a> {
    pub tasks_by_id: RwLockWriteGuard<'a, HashMap<TaskId, TaskHandle>>,
    pub unique_tasks: RwLockWriteGuard<'a, HashSet<(&'static str, String)>>,
}

#[async_trait::async_trait]
impl TaskManager<SimpleTaskManagerContext> for SimpleTaskManagerBackend {
    async fn schedule_task(
        &self,
        task: Box<dyn Task<SimpleTaskManagerContext>>,
        notify: Option<oneshot::Sender<TaskStatus>>,
    ) -> Result<TaskId, TaskError> {
        let task_id = TaskId::new();

        // get lock before starting the task to prevent a race condition of initial status setting
        let mut lock = self.write_lock_all().await;

        // check if task is duplicate
        let task_unique_key = task
            .task_unique_id()
            .map(|task_unique_id| (task.task_type(), task_unique_id));

        if let Some(task_unique_id) = &task_unique_key {
            if !lock.unique_tasks.insert(task_unique_id.clone()) {
                return Err(TaskError::DuplicateTask {
                    task_type: task_unique_id.0,
                    task_unique_id: task_unique_id.1.clone(),
                });
            }
        }

        let task_type = task.task_type();
        let description = Some(task.task_description());

        let mut task_handle = TaskHandle {
            task: Arc::new(task),
            handle: None,
            status: Arc::new(RwLock::new(TaskStatus::Running(
                RunningTaskStatusInfo::new(task_type, description, 0., ().boxed()),
            ))),
            unique_key: task_unique_key,
        };

        let task = task_handle.task.clone();
        let task_ctx = SimpleTaskManagerContext {
            status: task_handle.status.clone(),
        };

        let handle = run_task(
            self.clone(), // we can clone here, since all interior stuff is wrapped into `Arc`s
            task_id,
            task,
            task_ctx,
            notify,
        );

        task_handle.handle = Some(handle);

        lock.status_by_id
            .insert(task_id, task_handle.status.clone());
        lock.status_list.push_front(TaskUpdateStatusWithTaskId {
            task_id,
            status: task_handle.status.clone(),
        });

        if let Some(task_unique_id) = &task_handle.unique_key {
            lock.unique_tasks.insert(task_unique_id.clone());
        }

        lock.tasks_by_id.insert(task_id, task_handle);

        Ok(task_id)
    }

    async fn get_task_status(&self, task_id: TaskId) -> Result<TaskStatus, TaskError> {
        let task_status_map = self.status_by_id.read().await;
        let task_status = task_status_map
            .get(&task_id)
            .ok_or(TaskError::TaskNotFound { task_id })?
            .read()
            .await
            .clone();

        Ok(task_status)
    }

    async fn list_tasks(
        &self,
        options: TaskListOptions,
    ) -> Result<Vec<TaskStatusWithId>, TaskError> {
        let lock = self.status_list.read().await;

        let stream = futures::stream::iter(lock.iter());

        let result: Vec<TaskStatusWithId> = stream
            .filter_map(|task_status_with_id| async {
                let task_status = task_status_with_id.status.read().await;

                match (options.filter, &*task_status) {
                    (None, _)
                    | (Some(TaskFilter::Running), &TaskStatus::Running(_))
                    | (Some(TaskFilter::Completed), &TaskStatus::Completed { .. })
                    | (Some(TaskFilter::Aborted), &TaskStatus::Aborted { .. })
                    | (Some(TaskFilter::Failed), &TaskStatus::Failed { .. }) => {
                        Some(TaskStatusWithId {
                            task_id: task_status_with_id.task_id,
                            status: task_status.clone(),
                        })
                    }
                    _ => None,
                }
            })
            .skip(options.offset as usize)
            .take(options.limit as usize)
            .collect()
            .await;

        Ok(result)
    }

    async fn abort_tasks(&self, task_id: TaskId, force: bool) -> Result<(), TaskError> {
        let mut write_lock = self.write_lock_for_update().await;

        let mut task_handle = write_lock
            .tasks_by_id
            .remove(&task_id)
            .ok_or(TaskError::TaskNotFound { task_id })?;

        let task_status_lock = task_handle.status.read().await;

        if task_status_lock.is_finished() {
            return Err(TaskError::TaskAlreadyFinished { task_id });
        } else if !force && task_status_lock.has_aborted() {
            drop(task_status_lock);

            // put clean-up handle back
            write_lock.tasks_by_id.insert(task_id, task_handle);

            return Err(TaskError::TaskAlreadyAborted { task_id });
        }

        drop(task_status_lock); // prevent deadlocks on the status lock

        let task_finished_before_being_aborted = if let Some(handle) = task_handle.handle.take() {
            handle.abort();
            handle.await.is_ok()
        } else {
            // this case should not happen, so we just assume that the task finished before being aborted
            true
        };

        let subtask_ids = task_handle.task.subtasks().await;

        if force || task_finished_before_being_aborted {
            set_status_to_no_clean_up(&task_handle.status).await;

            remove_unique_key(&task_handle, &mut write_lock.unique_tasks);

            // propagate abort to subtasks
            drop(write_lock); // prevent deadlocks because the subtask abort tries to fetch the lock
            abort_subtasks(self.clone(), subtask_ids, force, task_id).await;

            // no clean-up in this case
            return Ok(());
        }

        set_status_to_aborting(&task_handle.status).await;
        clean_up_phase(self.clone(), task_handle, &mut write_lock, task_id);

        // propagate abort to subtasks
        drop(write_lock); // prevent deadlocks because the subtask abort tries to fetch the lock
        abort_subtasks(self.clone(), subtask_ids, force, task_id).await;

        Ok(())
    }
}

async fn abort_subtasks(
    task_manager: SimpleTaskManagerBackend,
    subtask_ids: Vec<TaskId>,
    force: bool,
    supertask_id: TaskId,
) {
    for subtask_id in subtask_ids {
        // don't fail if subtask failed to abort
        let subtask_abort_result = task_manager.abort_tasks(subtask_id, force).await;

        if let Err(subtask_abort_result) = subtask_abort_result {
            warn!(
                "failed to abort subtask {subtask_id} of task {supertask_id}: {subtask_abort_result:?}"
            );
        }
    }
}

fn run_task(
    task_manager: SimpleTaskManagerBackend,
    task_id: TaskId,
    task: SharedTask,
    task_ctx: SimpleTaskManagerContext,
    notify: Option<oneshot::Sender<TaskStatus>>,
) -> JoinHandle<()> {
    crate::util::spawn(async move {
        let result = task.run(task_ctx.clone()).await;

        let mut update_lock = task_manager.write_lock_for_update().await;

        let Some(task_handle) = update_lock.tasks_by_id.remove(&task_id) else {
            return; /* never happens */
        };

        let task_status = task_handle.status.clone();

        match result {
            Ok(status) => {
                let mut task_status_lock = task_handle.status.write().await;
                let completed_task_status = task_status_lock.completed(Arc::from(status));
                *task_status_lock = completed_task_status;

                remove_unique_key(&task_handle, &mut update_lock.unique_tasks);
            }
            Err(err) => {
                let err = Arc::from(err);

                log::error!(
                    "Task {} failed with: {}",
                    task_id,
                    ge_report(Arc::clone(&err))
                );
                *task_handle.status.write().await = TaskStatus::failed(
                    Arc::clone(&err),
                    TaskCleanUpStatus::Running(RunningTaskStatusInfo::new(
                        "",
                        None,
                        0.,
                        ().boxed(),
                    )),
                );

                clean_up_phase(task_manager.clone(), task_handle, &mut update_lock, task_id);
            }
        };

        // TODO: move this into clean-up?
        if let Some(notify) = notify {
            // we can ignore the returned error because this means
            // that the receiver has already been dropped
            notify
                .send(task_status.read().await.clone())
                .unwrap_or_default();
        }
    })
}

fn remove_unique_key(
    task_handle: &TaskHandle,
    unique_lock: &mut RwLockWriteGuard<'_, HashSet<(&'static str, String)>>,
) {
    if let Some(task_unique_id) = &task_handle.unique_key {
        unique_lock.remove(task_unique_id);
    }
}

async fn set_status_to_aborting(task_status: &Db<TaskStatus>) {
    let mut task_status_lock = task_status.write().await;
    *task_status_lock = TaskStatus::aborted(TaskCleanUpStatus::Running(
        RunningTaskStatusInfo::new("", None, 0., ().boxed()),
    ));
}

async fn set_status_to_clean_up_completed(task_status: &Db<TaskStatus>) {
    let mut task_status_lock = task_status.write().await;

    let task_clean_up_status = TaskCleanUpStatus::Completed {
        info: Arc::new(().boxed()),
    };

    *task_status_lock = match &*task_status_lock {
        TaskStatus::Running(_) | TaskStatus::Completed { .. } => return, // must not happen, ignore
        TaskStatus::Aborted { .. } => TaskStatus::aborted(task_clean_up_status),
        TaskStatus::Failed { error, .. } => TaskStatus::failed(error.clone(), task_clean_up_status),
    };
}

async fn set_status_to_no_clean_up(task_status: &Db<TaskStatus>) {
    let mut task_status_lock = task_status.write().await;

    let task_clean_up_status = TaskCleanUpStatus::NoCleanUp;

    *task_status_lock = match &*task_status_lock {
        TaskStatus::Completed { .. } => return, // must not happen, ignore
        TaskStatus::Running(_) | TaskStatus::Aborted { .. } => {
            TaskStatus::aborted(task_clean_up_status)
        }
        TaskStatus::Failed { error, .. } => TaskStatus::failed(error.clone(), task_clean_up_status),
    }
}

async fn set_status_to_clean_up_failed(task_status: &Db<TaskStatus>, error: Box<dyn ErrorSource>) {
    let mut task_status_lock = task_status.write().await;

    let task_clean_up_status = TaskCleanUpStatus::Failed {
        error: Arc::from(error),
    };

    *task_status_lock = match &*task_status_lock {
        TaskStatus::Running(_) | TaskStatus::Completed { .. } => return, // must not happen, ignore
        TaskStatus::Aborted { .. } => TaskStatus::aborted(task_clean_up_status),
        TaskStatus::Failed { error, .. } => TaskStatus::failed(error.clone(), task_clean_up_status),
    }
}

fn clean_up_phase(
    task_manager: SimpleTaskManagerBackend,
    mut task_handle: TaskHandle,
    write_lock: &mut WriteLockForUpdate<'_>,
    task_id: TaskId,
) {
    let task = task_handle.task.clone();
    let task_ctx = SimpleTaskManagerContext {
        status: task_handle.status.clone(),
    };

    let handle = crate::util::spawn(async move {
        let result = task.cleanup_on_error(task_ctx.clone()).await;

        let mut update_lock = task_manager.write_lock_for_update().await;

        let Some(task_handle) = update_lock.tasks_by_id.remove(&task_id) else {
            return; /* never happens */
        };

        match result {
            Ok(()) => set_status_to_clean_up_completed(&task_handle.status).await,
            Err(err) => set_status_to_clean_up_failed(&task_handle.status, err).await,
        };

        remove_unique_key(&task_handle, &mut update_lock.unique_tasks);
    });

    task_handle.handle = Some(handle);

    write_lock.tasks_by_id.insert(task_id, task_handle);
}

#[derive(Clone)]
pub struct SimpleTaskManagerContext {
    status: Db<TaskStatus>,
}

#[async_trait::async_trait]
impl TaskContext for SimpleTaskManagerContext {
    async fn set_completion(&self, pct_complete: f64, status: Box<dyn TaskStatusInfo>) {
        let mut task_status = self.status.write().await;

        *task_status = match &*task_status {
            TaskStatus::Running(current_info) => {
                TaskStatus::Running(current_info.update(pct_complete, status))
            }
            TaskStatus::Aborted {
                clean_up: TaskCleanUpStatus::Running(current_info),
            } => TaskStatus::aborted(TaskCleanUpStatus::Running(
                current_info.update(pct_complete, status),
            )),
            TaskStatus::Failed {
                error,
                clean_up: TaskCleanUpStatus::Running(current_info),
            } => TaskStatus::failed(
                error.clone(),
                TaskCleanUpStatus::Running(current_info.update(pct_complete, status)),
            ),
            _ => return, // already completed, aborted or failed, so we ignore the status update
        };
    }
}

pub struct SimpleTaskManager {
    backend: Arc<SimpleTaskManagerBackend>,
}

impl SimpleTaskManager {
    pub fn new(backend: Arc<SimpleTaskManagerBackend>) -> Self {
        Self { backend }
    }
}

#[async_trait::async_trait]
impl TaskManager<SimpleTaskManagerContext> for SimpleTaskManager {
    async fn schedule_task(
        &self,
        task: Box<dyn Task<SimpleTaskManagerContext>>,
        notify: Option<oneshot::Sender<TaskStatus>>,
    ) -> Result<TaskId, TaskError> {
        self.backend.schedule_task(task, notify).await
    }

    async fn get_task_status(&self, task_id: TaskId) -> Result<TaskStatus, TaskError> {
        self.backend.get_task_status(task_id).await
    }

    async fn list_tasks(
        &self,
        options: TaskListOptions,
    ) -> Result<Vec<TaskStatusWithId>, TaskError> {
        self.backend.list_tasks(options).await
    }

    async fn abort_tasks(&self, task_id: TaskId, force: bool) -> Result<(), TaskError> {
        self.backend.abort_tasks(task_id, force).await
    }
}
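
For orientation, the sketch below shows how a caller might drive this backend. It is a hypothetical usage example, not part of the file above, and assumes some `my_task: Box<dyn Task<SimpleTaskManagerContext>>` is already available and that the enclosing function can propagate `TaskError` with `?`:

    // Hypothetical sketch: schedule a task on the in-memory backend, query its status,
    // and request a (non-forced) abort. Signatures follow the trait impl shown above.
    let manager = SimpleTaskManagerBackend::default();
    let task_id = manager.schedule_task(my_task, None).await?; // no completion notification channel
    let status = manager.get_task_status(task_id).await?;      // clone of the current TaskStatus
    manager.abort_tasks(task_id, false).await?;                // triggers clean-up for unfinished tasks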