• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 14686931002

27 Apr 2025 01:18AM UTC coverage: 79.739% (-5.2%) from 84.945%
14686931002

push

github

dan-da
refactor: simplify job-queue and log unique job-id

These changes were motivated by a need to follow/debug job-queue
behavior in the logs when shared by all async unit tests.

Primary changes:
1. Each job is now assigned a randomly generated 12-byte job-id that is
   logged when adding a job and when it begins and completes
   processing.
2. Simplifies queue implementation by removing the "add_job" spawned
   task.  There is now just a single "process_job" task, and jobs are
   added to the queue, via mutex, in the caller's task.
3. The process_job task listens for a Stop message and stops right
   away, after signaling the running job (if any) to cancel.
4. Adds an async stop() method that also drops the queue. It performs
   cleanup better than a simple drop().
5. Improves error handling.

Squashed:

* refactor job-queue so there is only one spawned task
* separate add-job and stop into separate channels
* add job_id to jobs, log
* job-queue cleanups
* wait for current job to complete when stopping queue

84 of 124 new or added lines in 1 file covered. (67.74%)

331 existing lines in 31 files now uncovered.

37191 of 46641 relevant lines covered (79.74%)

231785.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.57
/src/job_queue/queue.rs
1
use std::collections::VecDeque;
2
use std::fmt;
3
use std::sync::Arc;
4
use std::sync::Mutex;
5

6
use tokio::sync::oneshot;
7
use tokio::sync::watch;
8
use tokio::task::JoinHandle;
9

10
use super::errors::AddJobError;
11
use super::errors::JobHandleError;
12
use super::errors::StopQueueError;
13
use super::traits::Job;
14
use super::traits::JobCancelReceiver;
15
use super::traits::JobCancelSender;
16
use super::traits::JobCompletion;
17
use super::traits::JobResult;
18
use super::traits::JobResultReceiver;
19
use super::traits::JobResultSender;
20

21
/// A randomly generated job identifier.
///
/// Displayed as 24 lowercase hex digits (two per byte, no separators).
/// `PartialEq`, `Eq` and `Hash` are derived so ids can be compared and used
/// as map/set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct JobId([u8; 12]);

impl std::fmt::Display for JobId {
    /// Writes each byte as two lowercase hex digits.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0
            .iter()
            .try_for_each(|byte| write!(f, "{:02x}", byte))
    }
}
33

34
impl JobId {
35
    fn random() -> Self {
1,382✔
36
        Self(rand::random())
1,382✔
37
    }
1,382✔
38
}
39

40
/// A job-handle enables cancelling a job and awaiting results
41
#[derive(Debug)]
42
pub struct JobHandle {
43
    job_id: JobId,
44
    result_rx: JobResultReceiver,
45
    cancel_tx: JobCancelSender,
46
}
47
impl JobHandle {
48
    /// wait for job to complete
49
    ///
50
    /// a completed job may either be finished, cancelled, or panicked.
51
    pub async fn complete(self) -> Result<JobCompletion, JobHandleError> {
1,323✔
52
        Ok(self.result_rx.await?)
1,323✔
53
    }
1,323✔
54

55
    /// wait for job result, or err if cancelled or a panic occurred within job.
56
    pub async fn result(self) -> Result<Box<dyn JobResult>, JobHandleError> {
1,321✔
57
        match self.complete().await? {
1,321✔
58
            JobCompletion::Finished(r) => Ok(r),
1,319✔
59
            JobCompletion::Cancelled => Err(JobHandleError::JobCancelled),
×
60
            JobCompletion::Panicked(e) => Err(JobHandleError::JobPanicked(e)),
2✔
61
        }
62
    }
1,321✔
63

64
    /// cancel job and return immediately.
65
    pub fn cancel(&self) -> Result<(), JobHandleError> {
×
66
        Ok(self.cancel_tx.send(())?)
×
67
    }
×
68

69
    /// cancel job and wait for it to complete.
70
    pub async fn cancel_and_await(self) -> Result<JobCompletion, JobHandleError> {
2✔
71
        self.cancel_tx.send(())?;
2✔
72
        self.complete().await
2✔
73
    }
2✔
74

75
    /// channel receiver for job results
76
    pub fn result_rx(self) -> JobResultReceiver {
54✔
77
        self.result_rx
54✔
78
    }
54✔
79

80
    /// channel sender for cancelling job.
81
    pub fn cancel_tx(&self) -> &JobCancelSender {
1,297✔
82
        &self.cancel_tx
1,297✔
83
    }
1,297✔
84

NEW
85
    pub fn job_id(&self) -> JobId {
×
NEW
86
        self.job_id
×
NEW
87
    }
×
88
}
89

90
/// represents a job in the queue.
91
struct QueuedJob<P> {
92
    job: Box<dyn Job>,
93
    job_id: JobId,
94
    result_tx: JobResultSender,
95
    cancel_tx: JobCancelSender,
96
    cancel_rx: JobCancelReceiver,
97
    priority: P,
98
}
99

100
impl<P: fmt::Debug> fmt::Debug for QueuedJob<P> {
NEW
101
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
NEW
102
        f.debug_struct("QueuedJob")
×
NEW
103
            .field("job", &"Box<dyn Job>")
×
NEW
104
            .field("job_id", &self.job_id)
×
NEW
105
            .field("result_tx", &"JobResultSender")
×
NEW
106
            .field("cancel_tx", &"JobCancelSender")
×
NEW
107
            .field("cancel_rx", &"JobCancelReceiver")
×
NEW
108
            .field("priority", &self.priority)
×
NEW
109
            .finish()
×
NEW
110
    }
×
111
}
112

113
/// represents the currently executing job
114
#[derive(Debug)]
115
struct CurrentJob {
116
    job_num: usize,
117
    job_id: JobId,
118
    cancel_tx: JobCancelSender,
119
}
120

121
/// represents data shared between tasks/threads
122
#[derive(Debug)]
123
struct Shared<P: Ord> {
124
    jobs: VecDeque<QueuedJob<P>>,
125
    current_job: Option<CurrentJob>,
126
}
127

128
/// implements a job queue that sends result of each job to a listener.
129
#[derive(Debug)]
130
pub struct JobQueue<P: Ord + Send + Sync + 'static> {
131
    shared: Arc<Mutex<Shared<P>>>,
132

133
    tx_job_added: tokio::sync::mpsc::UnboundedSender<()>,
134
    tx_stop: tokio::sync::watch::Sender<()>,
135

136
    process_jobs_task_handle: Option<JoinHandle<()>>, // Store the job processing task handle
137
}
138

139
impl<P: Ord + Send + Sync + 'static> Drop for JobQueue<P> {
140
    fn drop(&mut self) {
13✔
141
        tracing::debug!("in JobQueue::drop()");
13✔
142

143
        if !self.tx_stop.is_closed() {
13✔
144
            if let Err(e) = self.tx_stop.send(()) {
13✔
NEW
145
                tracing::error!("{}", e);
×
146
            }
13✔
NEW
147
        }
×
148
    }
13✔
149
}
150

151
impl<P: Ord + Send + Sync + 'static> JobQueue<P> {
152
    /// creates job queue and starts it processing.  returns immediately.
153
    pub fn start() -> Self {
145✔
154
        let shared = Shared {
145✔
155
            jobs: VecDeque::new(),
145✔
156
            current_job: None,
145✔
157
        };
145✔
158
        let shared: Arc<Mutex<Shared<P>>> = Arc::new(Mutex::new(shared));
145✔
159

145✔
160
        let (tx_job_added, mut rx_job_added) = tokio::sync::mpsc::unbounded_channel();
145✔
161
        let (tx_stop, mut rx_stop) = tokio::sync::watch::channel(());
145✔
162

145✔
163
        // spawns background task that processes job queue and runs jobs.
145✔
164
        let shared2 = shared.clone();
145✔
165
        let process_jobs_task_handle = tokio::spawn(async move {
145✔
166
            let mut job_num: usize = 1;
145✔
167

168
            loop {
169
                tokio::select!(
1,522✔
170
                _ = rx_stop.changed() => {
1,522✔
171
                    tracing::debug!("task process_jobs received Stop message.");
4✔
172

173
                    // if there is a presently executing job we need to cancel it
174
                    // and wait for it to complete.
175
                    let maybe_info = shared2.lock().unwrap().current_job.as_ref().map(|cj| (cj.job_id, cj.cancel_tx.clone()) );
4✔
176
                    if let Some((job_id, cancel_tx)) = maybe_info {
4✔
NEW
177
                        match cancel_tx.send(()) {
×
178
                            Ok(()) => {
179
                                // wait for channel to close, indicating job has cancelled (or otherwise completed)
NEW
180
                                tracing::debug!("JobQueue: notified current job {} to cancel.  waiting...", job_id);
×
NEW
181
                                cancel_tx.closed().await;
×
NEW
182
                                tracing::debug!("JobQueue: current job {} has cancelled.", job_id);
×
183
                            }
NEW
184
                            Err(e) => {
×
NEW
185
                                tracing::warn!("could not send cancellation msg to current job {}. {}", job_id, e)
×
186
                            }
187
                        }
188
                    }
4✔
189

190
                    break;
4✔
191
                }
192
                _ = rx_job_added.recv() => {
1,522✔
193
                    tracing::debug!("task process_jobs received JobAdded message.");
1,383✔
194
                    let (next_job, num_pending) = {
1,383✔
195
                        let mut guard = shared2.lock().unwrap();
1,383✔
196

1,383✔
197
                        // This is where we pick the highest priority job
1,383✔
198
                        guard
1,383✔
199
                            .jobs
1,383✔
200
                            .make_contiguous()
1,383✔
201
                            .sort_by(|a, b| b.priority.cmp(&a.priority));
1,383✔
202
                        let job = guard.jobs.pop_front().unwrap();
1,383✔
203

1,383✔
204
                        guard.current_job = Some(CurrentJob {
1,383✔
205
                            job_num,
1,383✔
206
                            job_id: job.job_id,
1,383✔
207
                            cancel_tx: job.cancel_tx.clone(),
1,383✔
208
                        });
1,383✔
209
                        (job, guard.jobs.len())
1,383✔
210
                    };
1,383✔
211

1,383✔
212
                    tracing::info!(
1,383✔
NEW
213
                        "  *** JobQueue: begin job #{} - {} - {} queued job(s) ***",
×
214
                        job_num,
215
                        next_job.job_id,
216
                        num_pending
217
                    );
218
                    let timer = tokio::time::Instant::now();
1,383✔
219
                    let task_handle = if next_job.job.is_async() {
1,383✔
220
                        tokio::spawn(async move {
1,339✔
221
                            next_job.job.run_async_cancellable(next_job.cancel_rx).await
1,339✔
222
                        })
1,339✔
223
                    } else {
224
                        tokio::task::spawn_blocking(move || next_job.job.run(next_job.cancel_rx))
44✔
225
                    };
226

227
                    let job_completion = match task_handle.await {
1,383✔
228
                        Ok(jc) => jc,
1,375✔
229
                        Err(e) => {
2✔
230
                            if e.is_panic() {
2✔
231
                                JobCompletion::Panicked(e.into_panic())
2✔
NEW
232
                            } else if e.is_cancelled() {
×
NEW
233
                                JobCompletion::Cancelled
×
234
                            } else {
NEW
235
                                unreachable!()
×
236
                            }
237
                        }
238
                    };
239

240
                    tracing::info!(
1,377✔
NEW
241
                        "  *** JobQueue: ended job #{} - {} - Completion: {} - {} secs ***",
×
NEW
242
                        job_num,
×
NEW
243
                        next_job.job_id,
×
NEW
244
                        job_completion,
×
NEW
245
                        timer.elapsed().as_secs_f32()
×
246
                    );
247
                    job_num += 1;
1,377✔
248

1,377✔
249
                    shared2.lock().unwrap().current_job = None;
1,377✔
250

251
                    if let Err(e) = next_job.result_tx.send(job_completion) {
1,377✔
NEW
252
                        tracing::warn!("job-handle dropped? {}", e);
×
253
                    }
1,377✔
254
                });
255
            }
256
            tracing::debug!("task process_jobs exiting");
4✔
257
        });
145✔
258

145✔
259
        tracing::info!("JobQueue: started new queue.");
145✔
260

261
        Self {
145✔
262
            tx_job_added,
145✔
263
            tx_stop,
145✔
264
            shared,
145✔
265
            process_jobs_task_handle: Some(process_jobs_task_handle),
145✔
266
        }
145✔
267
    }
145✔
268

269
    /// stop the job-queue, and drop it.
270
    ///
271
    /// this method sends a message to the spawned job-queue task
272
    /// to stop and then waits for it to complete.
273
    ///
274
    /// Comparison with drop():
275
    ///
276
    /// if JobQueue is dropped:
277
    ///  1. the stop message will be sent, but any error is ignored.
278
    ///  2. the spawned task is not awaited.
NEW
279
    pub async fn stop(mut self) -> Result<(), StopQueueError> {
×
NEW
280
        tracing::info!("JobQueue: stopping.");
×
281

NEW
282
        self.tx_stop.send(())?;
×
283

NEW
284
        if let Some(jh) = self.process_jobs_task_handle.take() {
×
NEW
285
            jh.await?;
×
NEW
286
        }
×
287

NEW
288
        Ok(())
×
NEW
289
    }
×
290

291
    /// adds job to job-queue and returns immediately.
292
    ///
293
    /// job-results can be obtained by via JobHandle::results().await
294
    /// The job can be cancelled by JobHandle::cancel()
295
    pub fn add_job(&self, job: Box<dyn Job>, priority: P) -> Result<JobHandle, AddJobError> {
1,382✔
296
        let (result_tx, result_rx) = oneshot::channel();
1,382✔
297
        let (cancel_tx, cancel_rx) = watch::channel::<()>(());
1,382✔
298

1,382✔
299
        let job_id = JobId::random();
1,382✔
300

1,382✔
301
        let m = QueuedJob {
1,382✔
302
            job,
1,382✔
303
            job_id,
1,382✔
304
            result_tx,
1,382✔
305
            cancel_tx: cancel_tx.clone(),
1,382✔
306
            cancel_rx,
1,382✔
307
            priority,
1,382✔
308
        };
1,382✔
309

310
        let (num_jobs, job_running) = {
1,382✔
311
            let mut guard = self.shared.lock().unwrap();
1,382✔
312
            guard.jobs.push_back(m);
1,382✔
313
            let job_running = match &guard.current_job {
1,382✔
314
                Some(j) => format!("#{} - {}", j.job_num, j.job_id),
45✔
315
                None => "none".to_string(),
1,337✔
316
            };
317
            (guard.jobs.len(), job_running)
1,382✔
318
        };
1,382✔
319
        tracing::info!(
1,382✔
NEW
320
            "JobQueue: job added - {}  {} queued job(s).  job running: {}",
×
321
            job_id,
322
            num_jobs,
323
            job_running
324
        );
325

326
        self.tx_job_added.send(())?;
1,382✔
327

328
        Ok(JobHandle {
1,382✔
329
            job_id,
1,382✔
330
            result_rx,
1,382✔
331
            cancel_tx,
1,382✔
332
        })
1,382✔
333
    }
1,382✔
334
}
335

336
#[cfg(test)]
337
mod tests {
338
    use std::time::Instant;
339

340
    use tracing_test::traced_test;
341

342
    use super::*;
343

344
    #[tokio::test(flavor = "multi_thread")]
345
    #[traced_test]
×
346
    async fn run_sync_jobs_by_priority() -> anyhow::Result<()> {
1✔
347
        workers::run_jobs_by_priority(false).await
1✔
348
    }
1✔
349

350
    #[tokio::test(flavor = "multi_thread")]
351
    #[traced_test]
×
352
    async fn run_async_jobs_by_priority() -> anyhow::Result<()> {
1✔
353
        workers::run_jobs_by_priority(true).await
1✔
354
    }
1✔
355

356
    #[tokio::test(flavor = "multi_thread")]
357
    #[traced_test]
×
358
    async fn get_sync_job_result() -> anyhow::Result<()> {
1✔
359
        workers::get_job_result(false).await
1✔
360
    }
1✔
361

362
    #[tokio::test(flavor = "multi_thread")]
363
    #[traced_test]
×
364
    async fn get_async_job_result() -> anyhow::Result<()> {
1✔
365
        workers::get_job_result(true).await
1✔
366
    }
1✔
367

368
    #[tokio::test(flavor = "multi_thread")]
369
    #[traced_test]
×
370
    async fn cancel_sync_job() -> anyhow::Result<()> {
1✔
371
        workers::cancel_job(false).await
1✔
372
    }
1✔
373

374
    #[tokio::test(flavor = "multi_thread")]
375
    #[traced_test]
×
376
    async fn cancel_async_job() -> anyhow::Result<()> {
1✔
377
        workers::cancel_job(true).await
1✔
378
    }
1✔
379

380
    #[test]
381
    #[traced_test]
×
382
    fn runtime_shutdown_timeout_force_cancels_sync_job() -> anyhow::Result<()> {
1✔
383
        workers::runtime_shutdown_timeout_force_cancels_job(false)
1✔
384
    }
1✔
385

386
    #[test]
387
    #[traced_test]
×
388
    fn runtime_shutdown_timeout_force_cancels_async_job() -> anyhow::Result<()> {
1✔
389
        workers::runtime_shutdown_timeout_force_cancels_job(true)
1✔
390
    }
1✔
391

392
    #[test]
393
    #[traced_test]
×
394
    fn runtime_shutdown_cancels_sync_job() {
1✔
395
        let _ = workers::runtime_shutdown_cancels_job(false);
1✔
396
    }
1✔
397

398
    #[test]
399
    #[traced_test]
×
400
    fn runtime_shutdown_cancels_async_job() -> anyhow::Result<()> {
1✔
401
        workers::runtime_shutdown_cancels_job(true)
1✔
402
    }
1✔
403

404
    #[test]
405
    #[traced_test]
×
406
    fn spawned_tasks_live_as_long_as_jobqueue() -> anyhow::Result<()> {
1✔
407
        workers::spawned_tasks_live_as_long_as_jobqueue(true)
1✔
408
    }
1✔
409

410
    #[tokio::test(flavor = "multi_thread")]
411
    #[traced_test]
×
412
    async fn panic_in_async_job_ends_job_cleanly() -> anyhow::Result<()> {
1✔
413
        workers::panics::panic_in_job_ends_job_cleanly(true).await
1✔
414
    }
1✔
415

416
    #[tokio::test(flavor = "multi_thread")]
417
    #[traced_test]
×
418
    async fn panic_in_blocking_job_ends_job_cleanly() -> anyhow::Result<()> {
1✔
419
        workers::panics::panic_in_job_ends_job_cleanly(false).await
1✔
420
    }
1✔
421

422
    mod workers {
423
        use std::any::Any;
424

425
        use super::*;
426
        use crate::job_queue::errors::JobHandleErrorSync;
427

428
        /// Priority levels for the test jobs.
        ///
        /// `Ord` is derived, so `High` > `Medium` > `Low`.
        #[derive(Eq, PartialEq, Ord, PartialOrd)]
        pub enum DoubleJobPriority {
            Low = 1,
            Medium = 2,
            High = 3,
        }
434

435
        #[derive(PartialEq, Debug, Clone)]
436
        struct DoubleJobResult(u64, u64, Instant);
437
        impl JobResult for DoubleJobResult {
438
            fn as_any(&self) -> &dyn Any {
472✔
439
                self
472✔
440
            }
472✔
441
            fn into_any(self: Box<Self>) -> Box<dyn Any> {
74✔
442
                self
74✔
443
            }
74✔
444
        }
445

446
        // Test job (implements Job) that doubles `data` and takes about
        // `duration` to do so.  `is_async` is reported through
        // `Job::is_async()` and selects between the sync and async run paths.
        struct DoubleJob {
            data: u64,
            duration: std::time::Duration,
            is_async: bool,
        }
452

453
        #[async_trait::async_trait]
454
        impl Job for DoubleJob {
455
            fn is_async(&self) -> bool {
83✔
456
                self.is_async
83✔
457
            }
83✔
458

459
            fn run(&self, cancel_rx: JobCancelReceiver) -> JobCompletion {
42✔
460
                let start = Instant::now();
42✔
461
                let sleep_time =
42✔
462
                    std::cmp::min(std::time::Duration::from_micros(100), self.duration);
42✔
463

464
                let r = loop {
42✔
465
                    if start.elapsed() < self.duration {
5,886✔
466
                        match cancel_rx.has_changed() {
5,847✔
467
                            Ok(changed) if changed => break JobCompletion::Cancelled,
1✔
468
                            Err(_) => break JobCompletion::Cancelled,
2✔
469
                            _ => {}
5,844✔
470
                        }
5,844✔
471

5,844✔
472
                        std::thread::sleep(sleep_time);
5,844✔
473
                    } else {
474
                        break JobCompletion::Finished(Box::new(DoubleJobResult(
39✔
475
                            self.data,
39✔
476
                            self.data * 2,
39✔
477
                            Instant::now(),
39✔
478
                        )));
39✔
479
                    }
480
                };
481

482
                tracing::info!("results: {:?}", r);
42✔
483
                r
42✔
484
            }
42✔
485

486
            async fn run_async(&self) -> Box<dyn JobResult> {
41✔
487
                tokio::time::sleep(self.duration).await;
41✔
488
                let r = DoubleJobResult(self.data, self.data * 2, Instant::now());
37✔
489

37✔
490
                tracing::info!("results: {:?}", r);
37✔
491
                Box::new(r)
37✔
492
            }
78✔
493
        }
494

495
        // this test demonstrates/verifies that:
496
        //  1. jobs are run in priority order, highest priority first.
497
        //  2. when multiple jobs have the same priority, they run in FIFO order.
498
        pub(super) async fn run_jobs_by_priority(is_async: bool) -> anyhow::Result<()> {
2✔
499
            let start_of_test = Instant::now();
2✔
500

2✔
501
            // create a job queue
2✔
502
            let job_queue = JobQueue::start();
2✔
503

2✔
504
            let mut handles = vec![];
2✔
505
            let duration = std::time::Duration::from_millis(20);
2✔
506

507
            // create 30 jobs, 10 at each priority level.
508
            for i in (1..10).rev() {
18✔
509
                let job1 = Box::new(DoubleJob {
18✔
510
                    data: i,
18✔
511
                    duration,
18✔
512
                    is_async,
18✔
513
                });
18✔
514
                let job2 = Box::new(DoubleJob {
18✔
515
                    data: i * 100,
18✔
516
                    duration,
18✔
517
                    is_async,
18✔
518
                });
18✔
519
                let job3 = Box::new(DoubleJob {
18✔
520
                    data: i * 1000,
18✔
521
                    duration,
18✔
522
                    is_async,
18✔
523
                });
18✔
524

18✔
525
                // process job and print results.
18✔
526
                handles.push(job_queue.add_job(job1, DoubleJobPriority::Low)?.result_rx());
18✔
527
                handles.push(
18✔
528
                    job_queue
18✔
529
                        .add_job(job2, DoubleJobPriority::Medium)?
18✔
530
                        .result_rx(),
18✔
531
                );
18✔
532
                handles.push(
18✔
533
                    job_queue
18✔
534
                        .add_job(job3, DoubleJobPriority::High)?
18✔
535
                        .result_rx(),
18✔
536
                );
537
            }
538

539
            // wait for all jobs to complete.
540
            let mut results = futures::future::join_all(handles).await;
2✔
541

542
            // the results are in the same order as handles passed to join_all.
543
            // we sort them by the timestamp in job result, ascending.
544
            results.sort_by(
2✔
545
                |a_completion, b_completion| match (a_completion, b_completion) {
236✔
546
                    (Ok(JobCompletion::Finished(a_dyn)), Ok(JobCompletion::Finished(b_dyn))) => {
236✔
547
                        let a = a_dyn.as_any().downcast_ref::<DoubleJobResult>().unwrap().2;
236✔
548

236✔
549
                        let b = b_dyn.as_any().downcast_ref::<DoubleJobResult>().unwrap().2;
236✔
550

236✔
551
                        a.cmp(&b)
236✔
552
                    }
553
                    _ => panic!("at least one job did not finish"),
×
554
                },
236✔
555
            );
2✔
556

2✔
557
            // iterate job results and verify that:
2✔
558
            //   timestamp of each is greater than prev.
2✔
559
            //   input value of each is greater than prev, except every 9th item which should be < prev
2✔
560
            //     because there are nine jobs per level.
2✔
561
            let mut prev = Box::new(DoubleJobResult(9999, 0, start_of_test));
2✔
562
            for (i, c) in results.into_iter().enumerate() {
54✔
563
                let Ok(JobCompletion::Finished(dyn_result)) = c else {
54✔
564
                    panic!("A job did not finish");
×
565
                };
566

567
                let job_result = dyn_result.into_any().downcast::<DoubleJobResult>().unwrap();
54✔
568

54✔
569
                assert!(job_result.2 > prev.2);
54✔
570

571
                // we don't do the assertion for the 2nd job because the job-queue starts
572
                // processing immediately and so a race condition is setup where it is possible
573
                // for either the Low priority or High job to start processing first.
574
                if i != 1 {
54✔
575
                    assert!(job_result.0 < prev.0);
52✔
576
                }
2✔
577

578
                prev = job_result;
54✔
579
            }
580

581
            Ok(())
2✔
582
        }
2✔
583

584
        // this test demonstrates/verifies that a job can return a result back to
585
        // the job initiator.
586
        pub(super) async fn get_job_result(is_async: bool) -> anyhow::Result<()> {
2✔
587
            // create a job queue
2✔
588
            let job_queue = JobQueue::start();
2✔
589
            let duration = std::time::Duration::from_millis(20);
2✔
590

591
            // create 10 jobs
592
            for i in 0..10 {
22✔
593
                let job = Box::new(DoubleJob {
20✔
594
                    data: i,
20✔
595
                    duration,
20✔
596
                    is_async,
20✔
597
                });
20✔
598

599
                let result = job_queue
20✔
600
                    .add_job(job, DoubleJobPriority::Low)?
20✔
601
                    .result()
20✔
602
                    .await
20✔
603
                    .map_err(|e| e.into_sync())?;
20✔
604

605
                let job_result = result.into_any().downcast::<DoubleJobResult>().unwrap();
20✔
606

20✔
607
                assert_eq!(i, job_result.0);
20✔
608
                assert_eq!(i * 2, job_result.1);
20✔
609
            }
610

611
            Ok(())
2✔
612
        }
2✔
613

614
        // tests/demonstrates that a long running job can be cancelled early.
615
        pub(super) async fn cancel_job(is_async: bool) -> anyhow::Result<()> {
2✔
616
            // create a job queue
2✔
617
            let job_queue = JobQueue::start();
2✔
618
            // start a 1 hour job.
2✔
619
            let duration = std::time::Duration::from_secs(3600); // 1 hour job.
2✔
620

2✔
621
            let job = Box::new(DoubleJob {
2✔
622
                data: 10,
2✔
623
                duration,
2✔
624
                is_async,
2✔
625
            });
2✔
626
            let job_handle = job_queue.add_job(job, DoubleJobPriority::Low)?;
2✔
627

628
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2✔
629

630
            let completion = job_handle.cancel_and_await().await.unwrap();
2✔
631
            assert!(matches!(completion, JobCompletion::Cancelled));
2✔
632

633
            Ok(())
2✔
634
        }
2✔
635

636
        // note: creates own tokio runtime.  caller must not use [tokio::test]
637
        //
638
        // this test starts a job that runs for 1 hour and then attempts to
639
        // shutdown tokio runtime via shutdown_timeout() with a 1 sec timeout.
640
        //
641
        // any async tasks should be aborted quickly.
642
        // any sync tasks will continue to run to completion.
643
        //
644
        // shutdown_timeout() will wait for tasks to abort for 1 sec and then
645
        // returns.  Any un-aborted tasks/threads become ignored/detached.
646
        // The OS can cleanup such threads when the process exits.
647
        //
648
        // the test checks that the shutdown completes in under 2 secs.
649
        //
650
        // the test demonstrates that shutdown_timeout() can be used to shutdown
651
        // tokio runtime even if sync (spawn_blocking) tasks/threads are still running
652
        // in the blocking threadpool.
653
        //
654
        // when called with is_async=true, it demonstrates that shutdown_timeout() also
655
        // aborts async jobs, as one would expect.
656
        pub(super) fn runtime_shutdown_timeout_force_cancels_job(
2✔
657
            is_async: bool,
2✔
658
        ) -> anyhow::Result<()> {
2✔
659
            let rt = tokio::runtime::Runtime::new()?;
2✔
660
            let result = rt.block_on(async {
2✔
661
                // create a job queue
2✔
662
                let job_queue = JobQueue::start();
2✔
663
                // start a 1 hour job.
2✔
664
                let duration = std::time::Duration::from_secs(3600); // 1 hour job.
2✔
665

2✔
666
                let job = Box::new(DoubleJob {
2✔
667
                    data: 10,
2✔
668
                    duration,
2✔
669
                    is_async,
2✔
670
                });
2✔
671
                let _rx = job_queue.add_job(job, DoubleJobPriority::Low)?;
2✔
672

673
                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2✔
674
                println!("finished scope");
2✔
675

2✔
676
                Ok(())
2✔
677
            });
2✔
678

2✔
679
            let start = std::time::Instant::now();
2✔
680

2✔
681
            println!("waiting 1 second for job before shutdown runtime");
2✔
682
            rt.shutdown_timeout(tokio::time::Duration::from_secs(1));
2✔
683

2✔
684
            assert!(start.elapsed() < std::time::Duration::from_secs(2));
2✔
685

686
            result
2✔
687
        }
2✔
688

689
        // note: creates own tokio runtime.  caller must not use [tokio::test]
690
        //
691
        // this test starts a job that runs for 5 secs and then attempts to
692
        // shutdown tokio runtime normally by dropping it.
693
        //
694
        // any async tasks should be aborted quickly.
695
        // any sync tasks will continue to run to completion.
696
        //
697
        // the tokio runtime does not complete the drop() until all tasks
698
        // have completed/aborted.
699
        //
700
        // the test checks that the job finishes in less than the 5 secs
701
        // required for full completion.  In other words, that it aborts.
702
        //
703
        // the test is expected to succeed for async jobs but fail for sync jobs.
704
        pub(super) fn runtime_shutdown_cancels_job(is_async: bool) -> anyhow::Result<()> {
2✔
705
            let rt = tokio::runtime::Runtime::new()?;
2✔
706
            let start = tokio::time::Instant::now();
2✔
707

2✔
708
            let result = rt.block_on(async {
2✔
709
                // create a job queue
2✔
710
                let job_queue = JobQueue::start();
2✔
711

2✔
712
                // this job takes at least 5 secs to complete.
2✔
713
                let duration = std::time::Duration::from_secs(5);
2✔
714

2✔
715
                let job = Box::new(DoubleJob {
2✔
716
                    data: 10,
2✔
717
                    duration,
2✔
718
                    is_async,
2✔
719
                });
2✔
720

721
                let rx_handle = job_queue.add_job(job, DoubleJobPriority::Low)?;
2✔
722
                drop(rx_handle);
2✔
723

2✔
724
                // sleep 50 ms to let job get started.
2✔
725
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2✔
726

727
                Ok(())
2✔
728
            });
2✔
729

2✔
730
            // drop the tokio runtime. It will attempt to abort tasks.
2✔
731
            //   - async tasks can normally be aborted
2✔
732
            //   - spawn_blocking (sync) tasks cannot normally be aborted.
2✔
733
            drop(rt);
2✔
734

2✔
735
            // if test is successful, elapsed time should be less than the 5 secs
2✔
736
            // it takes for the job to complete.  (should be around 0.5 ms)
2✔
737

2✔
738
            // however it is expected/normal that sync tasks will not be aborted
2✔
739
            // and will run for full 5 secs.  thus this assert will fail for them.
2✔
740

2✔
741
            assert!(start.elapsed() < std::time::Duration::from_secs(5));
2✔
742

743
            result
2✔
744
        }
2✔
745

746
        // this test attempts to verify that the task spawned by the JobQueue
747
        // continues running until the JobQueue is dropped after the tokio
748
        // runtime is dropped.
749
        //
750
        // If the tasks are cencelled before JobQueue is dropped then a subsequent
751
        // api call that sends a msg will result in a "channel closed" error, which
752
        // is what the test checks for.
753
        //
754
        // note that the test has to do some tricky stuff to setup conditions
755
        // where the "channel closed" error can occur. It's a subtle issue.
756
        //
757
        // see description at:
758
        // https://github.com/tokio-rs/tokio/discussions/6961
759
        pub(super) fn spawned_tasks_live_as_long_as_jobqueue(is_async: bool) -> anyhow::Result<()> {
1✔
760
            let rt = tokio::runtime::Runtime::new()?;
1✔
761

762
            let result_ok: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
1✔
763

1✔
764
            let result_ok_clone = result_ok.clone();
1✔
765
            rt.block_on(async {
1✔
766
                // create a job queue
1✔
767
                let job_queue = Arc::new(JobQueue::start());
1✔
768

1✔
769
                // spawns background task that adds job
1✔
770
                let job_queue_cloned = job_queue.clone();
1✔
771
                let jh = tokio::spawn(async move {
1✔
772
                    // sleep 200 ms to let runtime finish.
1✔
773
                    // ie ensure drop(rt) will be reached and wait for us.
1✔
774
                    // note that we use std sleep.  if tokio sleep is used
1✔
775
                    // the test will always succeed due to the await point.
1✔
776
                    std::thread::sleep(std::time::Duration::from_millis(200));
1✔
777

1✔
778
                    let job = Box::new(DoubleJob {
1✔
779
                        data: 10,
1✔
780
                        duration: std::time::Duration::from_secs(1),
1✔
781
                        is_async,
1✔
782
                    });
1✔
783

1✔
784
                    let result = job_queue_cloned.add_job(job, DoubleJobPriority::Low);
1✔
785

1✔
786
                    // an assert on result.is_ok() would panic, but that panic would be
1✔
787
                    // printed and swallowed by tokio runtime, so the test would succeed
1✔
788
                    // despite the panic. instead we pass the result in a mutex so it
1✔
789
                    // can be asserted where it will be caught by the test runner.
1✔
790
                    *result_ok_clone.lock().unwrap() = result.is_ok();
1✔
791
                });
1✔
792

1✔
793
                // sleep 50 ms to let job get started.
1✔
794
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1✔
795

796
                // note; awaiting the joinhandle makes the test succeed.
797

798
                jh.abort();
1✔
799
                let _ = jh.await;
1✔
800
            });
1✔
801

1✔
802
            // drop the tokio runtime. It will abort tasks.
1✔
803
            drop(rt);
1✔
804

1✔
805
            assert!(*result_ok.lock().unwrap());
1✔
806

807
            Ok(())
1✔
808
        }
1✔
809

810
        pub mod panics {
811
            use super::*;
812

813
            const PANIC_STR: &str = "job panics unexpectedly";
814

815
            struct PanicJob {
816
                is_async: bool,
817
            }
818

819
            #[async_trait::async_trait]
820
            impl Job for PanicJob {
821
                fn is_async(&self) -> bool {
2✔
822
                    self.is_async
2✔
823
                }
2✔
824

825
                fn run(&self, _cancel_rx: JobCancelReceiver) -> JobCompletion {
1✔
826
                    panic!("{}", PANIC_STR);
1✔
827
                }
828

829
                async fn run_async_cancellable(
830
                    &self,
831
                    _cancel_rx: JobCancelReceiver,
832
                ) -> JobCompletion {
1✔
833
                    panic!("{}", PANIC_STR);
1✔
834
                }
1✔
835
            }
836

837
            /// verifies that a job that panics will be ended properly.
838
            ///
839
            /// Properly means that:
840
            /// 1. an error is returned from job_handle.result() indicating job panicked.
841
            /// 2. caller is able to obtain panic info, which matches job's panic msg.
842
            /// 3. the job-queue continues accepting new jobs.
843
            /// 4. the job-queue continues processing jobs.
844
            ///
845
            /// async_job == true --> test an async job
846
            /// async_job == false --> test a blocking job
847
            pub async fn panic_in_job_ends_job_cleanly(async_job: bool) -> anyhow::Result<()> {
2✔
848
                // create a job queue
2✔
849
                let job_queue = JobQueue::start();
2✔
850

2✔
851
                let job = PanicJob {
2✔
852
                    is_async: async_job,
2✔
853
                };
2✔
854
                let job_handle = job_queue.add_job(Box::new(job), DoubleJobPriority::Low)?;
2✔
855

856
                let job_result = job_handle.result().await;
2✔
857

858
                println!("job_result: {:#?}", job_result);
2✔
859

2✔
860
                // verify that job_queue channels are still open
2✔
861
                assert!(!job_queue.tx_job_added.is_closed());
2✔
862
                assert!(!job_queue.tx_stop.is_closed());
2✔
863

864
                // verify that we get an error with the job's panic msg.
865
                assert!(matches!(
2✔
866
                    job_result.map_err(|e| e.into_sync()),
2✔
867
                    Err(JobHandleErrorSync::JobPanicked(e)) if e == *PANIC_STR
2✔
868
                ));
869

870
                // ensure we can still run another job afterwards.
871
                let newjob = Box::new(DoubleJob {
2✔
872
                    data: 10,
2✔
873
                    duration: std::time::Duration::from_millis(50),
2✔
874
                    is_async: false,
2✔
875
                });
2✔
876

877
                // ensure we can add another job.
878
                let new_job_handle = job_queue.add_job(newjob, DoubleJobPriority::Low)?;
2✔
879

880
                // ensure job processes and returns a result without error.
881
                assert!(new_job_handle.result().await.is_ok());
2✔
882

883
                Ok(())
2✔
884
            }
2✔
885
        }
886
    }
887
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc