• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dcdpr / jp / 26664039893

29 May 2026 09:50PM UTC coverage: 66.375% (+0.003%) from 66.372%
26664039893

push

github

web-flow
chore: reformat all markdown files using `comfort` (#699)

Signed-off-by: Jean Mertz <git@jeanmertz.com>

32028 of 48253 relevant lines covered (66.38%)

269.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.57
/crates/jp_task/src/handler.rs
1
use std::{error::Error, time::Duration};
2

3
use jp_workspace::Workspace;
4
use tokio::task::{JoinError, JoinSet};
5
use tokio_util::sync::CancellationToken;
6
use tracing::{debug, trace, warn};
7

8
use crate::Task;
9

10
/// Owns a set of spawned background tasks together with the two
/// cancellation tokens (soft and hard) used to stop them.
#[derive(Debug, Default)]
11
pub struct TaskHandler {
12
    /// Spawned tasks. Each one resolves to either the finished task (kept so
    /// its `sync()` can be called later) or a boxed error.
    tasks: JoinSet<Result<Box<dyn Task>, Box<dyn Error + Send + Sync>>>,
13
    /// Soft-cancellation signal.
14
    /// Firing it asks each task's `run()` to return promptly; well-behaved
15
    /// tasks then proceed to their `sync()` phase under [`TaskHandler::sync`].
16
    /// Tasks that don't observe the token are force-aborted after the grace
17
    /// window.
18
    cancel_token: CancellationToken,
19
    /// Hard-cancellation signal.
20
    /// Firing it short-circuits both the soft wait and the grace window in
21
    /// [`TaskHandler::sync`], force-aborts the `JoinSet`, and skips the
22
    /// workspace-sync iteration entirely.
23
    /// Tasks that had completed their `run()` cleanly lose their pending
24
    /// workspace mutation.
25
    force_token: CancellationToken,
26
}
27

28
impl TaskHandler {
29
    /// Returns `true` if no tasks are currently live.
    ///
    /// Delegates to `is_empty()` on the underlying `JoinSet`.
    /// NOTE(review): a task that has finished but has not been joined yet
    /// presumably still counts as present — confirm against the tokio docs.
30
    #[must_use]
31
    pub fn is_empty(&self) -> bool {
2✔
32
        self.tasks.is_empty()
2✔
33
    }
2✔
34

35
    /// Returns a clone of the soft-cancellation token.
36
    ///
37
    /// Cancelling the token signals every task's `run()` to stop.
38
    /// The `sync()` phase still runs for tasks that returned cleanly.
    ///
    /// Tasks receive a child of this token in [`TaskHandler::spawn`], and the
    /// first wait in [`TaskHandler::sync`] stops early once it is cancelled.
39
    #[must_use]
40
    pub fn cancel_token(&self) -> CancellationToken {
×
41
        self.cancel_token.clone()
×
42
    }
×
43

44
    /// Returns a clone of the hard-cancellation token.
45
    ///
46
    /// Cancelling the token force-aborts the `JoinSet` and skips the
47
    /// workspace-sync iteration.
48
    /// Pending workspace mutations are dropped.
    ///
    /// [`TaskHandler::sync`] still returns `Ok(())` in that case; the skip is
    /// only reported through a warning log.
49
    #[must_use]
50
    pub fn force_token(&self) -> CancellationToken {
×
51
        self.force_token.clone()
×
52
    }
×
53

54
    /// Spawns `task` onto the handler's `JoinSet`.
    ///
    /// The task's `run()` is given a child of the soft-cancellation token.
    /// While the task is pending, a trace event is logged after every 500ms
    /// of waiting; when it finishes, its result is logged at debug level and
    /// becomes the result of the spawned future.
    pub fn spawn(&mut self, task: impl Task) {
×
55
        let name = task.name();
×
56
        debug!(name, "Spawning task.");
×
57
        // A child token, so cancelling the handler's soft token reaches this
        // task as well.
        let token = self.cancel_token.child_token();
×
58
        self.tasks.spawn(async move {
×
59
            let mut task = Box::new(task).run(token);
×
60
            // Start time, used only for the `elapsed_ms` log fields below.
            let now = tokio::time::Instant::now();
×
61
            loop {
62
                // NOTE(review): `jp_macro::select!` is a project macro; it
                // presumably races the listed futures like `tokio::select!`,
                // with `biased` fixing the polling order — confirm.
                jp_macro::select!(
×
63
                    biased,
64
                    // A fresh sleep is created on every loop iteration, so
                    // this branch acts as a periodic heartbeat.
                    tokio::time::sleep(Duration::from_millis(500)),
×
65
                    |_wake| {
66
                        trace!(name, elapsed_ms = %now.elapsed().as_millis(), "Task running...");
×
67
                    },
68
                    // The same task future is polled across iterations via
                    // `&mut`, so progress is kept between heartbeats.
                    &mut task,
×
69
                    |v| {
70
                        debug!(name, elapsed_ms = %now.elapsed().as_millis(), "Task completed.");
×
71
                        return v;
×
72
                    }
73
                );
74
            }
75
        });
×
76
    }
×
77

78
    /// Waits for the spawned tasks to finish and applies their results to
    /// `workspace`.
    ///
    /// Returns immediately when no tasks are present. Otherwise it runs two
    /// collection passes: a soft wait bounded by `timeout`, then a fixed
    /// 2-second grace window in shutdown mode. Unless the hard-cancellation
    /// token has fired, every collected task then has its `sync()` called
    /// with `workspace`.
    ///
    /// # Errors
    ///
    /// Every return path visible here yields `Ok(())`: failures from tasks
    /// and from `Task::sync` are logged rather than propagated.
    pub async fn sync(
×
79
        &mut self,
×
80
        workspace: &mut Workspace,
×
81
        timeout: Duration,
×
82
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
×
83
        if self.tasks.is_empty() {
×
84
            return Ok(());
×
85
        }
×
86

87
        // Tasks whose `run()` returned successfully, accumulated across both
        // passes below.
        let mut tasks: Vec<Box<dyn Task>> = Vec::new();
×
88
        self.wait_for_tasks(timeout, &mut tasks, false).await;
×
89

90
        // Grace window for stragglers that didn't observe the soft
91
        // cancellation signal.
92
        self.wait_for_tasks(Duration::from_secs(2), &mut tasks, true)
×
93
            .await;
×
94

95
        // Force quit: drop accumulated results without applying them.
96
        if self.force_token.is_cancelled() {
×
97
            warn!(
×
98
                count = tasks.len(),
×
99
                "Force-quit requested; skipping workspace sync for collected tasks."
100
            );
101
            return Ok(());
×
102
        }
×
103

104
        // Apply each task's result in turn; one failing `sync()` is logged
        // and does not stop the remaining tasks from being applied.
        for task in tasks {
×
105
            if let Err(error) = task.sync(workspace).await {
×
106
                tracing::error!(%error, "Error syncing background task.");
×
107
            }
×
108
        }
109

110
        Ok(())
×
111
    }
×
112

113
    /// Collects finished tasks into `tasks` until a stop condition is hit.
    ///
    /// Successful results from `join_next()` are pushed onto `tasks` (see
    /// `handle_task_completion`). The loop ends when, in branch order:
    ///
    /// 1. the hard-cancellation token fires: the `JoinSet` is shut down;
    /// 2. the soft-cancellation token fires (only when `shutdown` is `false`);
    /// 3. `timeout` elapses: with `shutdown` set the `JoinSet` is shut down,
    ///    otherwise the soft-cancellation token is cancelled;
    /// 4. `join_next()` yields `None`, i.e. no tasks remain.
    ///
    /// `shutdown` selects the grace-window behaviour used by the second pass
    /// of [`TaskHandler::sync`].
    ///
    /// NOTE(review): `jp_macro::select!` is a project macro; the description
    /// above assumes `tokio::select!`-like semantics with `biased` ordering
    /// and an `if (...)` branch guard — confirm.
    async fn wait_for_tasks(
×
114
        &mut self,
×
115
        timeout: Duration,
×
116
        tasks: &mut Vec<Box<dyn Task>>,
×
117
        shutdown: bool,
×
118
    ) {
×
119
        // Created and pinned once outside the loop so the same deadline is
        // polled on every iteration.
        let timeout = tokio::time::sleep(timeout);
×
120
        tokio::pin!(timeout);
×
121

122
        loop {
123
            jp_macro::select!(
×
124
                biased,
125
                self.force_token.cancelled(),
×
126
                |_force| {
127
                    // Force quit: abort everything immediately. The
128
                    // grace pass observes the same signal and exits via
129
                    // the empty-JoinSet branch.
                    // NOTE(review): with `biased` and this branch listed
                    // first, the grace pass presumably exits through this
                    // branch again rather than the `join_next()` one —
                    // confirm.
130
                    warn!("Force-quit requested. Aborting background tasks.");
×
131
                    self.tasks.shutdown().await;
×
132
                    break;
×
133
                },
134
                self.cancel_token.cancelled(),
×
135
                |_cancel| if (!shutdown) {
×
136
                    // Soft cancellation fired externally during the soft
137
                    // wait: stop waiting and let the grace pass collect
138
                    // any tasks still in flight.
139
                    break;
×
140
                },
141
                &mut timeout,
×
142
                |_wake| {
143
                    if shutdown {
×
144
                        warn!("Tasks did not respond to cancellation signal. Forcing shutdown.");
×
145
                        self.tasks.shutdown().await;
×
146
                    } else {
147
                        warn!(
×
148
                            "Task finalization timed out. Signalling cancellation to remaining \
149
                             tasks."
150
                        );
151
                        self.cancel_token.cancel();
×
152
                    }
153
                    break;
×
154
                },
155
                self.tasks.join_next(),
×
156
                |task| {
157
                    match task {
×
158
                        Some(task) => handle_task_completion(task, tasks),
×
159
                        // `None` means the `JoinSet` has no tasks left.
                        None => break,
×
160
                    }
161
                },
162
            );
163
        }
164
    }
×
165
}
166

167
/// Records the outcome of one joined task.
///
/// A task that ran successfully is pushed onto `tasks`. A task that returned
/// an error, or whose join itself failed (`JoinError`), is logged at error
/// level and otherwise dropped.
#[expect(clippy::type_complexity)]
168
fn handle_task_completion(
×
169
    result: Result<Result<Box<dyn Task>, Box<dyn Error + Send + Sync>>, JoinError>,
×
170
    tasks: &mut Vec<Box<dyn Task>>,
×
171
) {
×
172
    match result {
×
173
        Ok(Ok(task)) => tasks.push(task),
×
174
        Ok(Err(error)) => tracing::error!(%error, "Background task failed."),
×
175
        Err(error) => tracing::error!(%error, "Error waiting for background task to complete."),
×
176
    }
177
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc