• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 14685169589

26 Apr 2025 09:14PM UTC coverage: 79.675% (-5.3%) from 84.945%
14685169589

Pull #569

github

web-flow
Merge 1e5236f5a into cf1fc8513
Pull Request #569: use common tokio Runtime and TritonVmJobQueue for unit tests

130 of 170 new or added lines in 17 files covered. (76.47%)

49 existing lines in 10 files now uncovered.

37161 of 46641 relevant lines covered (79.67%)

232680.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.57
/src/job_queue/queue.rs
1
use std::collections::VecDeque;
2
use std::fmt;
3
use std::sync::Arc;
4
use std::sync::Mutex;
5

6
use tokio::sync::oneshot;
7
use tokio::sync::watch;
8
use tokio::task::JoinHandle;
9

10
use super::errors::AddJobError;
11
use super::errors::JobHandleError;
12
use super::errors::StopQueueError;
13
use super::traits::Job;
14
use super::traits::JobCancelReceiver;
15
use super::traits::JobCancelSender;
16
use super::traits::JobCompletion;
17
use super::traits::JobResult;
18
use super::traits::JobResultReceiver;
19
use super::traits::JobResultSender;
20

21
/// A randomly generated job identifier (12 bytes).
///
/// Displayed as 24 lowercase hex characters. Implements `PartialEq`, `Eq` and
/// `Hash` so ids can be compared and used as map/set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct JobId([u8; 12]);

impl std::fmt::Display for JobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // each byte as two zero-padded lowercase hex digits.
        self.0.iter().try_for_each(|byte| write!(f, "{:02x}", byte))
    }
}
33

34
impl JobId {
35
    fn random() -> Self {
1,382✔
36
        Self(rand::random())
1,382✔
37
    }
1,382✔
38
}
39

40
/// A job-handle enables cancelling a job and awaiting results
41
#[derive(Debug)]
42
pub struct JobHandle {
43
    job_id: JobId,
44
    result_rx: JobResultReceiver,
45
    cancel_tx: JobCancelSender,
46
}
47
impl JobHandle {
48
    /// wait for job to complete
49
    ///
50
    /// a completed job may either be finished, cancelled, or panicked.
51
    pub async fn complete(self) -> Result<JobCompletion, JobHandleError> {
1,323✔
52
        Ok(self.result_rx.await?)
1,323✔
53
    }
1,323✔
54

55
    /// wait for job result, or err if cancelled or a panic occurred within job.
56
    pub async fn result(self) -> Result<Box<dyn JobResult>, JobHandleError> {
1,321✔
57
        match self.complete().await? {
1,321✔
58
            JobCompletion::Finished(r) => Ok(r),
1,319✔
59
            JobCompletion::Cancelled => Err(JobHandleError::JobCancelled),
×
60
            JobCompletion::Panicked(e) => Err(JobHandleError::JobPanicked(e)),
2✔
61
        }
62
    }
1,321✔
63

64
    /// cancel job and return immediately.
65
    pub fn cancel(&self) -> Result<(), JobHandleError> {
×
66
        Ok(self.cancel_tx.send(())?)
×
67
    }
×
68

69
    /// cancel job and wait for it to complete.
70
    pub async fn cancel_and_await(self) -> Result<JobCompletion, JobHandleError> {
2✔
71
        self.cancel_tx.send(())?;
2✔
72
        self.complete().await
2✔
73
    }
2✔
74

75
    /// channel receiver for job results
76
    pub fn result_rx(self) -> JobResultReceiver {
54✔
77
        self.result_rx
54✔
78
    }
54✔
79

80
    /// channel sender for cancelling job.
81
    pub fn cancel_tx(&self) -> &JobCancelSender {
1,297✔
82
        &self.cancel_tx
1,297✔
83
    }
1,297✔
84

NEW
85
    pub fn job_id(&self) -> JobId {
×
NEW
86
        self.job_id
×
NEW
87
    }
×
88
}
89

90
/// represents a job in the queue.
91
struct QueuedJob<P> {
92
    job: Box<dyn Job>,
93
    job_id: JobId,
94
    result_tx: JobResultSender,
95
    cancel_tx: JobCancelSender,
96
    cancel_rx: JobCancelReceiver,
97
    priority: P,
98
}
99

100
impl<P: fmt::Debug> fmt::Debug for QueuedJob<P> {
NEW
101
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
NEW
102
        f.debug_struct("QueuedJob")
×
NEW
103
            .field("job", &"Box<dyn Job>")
×
NEW
104
            .field("job_id", &self.job_id)
×
NEW
105
            .field("result_tx", &"JobResultSender")
×
NEW
106
            .field("cancel_tx", &"JobCancelSender")
×
NEW
107
            .field("cancel_rx", &"JobCancelReceiver")
×
NEW
108
            .field("priority", &self.priority)
×
NEW
109
            .finish()
×
NEW
110
    }
×
111
}
112

113
/// represents the currently executing job
114
#[derive(Debug)]
115
struct CurrentJob {
116
    job_num: usize,
117
    job_id: JobId,
118
    cancel_tx: JobCancelSender,
119
}
120

121
/// represents data shared between tasks/threads
122
#[derive(Debug)]
123
struct Shared<P: Ord> {
124
    jobs: VecDeque<QueuedJob<P>>,
125
    current_job: Option<CurrentJob>,
126
}
127

128
/// implements a job queue that sends result of each job to a listener.
129
#[derive(Debug)]
130
pub struct JobQueue<P: Ord + Send + Sync + 'static> {
131
    shared: Arc<Mutex<Shared<P>>>,
132

133
    tx_job_added: tokio::sync::mpsc::UnboundedSender<()>,
134
    tx_stop: tokio::sync::watch::Sender<()>,
135

136
    process_jobs_task_handle: Option<JoinHandle<()>>, // Store the job processing task handle
137
}
138

139
impl<P: Ord + Send + Sync + 'static> Drop for JobQueue<P> {
140
    fn drop(&mut self) {
13✔
141
        tracing::debug!("in JobQueue::drop()");
13✔
142

143
        if !self.tx_stop.is_closed() {
13✔
144
            if let Err(e) = self.tx_stop.send(()) {
13✔
NEW
145
                tracing::error!("{}", e);
×
146
            }
13✔
NEW
147
        }
×
148
    }
13✔
149
}
150

151
impl<P: Ord + Send + Sync + 'static> JobQueue<P> {
152
    /// creates job queue and starts it processing.  returns immediately.
153
    pub fn start() -> Self {
145✔
154
        let shared = Shared {
145✔
155
            jobs: VecDeque::new(),
145✔
156
            current_job: None,
145✔
157
        };
145✔
158
        let shared: Arc<Mutex<Shared<P>>> = Arc::new(Mutex::new(shared));
145✔
159

145✔
160
        let (tx_job_added, mut rx_job_added) = tokio::sync::mpsc::unbounded_channel();
145✔
161
        let (tx_stop, mut rx_stop) = tokio::sync::watch::channel(());
145✔
162

145✔
163
        // spawns background task that processes job queue and runs jobs.
145✔
164
        let shared2 = shared.clone();
145✔
165
        let process_jobs_task_handle = tokio::spawn(async move {
145✔
166
            let mut job_num: usize = 1;
145✔
167

168
            loop {
169
                tokio::select!(
1,522✔
170
                _ = rx_stop.changed() => {
1,522✔
171
                    tracing::debug!("task process_jobs received Stop message.");
4✔
172

173
                    // if there is a presently executing job we need to cancel it
174
                    // and wait for it to complete.
175
                    let maybe_info = shared2.lock().unwrap().current_job.as_ref().map(|cj| (cj.job_id, cj.cancel_tx.clone()) );
4✔
176
                    if let Some((job_id, cancel_tx)) = maybe_info {
4✔
NEW
177
                        match cancel_tx.send(()) {
×
178
                            Ok(()) => {
179
                                // wait for channel to close, indicating job has cancelled (or otherwise completed)
NEW
180
                                tracing::debug!("JobQueue: notified current job {} to cancel.  waiting...", job_id);
×
NEW
181
                                cancel_tx.closed().await;
×
NEW
182
                                tracing::debug!("JobQueue: current job {} has cancelled.", job_id);
×
183
                            }
NEW
184
                            Err(e) => {
×
NEW
185
                                tracing::warn!("could not send cancellation msg to current job {}. {}", job_id, e)
×
186
                            }
187
                        }
188
                    }
4✔
189

190
                    break;
4✔
191
                }
192
                _ = rx_job_added.recv() => {
1,522✔
193
                    tracing::debug!("task process_jobs received JobAdded message.");
1,384✔
194
                    let (next_job, num_pending) = {
1,384✔
195
                        let mut guard = shared2.lock().unwrap();
1,384✔
196

1,384✔
197
                        // This is where we pick the highest priority job
1,384✔
198
                        guard
1,384✔
199
                            .jobs
1,384✔
200
                            .make_contiguous()
1,384✔
201
                            .sort_by(|a, b| b.priority.cmp(&a.priority));
1,384✔
202
                        let job = guard.jobs.pop_front().unwrap();
1,384✔
203

1,384✔
204
                        guard.current_job = Some(CurrentJob {
1,384✔
205
                            job_num,
1,384✔
206
                            job_id: job.job_id,
1,384✔
207
                            cancel_tx: job.cancel_tx.clone(),
1,384✔
208
                        });
1,384✔
209
                        (job, guard.jobs.len())
1,384✔
210
                    };
1,384✔
211

1,384✔
212
                    tracing::info!(
1,384✔
NEW
213
                        "  *** JobQueue: begin job #{} - {} - {} queued job(s) ***",
×
214
                        job_num,
215
                        next_job.job_id,
216
                        num_pending
217
                    );
218
                    let timer = tokio::time::Instant::now();
1,384✔
219
                    let task_handle = if next_job.job.is_async() {
1,384✔
220
                        tokio::spawn(async move {
1,339✔
221
                            next_job.job.run_async_cancellable(next_job.cancel_rx).await
1,339✔
222
                        })
1,339✔
223
                    } else {
224
                        tokio::task::spawn_blocking(move || next_job.job.run(next_job.cancel_rx))
45✔
225
                    };
226

227
                    let job_completion = match task_handle.await {
1,384✔
228
                        Ok(jc) => jc,
1,375✔
229
                        Err(e) => {
2✔
230
                            if e.is_panic() {
2✔
231
                                JobCompletion::Panicked(e.into_panic())
2✔
NEW
232
                            } else if e.is_cancelled() {
×
NEW
233
                                JobCompletion::Cancelled
×
234
                            } else {
NEW
235
                                unreachable!()
×
236
                            }
237
                        }
238
                    };
239

240
                    tracing::info!(
1,377✔
NEW
241
                        "  *** JobQueue: ended job #{} - {} - Completion: {} - {} secs ***",
×
NEW
242
                        job_num,
×
NEW
243
                        next_job.job_id,
×
NEW
244
                        job_completion,
×
NEW
245
                        timer.elapsed().as_secs_f32()
×
246
                    );
247
                    job_num += 1;
1,377✔
248

1,377✔
249
                    shared2.lock().unwrap().current_job = None;
1,377✔
250

251
                    if let Err(e) = next_job.result_tx.send(job_completion) {
1,377✔
NEW
252
                        tracing::warn!("job-handle dropped? {}", e);
×
253
                    }
1,377✔
254
                });
255
            }
256
            tracing::debug!("task process_jobs exiting");
4✔
257
        });
145✔
258

145✔
259
        tracing::info!("JobQueue: started new queue.");
145✔
260

261
        Self {
145✔
262
            tx_job_added,
145✔
263
            tx_stop,
145✔
264
            shared,
145✔
265
            process_jobs_task_handle: Some(process_jobs_task_handle),
145✔
266
        }
145✔
267
    }
145✔
268

269
    /// stop the job-queue, and drop it.
270
    ///
271
    /// this method sends a message to the spawned job-queue task
272
    /// to stop and then waits for it to complete.
273
    ///
274
    /// Comparison with drop():
275
    ///
276
    /// if JobQueue is dropped:
277
    ///  1. the stop message will be sent, but any error is ignored.
278
    ///  2. the spawned task is not awaited.
NEW
279
    pub async fn stop(mut self) -> Result<(), StopQueueError> {
×
NEW
280
        tracing::info!("JobQueue: stopping.");
×
281

NEW
282
        self.tx_stop.send(())?;
×
283

NEW
284
        if let Some(jh) = self.process_jobs_task_handle.take() {
×
NEW
285
            jh.await?;
×
NEW
286
        }
×
287

NEW
288
        Ok(())
×
NEW
289
    }
×
290

291
    /// adds job to job-queue and returns immediately.
292
    ///
293
    /// job-results can be obtained by via JobHandle::results().await
294
    /// The job can be cancelled by JobHandle::cancel()
295
    pub fn add_job(&self, job: Box<dyn Job>, priority: P) -> Result<JobHandle, AddJobError> {
1,382✔
296
        let (result_tx, result_rx) = oneshot::channel();
1,382✔
297
        let (cancel_tx, cancel_rx) = watch::channel::<()>(());
1,382✔
298

1,382✔
299
        let job_id = JobId::random();
1,382✔
300

1,382✔
301
        let m = QueuedJob {
1,382✔
302
            job,
1,382✔
303
            job_id,
1,382✔
304
            result_tx,
1,382✔
305
            cancel_tx: cancel_tx.clone(),
1,382✔
306
            cancel_rx,
1,382✔
307
            priority,
1,382✔
308
        };
1,382✔
309

310
        let (num_jobs, job_running) = {
1,382✔
311
            let mut guard = self.shared.lock().unwrap();
1,382✔
312
            guard.jobs.push_back(m);
1,382✔
313
            let job_running = match &guard.current_job {
1,382✔
314
                Some(j) => format!("#{} - {}", j.job_num, j.job_id),
42✔
315
                None => "none".to_string(),
1,340✔
316
            };
317
            (guard.jobs.len(), job_running)
1,382✔
318
        };
1,382✔
319
        tracing::info!(
1,382✔
NEW
320
            "JobQueue: job added - {}  {} queued job(s).  job running: {}",
×
321
            job_id,
322
            num_jobs,
323
            job_running
324
        );
325

326
        self.tx_job_added.send(())?;
1,382✔
327

328
        Ok(JobHandle {
1,382✔
329
            job_id,
1,382✔
330
            result_rx,
1,382✔
331
            cancel_tx,
1,382✔
332
        })
1,382✔
333
    }
1,382✔
334
}
335

336
#[cfg(test)]
337
mod tests {
338
    use std::time::Instant;
339

340
    use tracing_test::traced_test;
341

342
    use super::*;
343

344
    #[tokio::test(flavor = "multi_thread")]
345
    #[traced_test]
×
346
    async fn run_sync_jobs_by_priority() -> anyhow::Result<()> {
1✔
347
        workers::run_jobs_by_priority(false).await
1✔
348
    }
1✔
349

350
    #[tokio::test(flavor = "multi_thread")]
351
    #[traced_test]
×
352
    async fn run_async_jobs_by_priority() -> anyhow::Result<()> {
1✔
353
        workers::run_jobs_by_priority(true).await
1✔
354
    }
1✔
355

356
    #[tokio::test(flavor = "multi_thread")]
357
    #[traced_test]
×
358
    async fn get_sync_job_result() -> anyhow::Result<()> {
1✔
359
        workers::get_job_result(false).await
1✔
360
    }
1✔
361

362
    #[tokio::test(flavor = "multi_thread")]
363
    #[traced_test]
×
364
    async fn get_async_job_result() -> anyhow::Result<()> {
1✔
365
        workers::get_job_result(true).await
1✔
366
    }
1✔
367

368
    #[tokio::test(flavor = "multi_thread")]
369
    #[traced_test]
×
370
    async fn cancel_sync_job() -> anyhow::Result<()> {
1✔
371
        workers::cancel_job(false).await
1✔
372
    }
1✔
373

374
    #[tokio::test(flavor = "multi_thread")]
375
    #[traced_test]
×
376
    async fn cancel_async_job() -> anyhow::Result<()> {
1✔
377
        workers::cancel_job(true).await
1✔
378
    }
1✔
379

380
    #[test]
    #[traced_test]
    fn runtime_shutdown_timeout_force_cancels_sync_job() -> anyhow::Result<()> {
        // the worker builds its own runtime, hence #[test] not #[tokio::test].
        let is_async = false;
        workers::runtime_shutdown_timeout_force_cancels_job(is_async)
    }
385

386
    #[test]
    #[traced_test]
    fn runtime_shutdown_timeout_force_cancels_async_job() -> anyhow::Result<()> {
        // the worker builds its own runtime, hence #[test] not #[tokio::test].
        let is_async = true;
        workers::runtime_shutdown_timeout_force_cancels_job(is_async)
    }
391

392
    #[test]
    #[traced_test]
    fn runtime_shutdown_cancels_sync_job() -> anyhow::Result<()> {
        // Propagate the worker's Result (as the async variant does) instead of
        // silently discarding it; an add_job failure is a real test failure.
        workers::runtime_shutdown_cancels_job(false)
    }
397

398
    #[test]
    #[traced_test]
    fn runtime_shutdown_cancels_async_job() -> anyhow::Result<()> {
        let is_async = true;
        workers::runtime_shutdown_cancels_job(is_async)
    }
403

404
    #[test]
    #[traced_test]
    fn spawned_tasks_live_as_long_as_jobqueue() -> anyhow::Result<()> {
        // only the async job variant is exercised here.
        let is_async = true;
        workers::spawned_tasks_live_as_long_as_jobqueue(is_async)
    }
409

410
    #[tokio::test(flavor = "multi_thread")]
411
    #[traced_test]
×
412
    async fn panic_in_async_job_ends_job_cleanly() -> anyhow::Result<()> {
1✔
413
        workers::panics::panic_in_job_ends_job_cleanly(true).await
1✔
414
    }
1✔
415

416
    #[tokio::test(flavor = "multi_thread")]
417
    #[traced_test]
×
418
    async fn panic_in_blocking_job_ends_job_cleanly() -> anyhow::Result<()> {
1✔
419
        workers::panics::panic_in_job_ends_job_cleanly(false).await
1✔
420
    }
1✔
421

422
    mod workers {
423
        use std::any::Any;
424

425
        use super::*;
426
        use crate::job_queue::errors::JobHandleErrorSync;
427

428
        // Priorities used by the test jobs. The derived Ord follows declaration
        // order, so Low < Medium < High.
        #[derive(Eq, PartialEq, Ord, PartialOrd)]
        pub enum DoubleJobPriority {
            Low = 1,
            Medium = 2,
            High = 3,
        }
434

435
        #[derive(PartialEq, Debug, Clone)]
436
        struct DoubleJobResult(u64, u64, Instant);
437
        impl JobResult for DoubleJobResult {
438
            fn as_any(&self) -> &dyn Any {
472✔
439
                self
472✔
440
            }
472✔
441
            fn into_any(self: Box<Self>) -> Box<dyn Any> {
74✔
442
                self
74✔
443
            }
74✔
444
        }
445

446
        // Stand-in for a prover job (implements Job): it "works" for `duration`,
        // then yields its input together with the doubled value.
        struct DoubleJob {
            // input value; the result also carries data * 2.
            data: u64,
            // how long the job runs before it finishes.
            duration: std::time::Duration,
            // reported by is_async(); selects the async or blocking execution path.
            is_async: bool,
        }
452

453
        #[async_trait::async_trait]
454
        impl Job for DoubleJob {
455
            fn is_async(&self) -> bool {
83✔
456
                self.is_async
83✔
457
            }
83✔
458

459
            fn run(&self, cancel_rx: JobCancelReceiver) -> JobCompletion {
42✔
460
                let start = Instant::now();
42✔
461
                let sleep_time =
42✔
462
                    std::cmp::min(std::time::Duration::from_micros(100), self.duration);
42✔
463

464
                let r = loop {
42✔
465
                    if start.elapsed() < self.duration {
5,859✔
466
                        match cancel_rx.has_changed() {
5,820✔
467
                            Ok(changed) if changed => break JobCompletion::Cancelled,
1✔
468
                            Err(_) => break JobCompletion::Cancelled,
2✔
469
                            _ => {}
5,817✔
470
                        }
5,817✔
471

5,817✔
472
                        std::thread::sleep(sleep_time);
5,817✔
473
                    } else {
474
                        break JobCompletion::Finished(Box::new(DoubleJobResult(
39✔
475
                            self.data,
39✔
476
                            self.data * 2,
39✔
477
                            Instant::now(),
39✔
478
                        )));
39✔
479
                    }
480
                };
481

482
                tracing::info!("results: {:?}", r);
42✔
483
                r
42✔
484
            }
42✔
485

486
            async fn run_async(&self) -> Box<dyn JobResult> {
41✔
487
                tokio::time::sleep(self.duration).await;
41✔
488
                let r = DoubleJobResult(self.data, self.data * 2, Instant::now());
37✔
489

37✔
490
                tracing::info!("results: {:?}", r);
37✔
491
                Box::new(r)
37✔
492
            }
78✔
493
        }
494

495
        // this test demonstrates/verifies that:
496
        //  1. jobs are run in priority order, highest priority first.
497
        //  2. when multiple jobs have the same priority, they run in FIFO order.
498
        pub(super) async fn run_jobs_by_priority(is_async: bool) -> anyhow::Result<()> {
2✔
499
            let start_of_test = Instant::now();
2✔
500

2✔
501
            // create a job queue
2✔
502
            let job_queue = JobQueue::start();
2✔
503

2✔
504
            let mut handles = vec![];
2✔
505
            let duration = std::time::Duration::from_millis(20);
2✔
506

507
            // create 30 jobs, 10 at each priority level.
508
            for i in (1..10).rev() {
18✔
509
                let job1 = Box::new(DoubleJob {
18✔
510
                    data: i,
18✔
511
                    duration,
18✔
512
                    is_async,
18✔
513
                });
18✔
514
                let job2 = Box::new(DoubleJob {
18✔
515
                    data: i * 100,
18✔
516
                    duration,
18✔
517
                    is_async,
18✔
518
                });
18✔
519
                let job3 = Box::new(DoubleJob {
18✔
520
                    data: i * 1000,
18✔
521
                    duration,
18✔
522
                    is_async,
18✔
523
                });
18✔
524

18✔
525
                // process job and print results.
18✔
526
                handles.push(job_queue.add_job(job1, DoubleJobPriority::Low)?.result_rx());
18✔
527
                handles.push(
18✔
528
                    job_queue
18✔
529
                        .add_job(job2, DoubleJobPriority::Medium)?
18✔
530
                        .result_rx(),
18✔
531
                );
18✔
532
                handles.push(
18✔
533
                    job_queue
18✔
534
                        .add_job(job3, DoubleJobPriority::High)?
18✔
535
                        .result_rx(),
18✔
536
                );
537
            }
538

539
            // wait for all jobs to complete.
540
            let mut results = futures::future::join_all(handles).await;
2✔
541

542
            // the results are in the same order as handles passed to join_all.
543
            // we sort them by the timestamp in job result, ascending.
544
            results.sort_by(
2✔
545
                |a_completion, b_completion| match (a_completion, b_completion) {
236✔
546
                    (Ok(JobCompletion::Finished(a_dyn)), Ok(JobCompletion::Finished(b_dyn))) => {
236✔
547
                        let a = a_dyn.as_any().downcast_ref::<DoubleJobResult>().unwrap().2;
236✔
548

236✔
549
                        let b = b_dyn.as_any().downcast_ref::<DoubleJobResult>().unwrap().2;
236✔
550

236✔
551
                        a.cmp(&b)
236✔
552
                    }
553
                    _ => panic!("at least one job did not finish"),
×
554
                },
236✔
555
            );
2✔
556

2✔
557
            // iterate job results and verify that:
2✔
558
            //   timestamp of each is greater than prev.
2✔
559
            //   input value of each is greater than prev, except every 9th item which should be < prev
2✔
560
            //     because there are nine jobs per level.
2✔
561
            let mut prev = Box::new(DoubleJobResult(9999, 0, start_of_test));
2✔
562
            for (i, c) in results.into_iter().enumerate() {
54✔
563
                let Ok(JobCompletion::Finished(dyn_result)) = c else {
54✔
564
                    panic!("A job did not finish");
×
565
                };
566

567
                let job_result = dyn_result.into_any().downcast::<DoubleJobResult>().unwrap();
54✔
568

54✔
569
                assert!(job_result.2 > prev.2);
54✔
570

571
                // we don't do the assertion for the 2nd job because the job-queue starts
572
                // processing immediately and so a race condition is setup where it is possible
573
                // for either the Low priority or High job to start processing first.
574
                if i != 1 {
54✔
575
                    assert!(job_result.0 < prev.0);
52✔
576
                }
2✔
577

578
                prev = job_result;
54✔
579
            }
580

581
            Ok(())
2✔
582
        }
2✔
583

584
        // this test demonstrates/verifies that a job can return a result back to
585
        // the job initiator.
586
        pub(super) async fn get_job_result(is_async: bool) -> anyhow::Result<()> {
2✔
587
            // create a job queue
2✔
588
            let job_queue = JobQueue::start();
2✔
589
            let duration = std::time::Duration::from_millis(20);
2✔
590

591
            // create 10 jobs
592
            for i in 0..10 {
22✔
593
                let job = Box::new(DoubleJob {
20✔
594
                    data: i,
20✔
595
                    duration,
20✔
596
                    is_async,
20✔
597
                });
20✔
598

599
                let result = job_queue
20✔
600
                    .add_job(job, DoubleJobPriority::Low)?
20✔
601
                    .result()
20✔
602
                    .await
20✔
603
                    .map_err(|e| e.into_sync())?;
20✔
604

605
                let job_result = result.into_any().downcast::<DoubleJobResult>().unwrap();
20✔
606

20✔
607
                assert_eq!(i, job_result.0);
20✔
608
                assert_eq!(i * 2, job_result.1);
20✔
609
            }
610

611
            Ok(())
2✔
612
        }
2✔
613

614
        // tests/demonstrates that a long running job can be cancelled early.
615
        pub(super) async fn cancel_job(is_async: bool) -> anyhow::Result<()> {
2✔
616
            // create a job queue
2✔
617
            let job_queue = JobQueue::start();
2✔
618
            // start a 1 hour job.
2✔
619
            let duration = std::time::Duration::from_secs(3600); // 1 hour job.
2✔
620

2✔
621
            let job = Box::new(DoubleJob {
2✔
622
                data: 10,
2✔
623
                duration,
2✔
624
                is_async,
2✔
625
            });
2✔
626
            let job_handle = job_queue.add_job(job, DoubleJobPriority::Low)?;
2✔
627

628
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2✔
629

630
            let completion = job_handle.cancel_and_await().await.unwrap();
2✔
631
            assert!(matches!(completion, JobCompletion::Cancelled));
2✔
632

633
            Ok(())
2✔
634
        }
2✔
635

636
        // note: creates own tokio runtime.  caller must not use [tokio::test]
637
        //
638
        // this test starts a job that runs for 1 hour and then attempts to
639
        // shutdown tokio runtime via shutdown_timeout() with a 1 sec timeout.
640
        //
641
        // any async tasks should be aborted quickly.
642
        // any sync tasks will continue to run to completion.
643
        //
644
        // shutdown_timeout() will wait for tasks to abort for 1 sec and then
645
        // returns.  Any un-aborted tasks/threads become ignored/detached.
646
        // The OS can cleanup such threads when the process exits.
647
        //
648
        // the test checks that the shutdown completes in under 2 secs.
649
        //
650
        // the test demonstrates that shutdown_timeout() can be used to shutdown
651
        // tokio runtime even if sync (spawn_blocking) tasks/threads are still running
652
        // in the blocking threadpool.
653
        //
654
        // when called with is_async=true, it demonstrates that shutdown_timeout() also
655
        // aborts async jobs, as one would expect.
656
        pub(super) fn runtime_shutdown_timeout_force_cancels_job(
2✔
657
            is_async: bool,
2✔
658
        ) -> anyhow::Result<()> {
2✔
659
            let rt = tokio::runtime::Runtime::new()?;
2✔
660
            let result = rt.block_on(async {
2✔
661
                // create a job queue
2✔
662
                let job_queue = JobQueue::start();
2✔
663
                // start a 1 hour job.
2✔
664
                let duration = std::time::Duration::from_secs(3600); // 1 hour job.
2✔
665

2✔
666
                let job = Box::new(DoubleJob {
2✔
667
                    data: 10,
2✔
668
                    duration,
2✔
669
                    is_async,
2✔
670
                });
2✔
671
                let _rx = job_queue.add_job(job, DoubleJobPriority::Low)?;
2✔
672

673
                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2✔
674
                println!("finished scope");
2✔
675

2✔
676
                Ok(())
2✔
677
            });
2✔
678

2✔
679
            let start = std::time::Instant::now();
2✔
680

2✔
681
            println!("waiting 1 second for job before shutdown runtime");
2✔
682
            rt.shutdown_timeout(tokio::time::Duration::from_secs(1));
2✔
683

2✔
684
            assert!(start.elapsed() < std::time::Duration::from_secs(2));
2✔
685

686
            result
2✔
687
        }
2✔
688

689
        // note: creates own tokio runtime.  caller must not use [tokio::test]
690
        //
691
        // this test starts a job that runs for 5 secs and then attempts to
692
        // shutdown tokio runtime normally by dropping it.
693
        //
694
        // any async tasks should be aborted quickly.
695
        // any sync tasks will continue to run to completion.
696
        //
697
        // the tokio runtime does not complete the drop() until all tasks
698
        // have completed/aborted.
699
        //
700
        // the test checks that the job finishes in less than the 5 secs
701
        // required for full completion.  In other words, that it aborts.
702
        //
703
        // the test is expected to succeed for async jobs but fail for sync jobs.
704
        pub(super) fn runtime_shutdown_cancels_job(is_async: bool) -> anyhow::Result<()> {
2✔
705
            let rt = tokio::runtime::Runtime::new()?;
2✔
706
            let start = tokio::time::Instant::now();
2✔
707

2✔
708
            let result = rt.block_on(async {
2✔
709
                // create a job queue
2✔
710
                let job_queue = JobQueue::start();
2✔
711

2✔
712
                // this job takes at least 5 secs to complete.
2✔
713
                let duration = std::time::Duration::from_secs(5);
2✔
714

2✔
715
                let job = Box::new(DoubleJob {
2✔
716
                    data: 10,
2✔
717
                    duration,
2✔
718
                    is_async,
2✔
719
                });
2✔
720

721
                let rx_handle = job_queue.add_job(job, DoubleJobPriority::Low)?;
2✔
722
                drop(rx_handle);
2✔
723

2✔
724
                // sleep 50 ms to let job get started.
2✔
725
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2✔
726

727
                Ok(())
2✔
728
            });
2✔
729

2✔
730
            // drop the tokio runtime. It will attempt to abort tasks.
2✔
731
            //   - async tasks can normally be aborted
2✔
732
            //   - spawn_blocking (sync) tasks cannot normally be aborted.
2✔
733
            drop(rt);
2✔
734

2✔
735
            // if test is successful, elapsed time should be less than the 5 secs
2✔
736
            // it takes for the job to complete.  (should be around 0.5 ms)
2✔
737

2✔
738
            // however it is expected/normal that sync tasks will not be aborted
2✔
739
            // and will run for full 5 secs.  thus this assert will fail for them.
2✔
740

2✔
741
            assert!(start.elapsed() < std::time::Duration::from_secs(5));
2✔
742

743
            result
2✔
744
        }
2✔
745

746
        // this test attempts to verify that the task spawned by the JobQueue
747
        // continues running until the JobQueue is dropped after the tokio
748
        // runtime is dropped.
749
        //
750
        // If the tasks are cencelled before JobQueue is dropped then a subsequent
751
        // api call that sends a msg will result in a "channel closed" error, which
752
        // is what the test checks for.
753
        //
754
        // note that the test has to do some tricky stuff to setup conditions
755
        // where the "channel closed" error can occur. It's a subtle issue.
756
        //
757
        // see description at:
758
        // https://github.com/tokio-rs/tokio/discussions/6961
759
        pub(super) fn spawned_tasks_live_as_long_as_jobqueue(is_async: bool) -> anyhow::Result<()> {
1✔
760
            let rt = tokio::runtime::Runtime::new()?;
1✔
761

762
            let result_ok: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
1✔
763

1✔
764
            let result_ok_clone = result_ok.clone();
1✔
765
            rt.block_on(async {
1✔
766
                // create a job queue
1✔
767
                let job_queue = Arc::new(JobQueue::start());
1✔
768

1✔
769
                // spawns background task that adds job
1✔
770
                let job_queue_cloned = job_queue.clone();
1✔
771
                let jh = tokio::spawn(async move {
1✔
772
                    // sleep 200 ms to let runtime finish.
1✔
773
                    // ie ensure drop(rt) will be reached and wait for us.
1✔
774
                    // note that we use std sleep.  if tokio sleep is used
1✔
775
                    // the test will always succeed due to the await point.
1✔
776
                    std::thread::sleep(std::time::Duration::from_millis(200));
1✔
777

1✔
778
                    let job = Box::new(DoubleJob {
1✔
779
                        data: 10,
1✔
780
                        duration: std::time::Duration::from_secs(1),
1✔
781
                        is_async,
1✔
782
                    });
1✔
783

1✔
784
                    let result = job_queue_cloned.add_job(job, DoubleJobPriority::Low);
1✔
785

1✔
786
                    // an assert on result.is_ok() would panic, but that panic would be
1✔
787
                    // printed and swallowed by tokio runtime, so the test would succeed
1✔
788
                    // despite the panic. instead we pass the result in a mutex so it
1✔
789
                    // can be asserted where it will be caught by the test runner.
1✔
790
                    *result_ok_clone.lock().unwrap() = result.is_ok();
1✔
791
                });
1✔
792

1✔
793
                // sleep 50 ms to let job get started.
1✔
794
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1✔
795

796
                // note; awaiting the joinhandle makes the test succeed.
797

798
                jh.abort();
1✔
799
                let _ = jh.await;
1✔
800
            });
1✔
801

1✔
802
            // drop the tokio runtime. It will abort tasks.
1✔
803
            drop(rt);
1✔
804

1✔
805
            assert!(*result_ok.lock().unwrap());
1✔
806

807
            Ok(())
1✔
808
        }
1✔
809

810
        pub mod panics {
811
            use super::*;
812

813
            const PANIC_STR: &str = "job panics unexpectedly";
814

815
            struct PanicJob {
816
                is_async: bool,
817
            }
818

819
            #[async_trait::async_trait]
820
            impl Job for PanicJob {
821
                fn is_async(&self) -> bool {
2✔
822
                    self.is_async
2✔
823
                }
2✔
824

825
                fn run(&self, _cancel_rx: JobCancelReceiver) -> JobCompletion {
1✔
826
                    panic!("{}", PANIC_STR);
1✔
827
                }
828

829
                async fn run_async_cancellable(
830
                    &self,
831
                    _cancel_rx: JobCancelReceiver,
832
                ) -> JobCompletion {
1✔
833
                    panic!("{}", PANIC_STR);
1✔
834
                }
1✔
835
            }
836

837
            /// verifies that a job that panics will be ended properly.
838
            ///
839
            /// Properly means that:
840
            /// 1. an error is returned from job_handle.result() indicating job panicked.
841
            /// 2. caller is able to obtain panic info, which matches job's panic msg.
842
            /// 3. the job-queue continues accepting new jobs.
843
            /// 4. the job-queue continues processing jobs.
844
            ///
845
            /// async_job == true --> test an async job
846
            /// async_job == false --> test a blocking job
847
            pub async fn panic_in_job_ends_job_cleanly(async_job: bool) -> anyhow::Result<()> {
2✔
848
                // create a job queue
2✔
849
                let job_queue = JobQueue::start();
2✔
850

2✔
851
                let job = PanicJob {
2✔
852
                    is_async: async_job,
2✔
853
                };
2✔
854
                let job_handle = job_queue.add_job(Box::new(job), DoubleJobPriority::Low)?;
2✔
855

856
                let job_result = job_handle.result().await;
2✔
857

858
                println!("job_result: {:#?}", job_result);
2✔
859

2✔
860
                // verify that job_queue channels are still open
2✔
861
                assert!(!job_queue.tx_job_added.is_closed());
2✔
862
                assert!(!job_queue.tx_stop.is_closed());
2✔
863

864
                // verify that we get an error with the job's panic msg.
865
                assert!(matches!(
2✔
866
                    job_result.map_err(|e| e.into_sync()),
2✔
867
                    Err(JobHandleErrorSync::JobPanicked(e)) if e == *PANIC_STR
2✔
868
                ));
869

870
                // ensure we can still run another job afterwards.
871
                let newjob = Box::new(DoubleJob {
2✔
872
                    data: 10,
2✔
873
                    duration: std::time::Duration::from_millis(50),
2✔
874
                    is_async: false,
2✔
875
                });
2✔
876

877
                // ensure we can add another job.
878
                let new_job_handle = job_queue.add_job(newjob, DoubleJobPriority::Low)?;
2✔
879

880
                // ensure job processes and returns a result without error.
881
                assert!(new_job_handle.result().await.is_ok());
2✔
882

883
                Ok(())
2✔
884
            }
2✔
885
        }
886
    }
887
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc