• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dcdpr / jp / 20416384001

21 Dec 2025 09:57PM UTC coverage: 51.395% (-0.003%) from 51.398%
20416384001

push

github

web-flow
fix(task): move task execution inside spawn to prevent blocking (#328)

Moves the invocation of the task logic inside the spawn block. This
ensures that any synchronous setup done by the task does not block the
caller of `TaskHandler::spawn`.

Also renames `Task::start` to `Task::run` to better reflect its
behavior.

Signed-off-by: Jean Mertz <git@jeanmertz.com>

0 of 2 new or added lines in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

8806 of 17134 relevant lines covered (51.39%)

132.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/crates/jp_task/src/handler.rs
1
use std::{error::Error, time::Duration};
2

3
use jp_workspace::Workspace;
4
use tokio::task::{JoinError, JoinSet};
5
use tokio_util::sync::CancellationToken;
6
use tracing::{debug, trace, warn};
7

8
use crate::Task;
9

10
/// Owns a set of spawned tasks together with the cancellation token used to
/// ask them to stop.
#[derive(Debug, Default)]
11
pub struct TaskHandler {
12
    /// Spawned tasks. Each one resolves to the finished boxed task on success,
    /// or to a boxed error on failure.
    tasks: JoinSet<Result<Box<dyn Task>, Box<dyn Error + Send + Sync>>>,
13
    /// Parent token: `spawn` hands every task a child of it, and
    /// `wait_for_tasks` cancels it when the non-shutdown timeout elapses.
    cancel_token: CancellationToken,
14
}
15

16
impl TaskHandler {
17
    /// Spawns `task` onto this handler's join set.
    ///
    /// The task is given a child of the handler's cancellation token. The
    /// call to `run` happens inside the spawned `async` block, so whatever
    /// `run` does before returning executes on the spawned task rather than
    /// in the caller of `spawn`.
    ///
    /// While the task is pending, a trace event is emitted each time the
    /// 500 ms sleep completes first; when the task completes, its result is
    /// logged at debug level and returned as the spawned future's output.
    pub fn spawn(&mut self, task: impl Task) {
×
18
        let name = task.name();
×
19
        debug!(name, "Spawning task.");
×
NEW
20
        let token = self.cancel_token.child_token();
×
21
        self.tasks.spawn(async move {
×
NEW
22
            // NOTE(review): `run` presumably returns a pinned/`Unpin` future,
            // since it is polled through `&mut task` below — confirm against
            // the `Task` trait.
            let mut task = Box::new(task).run(token);
×
UNCOV
23
            let now = tokio::time::Instant::now();
×
24
            loop {
25
                // NOTE(review): `jp_macro::select!` is defined outside this
                // file; it presumably mirrors `tokio::select!`, with `biased`
                // polling the branches in the order written — confirm.
                jp_macro::select!(
×
26
                    biased,
27
                    // A fresh sleep is created on every loop iteration, so
                    // this branch acts as a periodic "still running" tick.
                    tokio::time::sleep(Duration::from_millis(500)),
×
28
                    |_wake| {
29
                        trace!(name, elapsed_ms = %now.elapsed().as_millis(), "Task running...");
×
30
                    },
31
                    &mut task,
×
32
                    |v| {
33
                        debug!(name, elapsed_ms = %now.elapsed().as_millis(), "Task completed.");
×
34
                        return v;
×
35
                    }
36
                );
37
            }
38
        });
×
39
    }
×
40

41
    /// Waits for the spawned tasks to finish, then lets every task that
    /// finished successfully sync itself against `workspace`.
    ///
    /// Returns immediately when no tasks are registered. Otherwise it waits up
    /// to `timeout` for tasks to complete (signalling cancellation if that
    /// elapses), then waits up to a further two seconds before forcing the
    /// remaining tasks to shut down.
    ///
    /// Errors returned by an individual task's `sync` are logged and do not
    /// abort the loop over the remaining tasks.
    ///
    /// # Errors
    ///
    /// No error path is visible in this body: every exit returns `Ok(())`.
    pub async fn sync(
×
42
        &mut self,
×
43
        workspace: &mut Workspace,
×
44
        timeout: Duration,
×
45
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
×
46
        if self.tasks.is_empty() {
×
47
            return Ok(());
×
48
        }
×
49

50
        // Collects the tasks that completed successfully across both waits.
        let mut tasks: Vec<Box<dyn Task>> = Vec::new();
×
51
        self.wait_for_tasks(timeout, &mut tasks, false).await;
×
52

53
        // Force long-running tasks to stop if they aren't responding to the
54
        // cancellation signal.
55
        // NOTE(review): this second wait runs even when the first one drained
        // the set; `join_next` presumably yields `None` right away in that
        // case, making it a no-op — confirm.
        self.wait_for_tasks(Duration::from_secs(2), &mut tasks, true)
×
56
            .await;
×
57

58
        for task in tasks {
×
59
            if let Err(error) = task.sync(workspace).await {
×
60
                tracing::error!(%error, "Error syncing background task.");
×
61
            }
×
62
        }
63

64
        Ok(())
×
65
    }
×
66

67
    /// Collects finished tasks into `tasks` until the join set yields `None`
    /// or `timeout` elapses, whichever happens first.
    ///
    /// When the timeout fires, the behavior depends on `shutdown`:
    /// - `false`: the handler's cancellation token is cancelled;
    /// - `true`: `shutdown()` is awaited on the join set.
    ///
    /// In both cases the loop ends right after. The timeout branch is listed
    /// first under `biased`.
    /// NOTE(review): `jp_macro::select!` is defined outside this file;
    /// presumably `biased` gives the timeout priority over `join_next` when
    /// both are ready — confirm.
    async fn wait_for_tasks(
×
68
        &mut self,
×
69
        timeout: Duration,
×
70
        tasks: &mut Vec<Box<dyn Task>>,
×
71
        shutdown: bool,
×
72
    ) {
×
73
        let timeout = tokio::time::sleep(timeout);
×
74
        // Pinned so the same sleep can be polled via `&mut timeout` on every
        // loop iteration; the deadline therefore spans the whole call.
        tokio::pin!(timeout);
×
75

76
        loop {
77
            jp_macro::select!(
×
78
                biased,
79
                // Ask long-running tasks to stop.
80
                &mut timeout,
×
81
                |_wake| {
82
                    if shutdown {
×
83
                        warn!("Tasks did not respond to cancellation signal. Forcing shutdown.");
×
84
                        self.tasks.shutdown().await;
×
85
                    } else {
86
                        warn!(
×
87
                            "Task finalization timed out. Signalling cancellation to remaining \
×
88
                             tasks."
×
89
                        );
90
                        self.cancel_token.cancel();
×
91
                    }
92
                    break;
×
93
                },
94
                self.tasks.join_next(),
×
95
                |task| {
96
                    match task {
×
97
                        Some(task) => handle_task_completion(task, tasks),
×
98
                        // `None` ends the wait early, before the timeout.
                        None => break,
×
99
                    }
100
                },
101
            );
102
        }
103
    }
×
104
}
105

106
/// Records the outcome of one joined task.
///
/// - `Ok(Ok(task))`: the finished task is pushed onto `tasks`.
/// - `Ok(Err(error))`: the task's own error is logged.
/// - `Err(error)`: the `JoinError` from waiting on the task is logged.
///
/// Nothing is returned or propagated; failures are only logged.
// NOTE(review): the lint expectation is presumably for the nested `Result`
// type of `result` — confirm.
#[expect(clippy::type_complexity)]
107
fn handle_task_completion(
×
108
    result: Result<Result<Box<dyn Task>, Box<dyn Error + Send + Sync>>, JoinError>,
×
109
    tasks: &mut Vec<Box<dyn Task>>,
×
110
) {
×
111
    match result {
×
112
        Ok(Ok(task)) => tasks.push(task),
×
113
        Ok(Err(error)) => tracing::error!(%error, "Background task failed."),
×
114
        Err(error) => tracing::error!(%error, "Error waiting for background task to complete."),
×
115
    }
116
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc