• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dcdpr / jp / 18651220506

20 Oct 2025 11:57AM UTC coverage: 45.722% (-0.4%) from 46.105%
18651220506

Pull #276

github

web-flow
Merge 56c69b08c into 6de3b5333
Pull Request #276: feat(cli): Add graceful signal handling and configurable threading

0 of 189 new or added lines in 6 files covered. (0.0%)

6 existing lines in 4 files now uncovered.

6717 of 14691 relevant lines covered (45.72%)

15.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/crates/jp_task/src/handler.rs
1
use std::{error::Error, time::Duration};
2

3
use jp_workspace::Workspace;
4
use tokio::task::{JoinError, JoinSet};
5
use tokio_util::sync::CancellationToken;
6
use tracing::{debug, trace, warn};
7

8
use crate::Task;
9

10
#[derive(Debug, Default)]
11
pub struct TaskHandler {
12
    tasks: JoinSet<Result<Box<dyn Task>, Box<dyn Error + Send + Sync>>>,
13
    cancel_token: CancellationToken,
14
}
15

16
impl TaskHandler {
17
    pub fn spawn(&mut self, task: impl Task) {
×
18
        let name = task.name();
×
19
        debug!(name, "Spawning task.");
×
20
        let mut task = Box::new(task).start(self.cancel_token.child_token());
×
21
        self.tasks.spawn(async move {
×
22
            let now = tokio::time::Instant::now();
×
23
            loop {
NEW
24
                jp_macro::select!(
×
25
                    biased,
NEW
26
                    tokio::time::sleep(Duration::from_millis(500)),
×
27
                    |_wake| {
UNCOV
28
                        trace!(name, elapsed_ms = %now.elapsed().as_millis(), "Task running...");
×
29
                    },
NEW
30
                    &mut task,
×
31
                    |v| {
32
                        debug!(name, elapsed_ms = %now.elapsed().as_millis(), "Task completed.");
×
NEW
33
                        return v;
×
34
                    }
35
                );
36
            }
37
        });
×
38
    }
×
39

40
    pub async fn sync(
×
41
        &mut self,
×
42
        workspace: &mut Workspace,
×
43
        timeout: Duration,
×
44
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
×
45
        if self.tasks.is_empty() {
×
46
            return Ok(());
×
47
        }
×
48

49
        let mut tasks: Vec<Box<dyn Task>> = Vec::new();
×
50
        self.wait_for_tasks(timeout, &mut tasks, false).await;
×
51

52
        // Force long-running tasks to stop if they aren't responding to the
53
        // cancellation signal.
54
        self.wait_for_tasks(Duration::from_secs(2), &mut tasks, true)
×
55
            .await;
×
56

57
        for task in tasks {
×
58
            if let Err(error) = task.sync(workspace).await {
×
59
                tracing::error!(%error, "Error syncing background task.");
×
60
            }
×
61
        }
62

63
        Ok(())
×
64
    }
×
65

66
    async fn wait_for_tasks(
×
67
        &mut self,
×
68
        timeout: Duration,
×
69
        tasks: &mut Vec<Box<dyn Task>>,
×
70
        shutdown: bool,
×
71
    ) {
×
72
        let timeout = tokio::time::sleep(timeout);
×
73
        tokio::pin!(timeout);
×
74

75
        loop {
NEW
76
            jp_macro::select!(
×
77
                biased,
78
                // Ask long-running tasks to stop.
NEW
79
                &mut timeout,
×
80
                |_wake| {
81
                    if shutdown {
×
82
                        warn!("Tasks did not respond to cancellation signal. Forcing shutdown.");
×
83
                        self.tasks.shutdown().await;
×
84
                    } else {
NEW
85
                        warn!(
×
NEW
86
                            "Task finalization timed out. Signalling cancellation to remaining \
×
NEW
87
                             tasks."
×
88
                        );
UNCOV
89
                        self.cancel_token.cancel();
×
90
                    }
91
                    break;
×
92
                },
NEW
93
                self.tasks.join_next(),
×
94
                |task| {
NEW
95
                    match task {
×
NEW
96
                        Some(task) => handle_task_completion(task, tasks),
×
NEW
97
                        None => break,
×
98
                    }
99
                },
100
            );
101
        }
102
    }
×
103
}
104

105
#[expect(clippy::type_complexity)]
106
fn handle_task_completion(
×
107
    result: Result<Result<Box<dyn Task>, Box<dyn Error + Send + Sync>>, JoinError>,
×
108
    tasks: &mut Vec<Box<dyn Task>>,
×
109
) {
×
110
    match result {
×
111
        Ok(Ok(task)) => tasks.push(task),
×
112
        Ok(Err(error)) => tracing::error!(%error, "Background task failed."),
×
113
        Err(error) => tracing::error!(%error, "Error waiting for background task to complete."),
×
114
    }
115
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc