• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12767614094

14 Jan 2025 12:26PM UTC coverage: 90.64% (+0.06%) from 90.576%
12767614094

push

github

web-flow
Merge pull request #1006 from geo-engine/migrate-pro-api

Migrate-pro-api

1106 of 1152 new or added lines in 24 files covered. (96.01%)

248 existing lines in 13 files now uncovered.

133501 of 147287 relevant lines covered (90.64%)

54652.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.53
/services/src/api/handlers/tasks.rs
1
use crate::contexts::ApplicationContext;
2
use crate::error::Result;
3
use crate::tasks::{TaskListOptions, TaskManager, TaskStatusWithId};
4
use crate::util::extractors::ValidatedQuery;
5
use crate::{contexts::SessionContext, tasks::TaskId};
6
use actix_web::{web, FromRequest, HttpResponse, Responder};
7
use serde::{Deserialize, Serialize};
8
use utoipa::{IntoParams, ToSchema};
9

10
pub(crate) fn init_task_routes<C>(cfg: &mut web::ServiceConfig)
339✔
11
where
339✔
12
    C: ApplicationContext,
339✔
13
    C::Session: FromRequest,
339✔
14
{
339✔
15
    cfg.service(
339✔
16
        web::scope("/tasks")
339✔
17
            .service(web::resource("/list").route(web::get().to(list_handler::<C>)))
339✔
18
            .service(
339✔
19
                web::scope("/{task_id}")
339✔
20
                    .service(web::resource("/status").route(web::get().to(status_handler::<C>)))
339✔
21
                    .service(web::resource("").route(web::delete().to(abort_handler::<C>))),
339✔
22
            ),
339✔
23
    );
339✔
24
}
339✔
25

26
/// Create a task somewhere and respond with a task id to query the task status.
27
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
2✔
28
#[serde(rename_all = "camelCase")]
29
pub struct TaskResponse {
30
    pub task_id: TaskId,
31
}
32

33
impl TaskResponse {
34
    pub fn new(task_id: TaskId) -> Self {
9✔
35
        Self { task_id }
9✔
36
    }
9✔
37
}
38

39
/// Retrieve the status of a task.
40
#[utoipa::path(
4✔
41
    tag = "Tasks",
4✔
42
    get,
4✔
43
    path = "/tasks/{id}/status",
4✔
44
    responses(
4✔
45
        (status = 200, description = "Status of the task (running)", body = TaskStatus,
4✔
46
            example = json!({
4✔
47
                "status": "running",
4✔
48
                "taskType": "dummy-task",
4✔
49
                "description": "Demo",
4✔
50
                "pctComplete": "0.00%",
4✔
51
                "timeStarted": "2023-02-16T15:25:45.390Z",
4✔
52
                "estimatedTimeRemaining": "? (± ?)",
4✔
53
                "info": null,
4✔
54
            })
4✔
55
        )
4✔
56
    ),
4✔
57
    params(
4✔
58
        ("id" = TaskId, description = "Task id")
4✔
59
    ),
4✔
60
    security(
4✔
61
        ("session_token" = [])
4✔
62
    )
4✔
63
)]
4✔
64
async fn status_handler<C: ApplicationContext>(
6✔
65
    session: C::Session, // TODO: check for session auth
6✔
66
    app_ctx: web::Data<C>,
6✔
67
    task_id: web::Path<TaskId>,
6✔
68
) -> Result<impl Responder> {
6✔
69
    let task_id = task_id.into_inner();
6✔
70

71
    let task = app_ctx
6✔
72
        .session_context(session)
6✔
73
        .tasks()
6✔
74
        .get_task_status(task_id)
6✔
75
        .await?;
6✔
76

77
    Ok(web::Json(task))
6✔
78
}
6✔
79

80
/// Retrieve the status of all tasks.
81
#[utoipa::path(
10✔
82
    tag = "Tasks",
10✔
83
    get,
10✔
84
    path = "/tasks/list",
10✔
85
    responses(
10✔
86
        (status = 200, description = "Status of all tasks", body = Vec<TaskStatusWithId>,
10✔
87
            example = json!([
10✔
88
                {
10✔
89
                    "taskId": "420b06de-0a7e-45cb-9c1c-ea901b46ab69",
10✔
90
                    "status": "completed",
10✔
91
                    "taskType": "dummy-task",
10✔
92
                    "description": "Demo",
10✔
93
                    "info": null,
10✔
94
                    "timeTotal": "00:00:30",
10✔
95
                    "timeStarted": "2023-02-16T15:25:45.390Z"
10✔
96
                }
10✔
97
            ])
10✔
98
        )
10✔
99
    ),
10✔
100
    params(
10✔
101
        TaskListOptions
10✔
102
    ),
10✔
103
    security(
10✔
104
        ("session_token" = [])
10✔
105
    )
10✔
106
)]
10✔
107
async fn list_handler<C: ApplicationContext>(
1✔
108
    session: C::Session,
1✔
109
    app_ctx: web::Data<C>,
1✔
110
    task_list_options: ValidatedQuery<TaskListOptions>,
1✔
111
) -> Result<web::Json<Vec<TaskStatusWithId>>> {
1✔
112
    let task_list_options = task_list_options.into_inner();
1✔
113

114
    let task = app_ctx
1✔
115
        .session_context(session)
1✔
116
        .tasks()
1✔
117
        .list_tasks(task_list_options)
1✔
118
        .await?;
1✔
119

120
    Ok(web::Json(task))
1✔
121
}
1✔
122

123
#[derive(Debug, Serialize, Deserialize, Clone, ToSchema, IntoParams)]
2✔
124
pub struct TaskAbortOptions {
125
    #[serde(default)]
126
    pub force: bool,
127
}
128

129
/// Abort a running task.
130
///
131
/// # Parameters
132
///
133
/// * `force` - If true, the task will be aborted without clean-up.
134
///             You can abort a task that is already in the process of aborting.
135
#[utoipa::path(
6✔
136
    tag = "Tasks",
6✔
137
    delete,
6✔
138
    path = "/tasks/{id}",
6✔
139
    responses(
6✔
140
        (status = 202, description = "Task will be aborted.")
6✔
141
    ),
6✔
142
    params(
6✔
143
        TaskAbortOptions,
6✔
144
        ("id" = TaskId, description = "Task id")
6✔
145
    ),
6✔
146
    security(
6✔
147
        ("session_token" = [])
6✔
148
    )
6✔
149
)]
6✔
150
async fn abort_handler<C: ApplicationContext>(
5✔
151
    session: C::Session, // TODO: check for session auth
5✔
152
    app_ctx: web::Data<C>,
5✔
153
    task_id: web::Path<TaskId>,
5✔
154
    options: web::Query<TaskAbortOptions>,
5✔
155
) -> Result<HttpResponse> {
5✔
156
    let task_id = task_id.into_inner();
5✔
157

5✔
158
    app_ctx
5✔
159
        .session_context(session)
5✔
160
        .tasks()
5✔
161
        .abort_tasks(task_id, options.force)
5✔
162
        .await?;
5✔
163

164
    Ok(HttpResponse::Accepted().finish())
4✔
165
}
5✔
166

167
#[cfg(test)]
168
mod tests {
169

170
    use super::*;
171
    use crate::contexts::Session;
172
    use crate::pro::contexts::ProPostgresContext;
173
    use crate::pro::ge_context;
174
    use crate::pro::users::UserAuth;
175
    use crate::tasks::{
176
        util::test::wait_for_task_to_finish, Task, TaskContext, TaskStatus, TaskStatusInfo,
177
    };
178
    use crate::util::tests::{read_body_json, send_test_request};
179
    use actix_http::header;
180
    use actix_web_httpauth::headers::authorization::Bearer;
181
    use futures::{channel::oneshot, lock::Mutex};
182
    use geoengine_datatypes::error::ErrorSource;
183
    use serde_json::json;
184
    use std::{pin::Pin, sync::Arc};
185
    use tokio_postgres::NoTls;
186

187
    struct NopTask {
188
        complete_rx: Arc<Mutex<oneshot::Receiver<()>>>,
189
        unique_id: Option<String>,
190
    }
191

192
    impl NopTask {
193
        pub fn new_with_sender() -> (Self, oneshot::Sender<()>) {
9✔
194
            let (complete_tx, complete_rx) = oneshot::channel();
9✔
195

9✔
196
            let this = Self {
9✔
197
                complete_rx: Arc::new(Mutex::new(complete_rx)),
9✔
198
                unique_id: None,
9✔
199
            };
9✔
200

9✔
201
            (this, complete_tx)
9✔
202
        }
9✔
203

204
        pub fn new_with_sender_and_unique_id(unique_id: String) -> (Self, oneshot::Sender<()>) {
4✔
205
            let (complete_tx, complete_rx) = oneshot::channel();
4✔
206

4✔
207
            let this = Self {
4✔
208
                complete_rx: Arc::new(Mutex::new(complete_rx)),
4✔
209
                unique_id: Some(unique_id),
4✔
210
            };
4✔
211

4✔
212
            (this, complete_tx)
4✔
213
        }
4✔
214
    }
215

216
    #[async_trait::async_trait]
217
    impl<C: TaskContext + 'static> Task<C> for NopTask {
218
        async fn run(&self, _ctx: C) -> Result<Box<dyn TaskStatusInfo>, Box<dyn ErrorSource>> {
12✔
219
            let mut complete_rx_lock = self.complete_rx.lock().await;
12✔
220
            let pinned_receiver: Pin<&mut oneshot::Receiver<()>> = Pin::new(&mut complete_rx_lock);
12✔
221

12✔
222
            pinned_receiver.await.unwrap();
12✔
223

7✔
224
            Ok("completed".to_string().boxed())
7✔
225
        }
19✔
226

227
        async fn cleanup_on_error(&self, _ctx: C) -> Result<(), Box<dyn ErrorSource>> {
4✔
228
            let mut complete_rx_lock = self.complete_rx.lock().await;
4✔
229
            let pinned_receiver: Pin<&mut oneshot::Receiver<()>> = Pin::new(&mut complete_rx_lock);
4✔
230

4✔
231
            pinned_receiver.await.unwrap();
4✔
232

3✔
233
            Ok(())
3✔
234
        }
7✔
235

236
        fn task_type(&self) -> &'static str {
29✔
237
            "nopTask"
29✔
238
        }
29✔
239

240
        fn task_unique_id(&self) -> Option<String> {
13✔
241
            self.unique_id.clone()
13✔
242
        }
13✔
243

244
        fn task_description(&self) -> String {
14✔
245
            "No operation".to_string()
14✔
246
        }
14✔
247
    }
248

249
    struct TaskTree<T: TaskManager<C>, C: TaskContext + 'static> {
250
        subtasks: Arc<Mutex<Vec<Box<dyn Task<C>>>>>,
251
        subtask_ids: Arc<Mutex<Vec<TaskId>>>,
252
        task_manager: Arc<T>,
253
        complete_rx: Arc<Mutex<oneshot::Receiver<()>>>,
254
        task_description: String,
255
    }
256

257
    impl<T: TaskManager<C>, C: TaskContext + 'static> TaskTree<T, C> {
258
        pub fn new_with_sender(
1✔
259
            subtasks: Vec<Box<dyn Task<C>>>,
1✔
260
            task_manager: Arc<T>,
1✔
261
        ) -> (Self, oneshot::Sender<()>) {
1✔
262
            let task_description = subtasks
1✔
263
                .iter()
1✔
264
                .map(|subtask| subtask.task_description())
2✔
265
                .collect::<Vec<_>>()
1✔
266
                .join(", ");
1✔
267
            let (complete_tx, complete_rx) = oneshot::channel();
1✔
268

1✔
269
            let this = Self {
1✔
270
                subtasks: Arc::new(Mutex::new(subtasks)),
1✔
271
                subtask_ids: Arc::new(Mutex::new(vec![])),
1✔
272
                task_manager,
1✔
273
                complete_rx: Arc::new(Mutex::new(complete_rx)),
1✔
274
                task_description,
1✔
275
            };
1✔
276

1✔
277
            (this, complete_tx)
1✔
278
        }
1✔
279
    }
280

281
    #[async_trait::async_trait]
282
    impl<T: TaskManager<C>, C: TaskContext + 'static> Task<C> for TaskTree<T, C> {
283
        async fn run(&self, _ctx: C) -> Result<Box<dyn TaskStatusInfo>, Box<dyn ErrorSource>> {
1✔
284
            for subtask in self.subtasks.lock().await.drain(..) {
2✔
285
                let subtask_id = self
2✔
286
                    .task_manager
2✔
287
                    .schedule_task(subtask, None)
2✔
288
                    .await
2✔
289
                    .map_err(ErrorSource::boxed)?;
2✔
290
                self.subtask_ids.lock().await.push(subtask_id);
2✔
291
            }
292

293
            let mut complete_rx_lock = self.complete_rx.lock().await;
1✔
294
            let pinned_receiver: Pin<&mut oneshot::Receiver<()>> = Pin::new(&mut complete_rx_lock);
1✔
295

1✔
296
            pinned_receiver.await.unwrap();
1✔
297

×
298
            Ok("completed".to_string().boxed())
×
299
        }
1✔
300

301
        async fn cleanup_on_error(&self, _ctx: C) -> Result<(), Box<dyn ErrorSource>> {
1✔
302
            Ok(())
1✔
303
        }
2✔
304

305
        fn task_type(&self) -> &'static str {
2✔
306
            stringify!(TaskTree)
2✔
307
        }
2✔
308

309
        fn task_unique_id(&self) -> Option<String> {
1✔
310
            None
1✔
311
        }
1✔
312

313
        fn task_description(&self) -> String {
1✔
314
            self.task_description.clone()
1✔
315
        }
1✔
316

317
        async fn subtasks(&self) -> Vec<TaskId> {
1✔
318
            self.subtask_ids.lock().await.clone()
1✔
319
        }
2✔
320
    }
321

322
    struct FailingTaskWithFailingCleanup;
323

324
    #[derive(Debug)]
325
    struct FailingTaskWithFailingCleanupError;
326

327
    impl std::fmt::Display for FailingTaskWithFailingCleanupError {
328
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2✔
329
            write!(f, "FailingTaskWithFailingCleanupError")
2✔
330
        }
2✔
331
    }
332

333
    impl std::error::Error for FailingTaskWithFailingCleanupError {}
334

335
    #[async_trait::async_trait]
336
    impl<C: TaskContext + 'static> Task<C> for FailingTaskWithFailingCleanup {
337
        async fn run(&self, _ctx: C) -> Result<Box<dyn TaskStatusInfo>, Box<dyn ErrorSource>> {
1✔
338
            Err(Box::new(FailingTaskWithFailingCleanupError))
1✔
339
        }
2✔
340

341
        async fn cleanup_on_error(&self, _ctx: C) -> Result<(), Box<dyn ErrorSource>> {
1✔
342
            Err(Box::new(FailingTaskWithFailingCleanupError))
1✔
343
        }
2✔
344

345
        fn task_type(&self) -> &'static str {
2✔
346
            "FailingTaskWithFailingCleanup"
2✔
347
        }
2✔
348

349
        fn task_unique_id(&self) -> Option<String> {
1✔
350
            None
1✔
351
        }
1✔
352

353
        fn task_description(&self) -> String {
1✔
354
            "Failing task with failing cleanup".to_string()
1✔
355
        }
1✔
356
    }
357

358
    #[ge_context::test]
2✔
359
    async fn test_get_status(app_ctx: ProPostgresContext<NoTls>) {
1✔
360
        let session = app_ctx.create_anonymous_session().await.unwrap();
1✔
361
        let ctx = app_ctx.session_context(session.clone());
1✔
362

1✔
363
        let session_id = session.id();
1✔
364

1✔
365
        let (task, complete_tx) = NopTask::new_with_sender();
1✔
366

1✔
367
        let tasks = Arc::new(ctx.tasks());
1✔
368
        let task_id = tasks.schedule_task(task.boxed(), None).await.unwrap();
1✔
369

1✔
370
        // 1. initially, we should get a running status
1✔
371

1✔
372
        let req = actix_web::test::TestRequest::get()
1✔
373
            .uri(&format!("/tasks/{task_id}/status"))
1✔
374
            .append_header((header::CONTENT_LENGTH, 0))
1✔
375
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())));
1✔
376

377
        let res = send_test_request(req, app_ctx.clone()).await;
1✔
378

379
        let res_status = res.status();
1✔
380
        let res_body = read_body_json(res).await;
1✔
381
        assert_eq!(res_status, 200, "{res_body:?}");
1✔
382

383
        assert_eq!(res_body["status"], json!("running"));
1✔
384
        assert_eq!(res_body["pctComplete"], json!("0.00%"));
1✔
385
        assert!(res_body["info"].is_null());
1✔
386
        assert_eq!(res_body["estimatedTimeRemaining"], json!("? (± ?)"));
1✔
387
        assert!(res_body["timeStarted"].is_string());
1✔
388

389
        // 2. wait for task to finish
390

391
        complete_tx.send(()).unwrap();
1✔
392

1✔
393
        wait_for_task_to_finish(tasks, task_id).await;
1✔
394

395
        // 3. finally, it should complete
396

397
        let req = actix_web::test::TestRequest::get()
1✔
398
            .uri(&format!("/tasks/{task_id}/status"))
1✔
399
            .append_header((header::CONTENT_LENGTH, 0))
1✔
400
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())));
1✔
401

1✔
402
        let app_ctx_clone = app_ctx.clone();
1✔
403

404
        let res = send_test_request(req, app_ctx_clone).await;
1✔
405

406
        let res_status = res.status();
1✔
407
        let res_body = read_body_json(res).await;
1✔
408
        assert_eq!(res_status, 200, "{res_body:?}");
1✔
409

410
        assert_eq!(res_body["status"], json!("completed"));
1✔
411
        assert_eq!(res_body["info"], json!("completed"));
1✔
412
        assert_eq!(res_body["timeTotal"], json!("00:00:00"));
1✔
413
        assert!(res_body["timeStarted"].is_string());
1✔
414
    }
1✔
415

416
    #[ge_context::test]
2✔
417
    async fn test_get_status_with_admin_session(app_ctx: ProPostgresContext<NoTls>) {
1✔
418
        let session = app_ctx.create_anonymous_session().await.unwrap();
1✔
419
        let ctx = app_ctx.session_context(session.clone());
1✔
420

1✔
421
        let session_id = session.id();
1✔
422

1✔
423
        let (task, _complete_tx) = NopTask::new_with_sender();
1✔
424
        let task_id = ctx.tasks().schedule_task(task.boxed(), None).await.unwrap();
1✔
425

1✔
426
        // 1. initially, we should get a running status
1✔
427

1✔
428
        let req = actix_web::test::TestRequest::get()
1✔
429
            .uri(&format!("/tasks/{task_id}/status"))
1✔
430
            .append_header((header::CONTENT_LENGTH, 0))
1✔
431
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())));
1✔
432

433
        let res = send_test_request(req, app_ctx.clone()).await;
1✔
434

435
        let res_status = res.status();
1✔
436
        let res_body = read_body_json(res).await;
1✔
437
        assert_eq!(res_status, 200, "{res_body:?}");
1✔
438

439
        assert_eq!(res_body["status"], json!("running"));
1✔
440
        assert_eq!(res_body["pctComplete"], json!("0.00%"));
1✔
441
        assert!(res_body["info"].is_null());
1✔
442
        assert_eq!(res_body["estimatedTimeRemaining"], json!("? (± ?)"));
1✔
443
        assert!(res_body["timeStarted"].is_string());
1✔
444
    }
1✔
445

446
    #[ge_context::test]
2✔
447
    async fn test_get_list(app_ctx: ProPostgresContext<NoTls>) {
1✔
448
        let session = app_ctx.create_anonymous_session().await.unwrap();
1✔
449
        let ctx = app_ctx.session_context(session.clone());
1✔
450

1✔
451
        let session_id = session.id();
1✔
452

1✔
453
        let tasks = Arc::new(ctx.tasks());
1✔
454

1✔
455
        let (task, complete_tx) = NopTask::new_with_sender();
1✔
456
        let task_id = tasks.schedule_task(task.boxed(), None).await.unwrap();
1✔
457

1✔
458
        complete_tx.send(()).unwrap();
1✔
459

1✔
460
        wait_for_task_to_finish(tasks, task_id).await;
1✔
461

462
        let req = actix_web::test::TestRequest::get()
1✔
463
            .uri("/tasks/list")
1✔
464
            .append_header((header::CONTENT_LENGTH, 0))
1✔
465
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())));
1✔
466

467
        let res = send_test_request(req, app_ctx.clone()).await;
1✔
468

469
        let res_status = res.status();
1✔
470
        let res_body = read_body_json(res).await;
1✔
471
        assert_eq!(res_status, 200, "{res_body:?}");
1✔
472

473
        let res_body = res_body.get(0).unwrap();
1✔
474
        assert_eq!(res_body["taskId"], json!(task_id));
1✔
475
        assert_eq!(res_body["status"], json!("completed"));
1✔
476
        assert_eq!(res_body["info"], json!("completed"));
1✔
477
        assert_eq!(res_body["timeTotal"], json!("00:00:00"));
1✔
478
        assert!(res_body["timeStarted"].is_string());
1✔
479
    }
1✔
480

481
    #[ge_context::test]
2✔
482
    async fn test_abort_task(app_ctx: ProPostgresContext<NoTls>) {
1✔
483
        let session = app_ctx.create_anonymous_session().await.unwrap();
1✔
484
        let ctx = app_ctx.session_context(session.clone());
1✔
485

1✔
486
        let session_id = session.id();
1✔
487

1✔
488
        let tasks = Arc::new(ctx.tasks());
1✔
489

1✔
490
        // 1. Create task
1✔
491

1✔
492
        let (task, complete_tx) = NopTask::new_with_sender();
1✔
493
        let task_id = tasks.schedule_task(task.boxed(), None).await.unwrap();
1✔
494

1✔
495
        // 2. Abort task
1✔
496

1✔
497
        let req = actix_web::test::TestRequest::delete()
1✔
498
            .uri(&format!("/tasks/{task_id}"))
1✔
499
            .append_header((header::CONTENT_LENGTH, 0))
1✔
500
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())));
1✔
501

502
        let res = send_test_request(req, app_ctx.clone()).await;
1✔
503

504
        assert_eq!(res.status(), 202, "{:?}", res.response().error());
1✔
505

506
        // 3. Wait for abortion to complete
507

508
        complete_tx.send(()).unwrap();
1✔
509

1✔
510
        wait_for_task_to_finish(tasks, task_id).await;
1✔
511

512
        // 4. check status
513

514
        let req = actix_web::test::TestRequest::get()
1✔
515
            .uri(&format!("/tasks/{task_id}/status"))
1✔
516
            .append_header((header::CONTENT_LENGTH, 0))
1✔
517
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())));
1✔
518

519
        let res = send_test_request(req, app_ctx.clone()).await;
1✔
520

521
        assert_eq!(res.status(), 200, "{:?}", res.response().error());
1✔
522

523
        let status = read_body_json(res).await;
1✔
524

525
        assert_eq!(
1✔
526
            status,
1✔
527
            json!({
1✔
528
                "status": "aborted",
1✔
529
                "cleanUp": {
1✔
530
                    "status": "completed",
1✔
531
                    "info": null
1✔
532
                },
1✔
533
            })
1✔
534
        );
1✔
535
    }
1✔
536

537
    #[ge_context::test]
2✔
538
    async fn test_force_abort_task(app_ctx: ProPostgresContext<NoTls>) {
1✔
539
        let session = app_ctx.create_anonymous_session().await.unwrap();
1✔
540
        let ctx = app_ctx.session_context(session.clone());
1✔
541

1✔
542
        let session_id = session.id();
1✔
543

1✔
544
        let tasks = Arc::new(ctx.tasks());
1✔
545

1✔
546
        // 1. Create task
1✔
547

1✔
548
        let (task, _complete_tx) = NopTask::new_with_sender();
1✔
549
        let task_id = tasks.schedule_task(task.boxed(), None).await.unwrap();
1✔
550

1✔
551
        // 2. Abort task
1✔
552

1✔
553
        let req = actix_web::test::TestRequest::delete()
1✔
554
            .uri(&format!("/tasks/{task_id}?force=true"))
1✔
555
            .append_header((header::CONTENT_LENGTH, 0))
1✔
556
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())));
1✔
557

558
        let res = send_test_request(req, app_ctx.clone()).await;
1✔
559

560
        assert_eq!(res.status(), 202, "{:?}", res.response().error());
1✔
561

562
        // 3. Wait for abortion to complete
563

564
        wait_for_task_to_finish(tasks, task_id).await;
1✔
565

566
        // 4. check status
567

568
        let req = actix_web::test::TestRequest::get()
1✔
569
            .uri(&format!("/tasks/{task_id}/status"))
1✔
570
            .append_header((header::CONTENT_LENGTH, 0))
1✔
571
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())));
1✔
572

573
        let res = send_test_request(req, app_ctx.clone()).await;
1✔
574

575
        assert_eq!(res.status(), 200, "{:?}", res.response().error());
1✔
576

577
        let status = read_body_json(res).await;
1✔
578

579
        assert_eq!(
1✔
580
            status,
1✔
581
            json!({
1✔
582
                "status": "aborted",
1✔
583
                "cleanUp": {
1✔
584
                    "status": "noCleanUp"
1✔
585
                },
1✔
586
            })
1✔
587
        );
1✔
588
    }
1✔
589

590
    #[ge_context::test]
2✔
591
    async fn test_abort_after_abort(app_ctx: ProPostgresContext<NoTls>) {
1✔
592
        let session = app_ctx.create_anonymous_session().await.unwrap();
1✔
593
        let ctx = app_ctx.session_context(session.clone());
1✔
594

1✔
595
        let session_id = session.id();
1✔
596

1✔
597
        let tasks = Arc::new(ctx.tasks());
1✔
598

1✔
599
        // 1. Create task
1✔
600

1✔
601
        let (task, _complete_tx) = NopTask::new_with_sender();
1✔
602
        let task_id = tasks.schedule_task(task.boxed(), None).await.unwrap();
1✔
603

1✔
604
        // 2. Abort task
1✔
605

1✔
606
        let req = actix_web::test::TestRequest::delete()
1✔
607
            .uri(&format!("/tasks/{task_id}"))
1✔
608
            .append_header((header::CONTENT_LENGTH, 0))
1✔
609
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())));
1✔
610

611
        let res = send_test_request(req, app_ctx.clone()).await;
1✔
612

613
        assert_eq!(res.status(), 202, "{:?}", res.response().error());
1✔
614

615
        // do not call `_complete_tx`
616

617
        // 3. Abort again without force
618

619
        let req = actix_web::test::TestRequest::delete()
1✔
620
            .uri(&format!("/tasks/{task_id}"))
1✔
621
            .append_header((header::CONTENT_LENGTH, 0))
1✔
622
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())));
1✔
623

624
        let res = send_test_request(req, app_ctx.clone()).await;
1✔
625

626
        assert_eq!(res.status(), 400, "{:?}", res.response().error());
1✔
627

628
        let status = read_body_json(res).await;
1✔
629

630
        assert_eq!(
1✔
631
            status,
1✔
632
            json!({
1✔
633
                "error": "Task",
1✔
634
                "message": format!("TaskError: Task was already aborted by the user: {task_id}"),
1✔
635
            })
1✔
636
        );
1✔
637

638
        // 5. Abort again with force
639

640
        let req = actix_web::test::TestRequest::delete()
1✔
641
            .uri(&format!("/tasks/{task_id}?force=true"))
1✔
642
            .append_header((header::CONTENT_LENGTH, 0))
1✔
643
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())));
1✔
644

645
        let res = send_test_request(req, app_ctx.clone()).await;
1✔
646

647
        assert_eq!(res.status(), 202, "{:?}", res.response().error());
1✔
648

649
        // 6. Check abortion status
650

651
        let req = actix_web::test::TestRequest::get()
1✔
652
            .uri(&format!("/tasks/{task_id}/status"))
1✔
653
            .append_header((header::CONTENT_LENGTH, 0))
1✔
654
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())));
1✔
655

656
        let res = send_test_request(req, app_ctx.clone()).await;
1✔
657

658
        assert_eq!(res.status(), 200, "{:?}", res.response().error());
1✔
659

660
        let status = read_body_json(res).await;
1✔
661

662
        assert_eq!(
1✔
663
            status,
1✔
664
            json!({
1✔
665
                "status": "aborted",
1✔
666
                "cleanUp": {
1✔
667
                    "status": "noCleanUp"
1✔
668
                },
1✔
669
            })
1✔
670
        );
1✔
671
    }
1✔
672

673
    #[ge_context::test]
2✔
674
    async fn test_duplicate(app_ctx: ProPostgresContext<NoTls>) {
1✔
675
        let unique_id = "highlander".to_string();
1✔
676

677
        let session = app_ctx.create_anonymous_session().await.unwrap();
1✔
678

1✔
679
        let tasks = app_ctx.session_context(session).tasks();
1✔
680

1✔
681
        let (task, complete_tx) = NopTask::new_with_sender_and_unique_id(unique_id.clone());
1✔
682
        tasks.schedule_task(task.boxed(), None).await.unwrap();
1✔
683

1✔
684
        let (task, _) = NopTask::new_with_sender_and_unique_id(unique_id.clone());
1✔
685
        assert!(tasks.schedule_task(task.boxed(), None).await.is_err());
1✔
686

687
        complete_tx.send(()).unwrap();
1✔
688
    }
1✔
689

690
    #[ge_context::test]
2✔
691
    async fn test_duplicate_after_finish(app_ctx: ProPostgresContext<NoTls>) {
1✔
692
        let unique_id = "highlander".to_string();
1✔
693

694
        let session = app_ctx.create_anonymous_session().await.unwrap();
1✔
695
        let ctx = app_ctx.session_context(session.clone());
1✔
696

1✔
697
        let tasks = Arc::new(ctx.tasks());
1✔
698

1✔
699
        // 1. start first task
1✔
700

1✔
701
        let (task, complete_tx) = NopTask::new_with_sender_and_unique_id(unique_id.clone());
1✔
702
        let task_id = tasks.schedule_task(task.boxed(), None).await.unwrap();
1✔
703

1✔
704
        // 2. wait for task to finish
1✔
705

1✔
706
        complete_tx.send(()).unwrap();
1✔
707

1✔
708
        wait_for_task_to_finish(tasks.clone(), task_id).await;
1✔
709

710
        // 3. start second task
711

712
        let (task, complete_tx) = NopTask::new_with_sender_and_unique_id(unique_id.clone());
1✔
713
        assert!(tasks.schedule_task(task.boxed(), None).await.is_ok());
1✔
714

715
        complete_tx.send(()).unwrap();
1✔
716
    }
1✔
717

718
    #[ge_context::test]
2✔
719
    async fn test_notify(app_ctx: ProPostgresContext<NoTls>) {
1✔
720
        let session = app_ctx.create_anonymous_session().await.unwrap();
1✔
721

1✔
722
        let tasks = app_ctx.session_context(session).tasks();
1✔
723

1✔
724
        // 1. start first task
1✔
725

1✔
726
        let (schedule_complete_tx, schedule_complete_rx) = oneshot::channel();
1✔
727
        let (task, complete_tx) = NopTask::new_with_sender();
1✔
728

1✔
729
        tasks
1✔
730
            .schedule_task(task.boxed(), Some(schedule_complete_tx))
1✔
731
            .await
1✔
732
            .unwrap();
1✔
733

1✔
734
        // finish task
1✔
735
        complete_tx.send(()).unwrap();
1✔
736

1✔
737
        // wait for completion notification
1✔
738

1✔
739
        assert!(matches!(
1✔
740
            schedule_complete_rx.await.unwrap(),
1✔
741
            TaskStatus::Completed { .. }
742
        ));
743
    }
1✔
744

745
    #[ge_context::test]
2✔
746
    async fn abort_subtasks(app_ctx: ProPostgresContext<NoTls>) {
1✔
747
        let session = app_ctx.create_anonymous_session().await.unwrap();
1✔
748
        let ctx = app_ctx.session_context(session.clone());
1✔
749

1✔
750
        let tasks = Arc::new(ctx.tasks());
1✔
751

1✔
752
        // 1. start super task
1✔
753

1✔
754
        let (subtask_a, complete_tx_a) = NopTask::new_with_sender();
1✔
755
        let (subtask_b, complete_tx_b) = NopTask::new_with_sender();
1✔
756

1✔
757
        let (task, _complete_tx) =
1✔
758
            TaskTree::new_with_sender(vec![subtask_a.boxed(), subtask_b.boxed()], tasks.clone());
1✔
759

760
        let task_id = tasks.schedule_task(task.boxed(), None).await.unwrap();
1✔
761

762
        // 2. wait for all subtasks to schedule
763

764
        let all_task_ids: Vec<TaskId> =
1✔
765
            geoengine_operators::util::retry::retry(5, 100, 2., None, || {
1✔
766
                let task_manager = tasks.clone();
1✔
767
                async move {
1✔
768
                    let task_list = task_manager
1✔
769
                        .list_tasks(TaskListOptions {
1✔
770
                            filter: None,
1✔
771
                            offset: 0,
1✔
772
                            limit: 10,
1✔
773
                        })
1✔
774
                        .await
1✔
775
                        .unwrap();
1✔
776

1✔
777
                    if task_list.len() == 3 {
1✔
778
                        Ok(task_list.into_iter().map(|t| t.task_id).collect())
3✔
779
                    } else {
UNCOV
780
                        Err(())
×
781
                    }
782
                }
1✔
783
            })
1✔
784
            .await
1✔
785
            .unwrap();
1✔
786

1✔
787
        // 3. abort task
1✔
788

1✔
789
        tasks.abort_tasks(task_id, false).await.unwrap();
1✔
790

1✔
791
        // allow clean-up to complete
1✔
792
        complete_tx_a.send(()).unwrap();
1✔
793
        complete_tx_b.send(()).unwrap();
1✔
794

795
        // 4. wait for completion
796

797
        for task_id in all_task_ids {
4✔
798
            wait_for_task_to_finish(tasks.clone(), task_id).await;
3✔
799
        }
800

801
        // 5. check results
802

803
        let list = tasks
1✔
804
            .list_tasks(TaskListOptions {
1✔
805
                filter: None,
1✔
806
                offset: 0,
1✔
807
                limit: 10,
1✔
808
            })
1✔
809
            .await
1✔
810
            .unwrap();
1✔
811

1✔
812
        assert_eq!(list.len(), 3);
1✔
813
        assert_eq!(
1✔
814
            serde_json::to_value(&list[0].status).unwrap(),
1✔
815
            json!({
1✔
816
                "status": "aborted",
1✔
817
                "cleanUp": {"status": "completed", "info": null}
1✔
818
            })
1✔
819
        );
1✔
820
        assert_eq!(
1✔
821
            serde_json::to_value(&list[1].status).unwrap(),
1✔
822
            json!({
1✔
823
                "status": "aborted",
1✔
824
                "cleanUp": {"status": "completed", "info": null}
1✔
825
            })
1✔
826
        );
1✔
827
        assert_eq!(
1✔
828
            serde_json::to_value(&list[2].status).unwrap(),
1✔
829
            json!({
1✔
830
                "status": "aborted",
1✔
831
                "cleanUp": {"status": "completed", "info": null}
1✔
832
            })
1✔
833
        );
1✔
834
    }
1✔
835

836
    #[ge_context::test]
2✔
837
    async fn test_failing_task_with_failing_cleanup(app_ctx: ProPostgresContext<NoTls>) {
1✔
838
        let session = app_ctx.create_anonymous_session().await.unwrap();
1✔
839
        let ctx = app_ctx.session_context(session.clone());
1✔
840

1✔
841
        let tasks = Arc::new(ctx.tasks());
1✔
842

843
        // 1. start task
844

845
        let task_id = tasks
1✔
846
            .schedule_task(FailingTaskWithFailingCleanup.boxed(), None)
1✔
847
            .await
1✔
848
            .unwrap();
1✔
849

1✔
850
        // 2. wait for completion
1✔
851

1✔
852
        wait_for_task_to_finish(tasks.clone(), task_id).await;
1✔
853

854
        // 3. check results
855

856
        assert_eq!(
1✔
857
            serde_json::to_value(tasks.get_task_status(task_id).await.unwrap()).unwrap(),
1✔
858
            json!({
1✔
859
                "status": "failed",
1✔
860
                "error": "FailingTaskWithFailingCleanupError",
1✔
861
                "cleanUp": {"status": "failed", "error": "FailingTaskWithFailingCleanupError"}
1✔
862
            })
1✔
863
        );
864
    }
1✔
865
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc