• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 14919393916

09 May 2025 01:23AM UTC coverage: 71.653% (-3.3%) from 74.963%
14919393916

push

github

dan-da
style: address review comments

Addresses review comments for #583:

+ use #[cfg(not(test))] instead of bool param to disable initial sleep
  in mining loop for unit tests
+ improve comment in mining loop when checking composer_task error
+ rename RpcError::Error to RpcError::WalletError
+ remove redundant copy_dir_recursive() in src/tests/shared.rs

0 of 4 new or added lines in 2 files covered. (0.0%)

172 existing lines in 18 files now uncovered.

19870 of 27731 relevant lines covered (71.65%)

367043.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.48
/src/job_queue/queue.rs
1
use std::collections::VecDeque;
2
use std::fmt;
3
use std::sync::Arc;
4
use std::sync::Mutex;
5

6
use tokio::sync::mpsc;
7
use tokio::sync::oneshot;
8
use tokio::sync::watch;
9
use tokio::task::JoinHandle;
10

11
use super::channels::JobCancelReceiver;
12
use super::channels::JobCancelSender;
13
use super::channels::JobResultSender;
14
use super::errors::AddJobError;
15
use super::errors::StopQueueError;
16
use super::job_completion::JobCompletion;
17
use super::job_handle::JobHandle;
18
use super::job_id::JobId;
19
use super::traits::Job;
20

21
/// implements a job queue that sends result of each job to a listener.
22
#[derive(Debug)]
23
pub struct JobQueue<P: Ord + Send + Sync + 'static> {
24
    /// holds job-queue which is shared between tokio tasks
25
    shared_queue: Arc<Mutex<SharedQueue<P>>>,
26

27
    /// channel to inform process_jobs task that a job has been added
28
    tx_job_added: mpsc::UnboundedSender<()>,
29

30
    /// channel to inform process_jobs task to stop processing.
31
    tx_stop: tokio::sync::watch::Sender<()>,
32

33
    /// JoinHandle of process_jobs task
34
    process_jobs_task_handle: Option<JoinHandle<()>>,
35
}
36

37
// we implement Drop so we can send stop message to process_jobs task
38
impl<P: Ord + Send + Sync + 'static> Drop for JobQueue<P> {
39
    fn drop(&mut self) {
16✔
40
        tracing::debug!("in JobQueue::drop()");
16✔
41

42
        if !self.tx_stop.is_closed() {
16✔
43
            if let Err(e) = self.tx_stop.send(()) {
15✔
UNCOV
44
                tracing::error!("{}", e);
×
45
            }
15✔
46
        }
1✔
47
    }
16✔
48
}
49

50
impl<P: Ord + Send + Sync + 'static> JobQueue<P> {
51
    /// creates job queue and starts it processing.
52
    ///
53
    /// returns immediately.
54
    pub fn start() -> Self {
150✔
55
        // create a SharedQueue that is shared between tokio tasks.
56
        let shared_queue = SharedQueue {
150✔
57
            jobs: VecDeque::new(),
150✔
58
            current_job: None,
150✔
59
        };
150✔
60
        let shared_queue: Arc<Mutex<SharedQueue<P>>> = Arc::new(Mutex::new(shared_queue));
150✔
61

62
        // create 'job_added' and 'stop' channels for signalling to process_jobs task
63
        let (tx_job_added, rx_job_added) = mpsc::unbounded_channel();
150✔
64
        let (tx_stop, rx_stop) = watch::channel(());
150✔
65

66
        // spawn the process_jobs task
67
        let process_jobs_task_handle =
150✔
68
            tokio::spawn(process_jobs(shared_queue.clone(), rx_stop, rx_job_added));
150✔
69

70
        tracing::info!("JobQueue: started new queue.");
150✔
71

72
        // construct and return JobQueue
73
        Self {
150✔
74
            tx_job_added,
150✔
75
            tx_stop,
150✔
76
            shared_queue,
150✔
77
            process_jobs_task_handle: Some(process_jobs_task_handle),
150✔
78
        }
150✔
79
    }
150✔
80

81
    /// stop the job-queue, and drop it.
82
    ///
83
    /// this method sends a message to the spawned job-queue task
84
    /// to stop and then waits for it to complete.
85
    ///
86
    /// Comparison with drop():
87
    ///
88
    /// if JobQueue is dropped:
89
    ///  1. the stop message will be sent, but any error is ignored.
90
    ///  2. the spawned task is not awaited.
91
    pub async fn stop(mut self) -> Result<(), StopQueueError> {
1✔
92
        tracing::info!("JobQueue: stopping.");
1✔
93

94
        // send stop message to process_jobs task
95
        self.tx_stop.send(())?;
1✔
96

97
        // wait for process_jobs task to finish
98
        if let Some(jh) = self.process_jobs_task_handle.take() {
1✔
99
            jh.await?;
1✔
UNCOV
100
        }
×
101

102
        Ok(())
1✔
103
    }
1✔
104

105
    /// adds job to job-queue and returns immediately.
106
    ///
107
    /// job-results can be obtained by via JobHandle::results().await
108
    /// The job can be cancelled by JobHandle::cancel()
109
    pub fn add_job(&self, job: Box<dyn Job>, priority: P) -> Result<JobHandle, AddJobError> {
1,389✔
110
        let (result_tx, result_rx) = oneshot::channel();
1,389✔
111
        let (cancel_tx, cancel_rx) = watch::channel::<()>(());
1,389✔
112

113
        // each job gets a random JobId
114
        let job_id = JobId::random();
1,389✔
115

116
        // represent a job in the queue
117
        let m = QueuedJob {
1,389✔
118
            job,
1,389✔
119
            job_id,
1,389✔
120
            result_tx,
1,389✔
121
            cancel_tx: cancel_tx.clone(),
1,389✔
122
            cancel_rx,
1,389✔
123
            priority,
1,389✔
124
        };
1,389✔
125

126
        // add job to queue and obtain number of jobs in queue and current-job (if any)
127
        let (num_jobs, job_running) = {
1,389✔
128
            // acquire mutex lock
129
            let mut guard = self.shared_queue.lock().unwrap();
1,389✔
130

131
            // add job to job-queue
132
            guard.jobs.push_back(m);
1,389✔
133

134
            let job_running = match &guard.current_job {
1,389✔
135
                Some(j) => format!("#{} - {}", j.job_num, j.job_id),
24✔
136
                None => "none".to_string(),
1,365✔
137
            };
138
            (guard.jobs.len(), job_running)
1,389✔
139
        }; // mutex lock released on drop
140

141
        // notify process_jobs task that a job was added.
142
        self.tx_job_added.send(())?;
1,389✔
143

144
        // log that job is added to the queue
145
        tracing::info!(
1,389✔
UNCOV
146
            "JobQueue: job added - {}  {} queued job(s).  job running: {}",
×
147
            job_id,
148
            num_jobs,
149
            job_running
150
        );
151

152
        // create and return JobHandle
153
        Ok(JobHandle::new(job_id, result_rx, cancel_tx))
1,389✔
154
    }
1,389✔
155

156
    /// returns total number of jobs, queued plus running.
157
    pub fn num_jobs(&self) -> usize {
4✔
158
        let guard = self.shared_queue.lock().unwrap();
4✔
159
        guard.jobs.len() + guard.current_job.as_ref().map(|_| 1).unwrap_or(0)
4✔
160
    }
4✔
161

162
    /// returns number of queued jobs
163
    pub fn num_queued_jobs(&self) -> usize {
4✔
164
        self.shared_queue.lock().unwrap().jobs.len()
4✔
165
    }
4✔
166
}
167

168
/// implements the process_jobs task, spawned by JobQueue::start().
169
///
170
/// this fn calls tokio::select!{} in a loop.  The select has two branches:
171
/// 1. receive 'job_added' message over mpsc channel (unbounded)
172
/// 2. receive 'stop' message over watch channel
173
///
174
/// job_added:
175
///
176
/// When a 'job_added' msg is received, the highest priority queued job is picked
177
/// to run next.  We await the job, and then send results to the JobHandle.
178
///
179
/// Note that jobs can take a long time to run and thus msgs can pile up in the
180
/// job_added channel, which is unbounded. These messages are of type "()" so
181
/// are as small as possible.
182
///
183
/// stop:
184
///
185
/// When a 'stop' msg is received we send a cancel msg to the current job (if any) and
186
/// wait for it to complete. Then we exit the loop and return.
187
async fn process_jobs<P: Ord + Send + Sync + 'static>(
150✔
188
    shared_queue: Arc<Mutex<SharedQueue<P>>>,
150✔
189
    mut rx_stop: watch::Receiver<()>,
150✔
190
    mut rx_job_added: mpsc::UnboundedReceiver<()>,
150✔
191
) {
150✔
192
    // job number starts at 1 and increments with each job that is processed.
193
    // note that processing order may be different than order in which jobs
194
    // are added due to job priorities.
195
    let mut job_num: usize = 1;
150✔
196

197
    // loop until 'stop' msg is received or job_added channel is closed.
198
    //
199
    // note:  this unbounded channel will grow in size as new job(s) are
200
    // added while an existing job is running.  ie, we read from the
201
    // channel after each job completes.
202
    while rx_job_added.recv().await.is_some() {
1,536✔
203
        // Find the next job to run, and the number of jobs left in queue
204
        tracing::debug!("task process_jobs received JobAdded message.");
1,388✔
205
        let (next_job, num_pending) = {
1,388✔
206
            // acquire mutex lock
207
            let mut guard = shared_queue.lock().unwrap();
1,388✔
208

209
            // pick the highest priority job
210
            guard
1,388✔
211
                .jobs
1,388✔
212
                .make_contiguous()
1,388✔
213
                .sort_by(|a, b| b.priority.cmp(&a.priority));
1,388✔
214
            let job = guard.jobs.pop_front().unwrap();
1,388✔
215

216
            // set highest priority job as the current job
217
            guard.current_job = Some(CurrentJob {
1,388✔
218
                job_num,
1,388✔
219
                job_id: job.job_id,
1,388✔
220
                cancel_tx: job.cancel_tx.clone(),
1,388✔
221
            });
1,388✔
222

223
            (job, guard.jobs.len())
1,388✔
224
        }; // mutex lock is released when guard drops.
225

226
        // log that we are starting a job
227
        tracing::info!(
1,388✔
UNCOV
228
            "  *** JobQueue: begin job #{} - {} - {} queued job(s) ***",
×
229
            job_num,
230
            next_job.job_id,
231
            num_pending
232
        );
233

234
        // record time that job starts
235
        let timer = tokio::time::Instant::now();
1,388✔
236

237
        // spawn task that performs the job, either async or blocking.
238
        let job_task_handle = if next_job.job.is_async() {
1,388✔
239
            tokio::spawn(
1,344✔
240
                async move { next_job.job.run_async_cancellable(next_job.cancel_rx).await },
1,344✔
241
            )
242
        } else {
243
            tokio::task::spawn_blocking(move || next_job.job.run(next_job.cancel_rx))
44✔
244
        };
245

246
        // execute job task and simultaneously listen for a 'stop' message.
247
        let job_task_result = tokio::select! {
1,388✔
248
            // execute the job task
249
            job_task_result = job_task_handle => job_task_result,
1,388✔
250

251
            // handle msg over 'stop' channel which indicates we must exit the loop.
252
            _ = rx_stop.changed() => {
1,388✔
253

254
                handle_stop_signal(&shared_queue).await;
2✔
255

256
                // exit loop, processing ends.
257
                break;
1✔
258
            },
259
        };
260

261
        // create JobCompletion from task results
262
        let job_completion = match job_task_result {
1,386✔
263
            Ok(jc) => jc,
1,384✔
264
            Err(e) => {
2✔
265
                if e.is_panic() {
2✔
266
                    JobCompletion::Panicked(e.into_panic())
2✔
UNCOV
267
                } else if e.is_cancelled() {
×
UNCOV
268
                    JobCompletion::Cancelled
×
269
                } else {
UNCOV
270
                    unreachable!()
×
271
                }
272
            }
273
        };
274

275
        // log that job has ended.
276
        tracing::info!(
1,386✔
UNCOV
277
            "  *** JobQueue: ended job #{} - {} - Completion: {} - {} secs ***",
×
278
            job_num,
279
            next_job.job_id,
280
            job_completion,
UNCOV
281
            timer.elapsed().as_secs_f32()
×
282
        );
283
        job_num += 1;
1,386✔
284

285
        // obtain mutex lock and set current-job to None
286
        shared_queue.lock().unwrap().current_job = None;
1,386✔
287

288
        // send job results to the JobHandle receiver
289
        if let Err(e) = next_job.result_tx.send(job_completion) {
1,386✔
290
            tracing::warn!("job-handle dropped? {}", e);
4✔
291
        }
1,382✔
292
    }
293
    tracing::debug!("task process_jobs exiting");
15✔
294
}
15✔
295

296
/// handles the 'stop' branch of tokio::select!{} in process_job() task
297
async fn handle_stop_signal<P: Ord + Send + Sync + 'static>(
2✔
298
    shared_queue: &Arc<Mutex<SharedQueue<P>>>,
2✔
299
) {
2✔
300
    tracing::debug!("task process_jobs received Stop message.");
2✔
301

302
    // acquire mutex lock and obtain current_job info, if any.
303
    let maybe_info = shared_queue
2✔
304
        .lock()
2✔
305
        .unwrap()
2✔
306
        .current_job
2✔
307
        .as_ref()
2✔
308
        .map(|cj| (cj.job_id, cj.cancel_tx.clone()));
2✔
309

310
    // if there is a presently executing job we need to cancel it
311
    // and wait for it to complete.
312
    if let Some((job_id, cancel_tx)) = maybe_info {
2✔
313
        match cancel_tx.send(()) {
2✔
314
            Ok(()) => {
315
                // wait for channel to close, indicating job has cancelled (or otherwise completed)
316
                tracing::debug!(
2✔
UNCOV
317
                    "JobQueue: notified current job {} to cancel.  waiting...",
×
318
                    job_id
319
                );
320
                cancel_tx.closed().await;
2✔
321
                tracing::debug!("JobQueue: current job {} has cancelled.", job_id);
1✔
322
            }
UNCOV
323
            Err(e) => {
×
UNCOV
324
                tracing::warn!(
×
UNCOV
325
                    "could not send cancellation msg to current job {}. {}",
×
326
                    job_id,
327
                    e
328
                )
329
            }
330
        }
UNCOV
331
    }
×
332
}
1✔
333

334
/// represents a job in the queue.
335
pub(super) struct QueuedJob<P> {
336
    job: Box<dyn Job>,
337
    job_id: JobId,
338
    result_tx: JobResultSender,
339
    cancel_tx: JobCancelSender,
340
    cancel_rx: JobCancelReceiver,
341
    priority: P,
342
}
343

344
impl<P: fmt::Debug> fmt::Debug for QueuedJob<P> {
345
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2✔
346
        f.debug_struct("QueuedJob")
2✔
347
            .field("job", &"Box<dyn Job>")
2✔
348
            .field("job_id", &self.job_id)
2✔
349
            .field("result_tx", &"JobResultSender")
2✔
350
            .field("cancel_tx", &"JobCancelSender")
2✔
351
            .field("cancel_rx", &"JobCancelReceiver")
2✔
352
            .field("priority", &self.priority)
2✔
353
            .finish()
2✔
354
    }
2✔
355
}
356

357
/// represents the currently executing job
358
#[derive(Debug)]
359
pub(super) struct CurrentJob {
360
    job_num: usize,
361
    job_id: JobId,
362
    cancel_tx: JobCancelSender,
363
}
364

365
/// represents data shared between tasks/threads
366
#[derive(Debug)]
367
pub(super) struct SharedQueue<P: Ord> {
368
    jobs: VecDeque<QueuedJob<P>>,
369
    current_job: Option<CurrentJob>,
370
}
371

372
#[cfg(test)]
373
#[cfg_attr(coverage_nightly, coverage(off))]
374
mod tests {
375
    use std::time::Instant;
376

377
    use tracing_test::traced_test;
378

379
    use super::*;
380

381
    #[tokio::test(flavor = "multi_thread")]
382
    #[traced_test]
383
    async fn run_sync_jobs_by_priority() -> anyhow::Result<()> {
384
        workers::run_jobs_by_priority(false).await
385
    }
386

387
    #[tokio::test(flavor = "multi_thread")]
388
    #[traced_test]
389
    async fn run_async_jobs_by_priority() -> anyhow::Result<()> {
390
        workers::run_jobs_by_priority(true).await
391
    }
392

393
    #[tokio::test(flavor = "multi_thread")]
394
    #[traced_test]
395
    async fn get_sync_job_result() -> anyhow::Result<()> {
396
        workers::get_job_result(false).await
397
    }
398

399
    #[tokio::test(flavor = "multi_thread")]
400
    #[traced_test]
401
    async fn get_async_job_result() -> anyhow::Result<()> {
402
        workers::get_job_result(true).await
403
    }
404

405
    #[tokio::test(flavor = "multi_thread")]
406
    #[traced_test]
407
    async fn cancel_sync_job() -> anyhow::Result<()> {
408
        workers::cancel_job(false).await
409
    }
410

411
    #[tokio::test(flavor = "multi_thread")]
412
    #[traced_test]
413
    async fn cancel_async_job() -> anyhow::Result<()> {
414
        workers::cancel_job(true).await
415
    }
416

417
    #[tokio::test(flavor = "multi_thread")]
418
    #[traced_test]
419
    async fn cancel_sync_job_in_select() -> anyhow::Result<()> {
420
        workers::cancel_job_in_select(false).await
421
    }
422

423
    #[tokio::test(flavor = "multi_thread")]
424
    #[traced_test]
425
    async fn cancel_async_job_in_select() -> anyhow::Result<()> {
426
        workers::cancel_job_in_select(true).await
427
    }
428

429
    #[test]
430
    #[traced_test]
431
    fn runtime_shutdown_timeout_force_cancels_sync_job() -> anyhow::Result<()> {
432
        workers::runtime_shutdown_timeout_force_cancels_job(false)
433
    }
434

435
    #[test]
436
    #[traced_test]
437
    fn runtime_shutdown_timeout_force_cancels_async_job() -> anyhow::Result<()> {
438
        workers::runtime_shutdown_timeout_force_cancels_job(true)
439
    }
440

441
    #[test]
442
    #[traced_test]
443
    fn runtime_shutdown_cancels_sync_job() {
444
        let _ = workers::runtime_shutdown_cancels_job(false);
445
    }
446

447
    #[test]
448
    #[traced_test]
449
    fn runtime_shutdown_cancels_async_job() -> anyhow::Result<()> {
450
        workers::runtime_shutdown_cancels_job(true)
451
    }
452

453
    #[test]
454
    #[traced_test]
455
    fn spawned_tasks_live_as_long_as_jobqueue() -> anyhow::Result<()> {
456
        workers::spawned_tasks_live_as_long_as_jobqueue(true)
457
    }
458

459
    #[tokio::test(flavor = "multi_thread")]
460
    #[traced_test]
461
    async fn panic_in_async_job_ends_job_cleanly() -> anyhow::Result<()> {
462
        workers::panics::panic_in_job_ends_job_cleanly(true).await
463
    }
464

465
    #[tokio::test(flavor = "multi_thread")]
466
    #[traced_test]
467
    async fn panic_in_blocking_job_ends_job_cleanly() -> anyhow::Result<()> {
468
        workers::panics::panic_in_job_ends_job_cleanly(false).await
469
    }
470

471
    #[tokio::test(flavor = "multi_thread")]
472
    #[traced_test]
473
    async fn stop_queue() -> anyhow::Result<()> {
474
        workers::stop_queue().await
475
    }
476

477
    mod workers {
478
        use std::any::Any;
479

480
        use super::*;
481
        use crate::job_queue::errors::JobHandleError;
482
        use crate::job_queue::errors::JobHandleErrorSync;
483
        use crate::job_queue::traits::JobResult;
484

485
        #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
486
        pub enum DoubleJobPriority {
487
            Low = 1,
488
            Medium = 2,
489
            High = 3,
490
        }
491

492
        #[derive(Debug, PartialEq, Clone)]
493
        struct DoubleJobResult(u64, u64, Instant);
494
        impl JobResult for DoubleJobResult {
495
            fn as_any(&self) -> &dyn Any {
496
                self
497
            }
498
            fn into_any(self: Box<Self>) -> Box<dyn Any> {
499
                self
500
            }
501
        }
502

503
        // represents a prover job.  implements Job.
504
        #[derive(Debug)]
505
        struct DoubleJob {
506
            data: u64,
507
            duration: std::time::Duration,
508
            is_async: bool,
509
        }
510

511
        #[async_trait::async_trait]
512
        impl Job for DoubleJob {
513
            fn is_async(&self) -> bool {
514
                self.is_async
515
            }
516

517
            fn run(&self, cancel_rx: JobCancelReceiver) -> JobCompletion {
518
                let start = Instant::now();
519
                let sleep_time =
520
                    std::cmp::min(std::time::Duration::from_micros(100), self.duration);
521

522
                let r = loop {
523
                    if start.elapsed() < self.duration {
524
                        match cancel_rx.has_changed() {
525
                            Ok(changed) if changed => break JobCompletion::Cancelled,
526
                            Err(_) => break JobCompletion::Cancelled,
527
                            _ => {}
528
                        }
529

530
                        std::thread::sleep(sleep_time);
531
                    } else {
532
                        break JobCompletion::Finished(Box::new(DoubleJobResult(
533
                            self.data,
534
                            self.data * 2,
535
                            Instant::now(),
536
                        )));
537
                    }
538
                };
539

540
                tracing::info!("results: {:?}", r);
541
                r
542
            }
543

544
            async fn run_async(&self) -> Box<dyn JobResult> {
545
                tokio::time::sleep(self.duration).await;
546
                let r = DoubleJobResult(self.data, self.data * 2, Instant::now());
547

548
                tracing::info!("results: {:?}", r);
549
                Box::new(r)
550
            }
551
        }
552

553
        // this test demonstrates/verifies that:
554
        //  1. jobs are run in priority order, highest priority first.
555
        //  2. when multiple jobs have the same priority, they run in FIFO order.
556
        pub(super) async fn run_jobs_by_priority(is_async: bool) -> anyhow::Result<()> {
557
            let start_of_test = Instant::now();
558

559
            // create a job queue
560
            let job_queue = JobQueue::start();
561

562
            let mut handles = vec![];
563
            let duration = std::time::Duration::from_millis(20);
564

565
            // create 30 jobs, 10 at each priority level.
566
            for i in (1..10).rev() {
567
                let job1 = Box::new(DoubleJob {
568
                    data: i,
569
                    duration,
570
                    is_async,
571
                });
572
                let job2 = Box::new(DoubleJob {
573
                    data: i * 100,
574
                    duration,
575
                    is_async,
576
                });
577
                let job3 = Box::new(DoubleJob {
578
                    data: i * 1000,
579
                    duration,
580
                    is_async,
581
                });
582

583
                // process job and print results.
584
                handles.push(job_queue.add_job(job1, DoubleJobPriority::Low)?);
585
                handles.push(job_queue.add_job(job2, DoubleJobPriority::Medium)?);
586
                handles.push(job_queue.add_job(job3, DoubleJobPriority::High)?);
587
            }
588

589
            // we can't know exact number of jobs in queue because it is already processing.
590
            assert!(job_queue.num_jobs() > 0);
591
            assert!(job_queue.num_queued_jobs() > 0);
592

593
            // wait for all jobs to complete.
594
            let mut results = futures::future::join_all(handles).await;
595

596
            assert_eq!(0, job_queue.num_jobs());
597
            assert_eq!(0, job_queue.num_queued_jobs());
598

599
            // the results are in the same order as handles passed to join_all.
600
            // we sort them by the timestamp in job result, ascending.
601
            results.sort_by(
602
                |a_completion, b_completion| match (a_completion, b_completion) {
603
                    (Ok(JobCompletion::Finished(a_dyn)), Ok(JobCompletion::Finished(b_dyn))) => {
604
                        let a = a_dyn.as_any().downcast_ref::<DoubleJobResult>().unwrap().2;
605

606
                        let b = b_dyn.as_any().downcast_ref::<DoubleJobResult>().unwrap().2;
607

608
                        a.cmp(&b)
609
                    }
610
                    _ => panic!("at least one job did not finish"),
611
                },
612
            );
613

614
            // iterate job results and verify that:
615
            //   timestamp of each is greater than prev.
616
            //   input value of each is greater than prev, except every 9th item which should be < prev
617
            //     because there are nine jobs per level.
618
            let mut prev = Box::new(DoubleJobResult(9999, 0, start_of_test));
619
            for (i, c) in results.into_iter().enumerate() {
620
                let Ok(JobCompletion::Finished(dyn_result)) = c else {
621
                    panic!("A job did not finish");
622
                };
623

624
                let job_result = dyn_result.into_any().downcast::<DoubleJobResult>().unwrap();
625

626
                assert!(job_result.2 > prev.2);
627

628
                // we don't do the assertion for the 2nd job because the job-queue starts
629
                // processing immediately and so a race condition is setup where it is possible
630
                // for either the Low priority or High job to start processing first.
631
                if i != 1 {
632
                    assert!(job_result.0 < prev.0);
633
                }
634

635
                prev = job_result;
636
            }
637

638
            Ok(())
639
        }
640

641
        // this test demonstrates/verifies that a job can return a result back to
642
        // the job initiator.
643
        pub(super) async fn get_job_result(is_async: bool) -> anyhow::Result<()> {
644
            // create a job queue
645
            let job_queue = JobQueue::start();
646
            let duration = std::time::Duration::from_millis(20);
647

648
            // create 10 jobs
649
            for i in 0..10 {
650
                let job = Box::new(DoubleJob {
651
                    data: i,
652
                    duration,
653
                    is_async,
654
                });
655

656
                let result = job_queue
657
                    .add_job(job, DoubleJobPriority::Low)?
658
                    .await
659
                    .map_err(|e| e.into_sync())?
660
                    .result()
661
                    .map_err(|e| e.into_sync())?;
662

663
                let job_result = result.into_any().downcast::<DoubleJobResult>().unwrap();
664

665
                assert_eq!(i, job_result.0);
666
                assert_eq!(i * 2, job_result.1);
667
            }
668

669
            Ok(())
670
        }
671

672
        // tests that stopping job_queue also cancels presently running job
673
        // and queued job(s)
674
        pub(super) async fn stop_queue() -> anyhow::Result<()> {
675
            // create a job queue
676
            let job_queue = JobQueue::start();
677
            // start a 1 hour job.
678
            let duration = std::time::Duration::from_secs(3600); // 1 hour job.
679

680
            let job = Box::new(DoubleJob {
681
                data: 10,
682
                duration,
683
                is_async: true,
684
            });
685
            let job2 = Box::new(DoubleJob {
686
                data: 10,
687
                duration,
688
                is_async: true,
689
            });
690
            let job_handle = job_queue.add_job(job, DoubleJobPriority::Low)?;
691
            let job2_handle = job_queue.add_job(job2, DoubleJobPriority::Low)?;
692

693
            // so we have some test coverage for debug impls.
694
            println!("job-queue: {:?}", job_queue);
695

696
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
697

698
            job_queue.stop().await?;
699

700
            assert!(job_handle.is_finished());
701
            assert!(job2_handle.is_finished());
702

703
            assert!(matches!(
704
                job_handle.await,
705
                Err(JobHandleError::JobResultError(_))
706
            ));
707
            assert!(matches!(
708
                job2_handle.await,
709
                Err(JobHandleError::JobResultError(_))
710
            ));
711

712
            Ok(())
713
        }
714

715
        // tests/demonstrates that a long running job can be cancelled early.
716
        pub(super) async fn cancel_job(is_async: bool) -> anyhow::Result<()> {
717
            // create a job queue
718
            let job_queue = JobQueue::start();
719
            // start a 1 hour job.
720
            let duration = std::time::Duration::from_secs(3600); // 1 hour job.
721

722
            let job = Box::new(DoubleJob {
723
                data: 10,
724
                duration,
725
                is_async,
726
            });
727
            let job_handle = job_queue.add_job(job, DoubleJobPriority::Low)?;
728

729
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
730

731
            job_handle.cancel().unwrap();
732
            let completion = job_handle.await.unwrap();
733
            assert!(matches!(completion, JobCompletion::Cancelled));
734

735
            Ok(())
736
        }
737

738
        // this test demonstrates how to listen for a cancellation message
739
        // and cancel a job when it is received.
740
        //
741
        // The key concepts demonstrated are:
742
        //  1. using tokio::select!{} to execute the job and listen for a
743
        //     cancellation message simultaneously.
744
        //  2. using tokio::pin!() to avoid borrow-checker complaints in the select.
745
        //  3. using into_sync() to convert JobHandleError into JobHandleErrorSync for
746
        //     inter-thread usage.
747
        //  4. using downcast to obtain the job result.
748
        pub async fn cancel_job_in_select(is_async: bool) -> anyhow::Result<()> {
749
            async fn do_some_work(
750
                is_async: bool,
751
                cancel_work_rx: tokio::sync::oneshot::Receiver<()>,
752
            ) -> Result<DoubleJobResult, JobHandleErrorSync> {
753
                // create a job queue.  (this could be done elsewhere)
754
                let job_queue = JobQueue::start();
755

756
                // start a 1 hour job.
757
                let duration = std::time::Duration::from_secs(3600); // 1 hour job.
758

759
                let job = Box::new(DoubleJob {
760
                    data: 10,
761
                    duration,
762
                    is_async,
763
                });
764

765
                // add the job to queue
766
                let job_handle = job_queue.add_job(job, DoubleJobPriority::Low).unwrap();
767

768
                // pin job_handle, so borrow checker knows the address can't change
769
                // and it is safe to use in both select branches
770
                tokio::pin!(job_handle);
771

772
                // execute job and simultaneously listen for cancel msg from elsewhere
773
                let completion = tokio::select! {
774
                    // case: job completion.
775
                    completion = &mut job_handle => completion,
776

777
                    // case: sender cancelled, or sender dropped.
778
                    _ = cancel_work_rx => {
779
                        job_handle.cancel().map_err(|e| e.into_sync())?;
780
                        job_handle.await
781
                    }
782
                };
783

784
                // obtain job result (via downcast)
785
                let result: DoubleJobResult = *completion
786
                    .map_err(|e| e.into_sync())?
787
                    .result()
788
                    .map_err(|e| e.into_sync())?
789
                    .into_any()
790
                    .downcast::<DoubleJobResult>()
791
                    .expect("downcast should succeed, else bug");
792

793
                Ok(result)
794
            }
795

796
            // create cancellation channel for the worker task
797
            let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
798

799
            // create the worker task, that will create and run the job
800
            let worker_task = async move { do_some_work(is_async, cancel_rx).await };
801

802
            // spawn the worker task
803
            let jh = tokio::task::spawn(worker_task);
804

805
            // send cancel message to the worker task
806
            cancel_tx.send(()).unwrap();
807

808
            // wait for worker task to finish (with an error)
809
            let job_handle_error = jh.await?.unwrap_err();
810

811
            // ensure the error indicates JobCancelled
812
            assert!(matches!(job_handle_error, JobHandleErrorSync::JobCancelled));
813

814
            Ok(())
815
        }
816

817
        // note: creates own tokio runtime.  caller must not use [tokio::test]
818
        //
819
        // this test starts a job that runs for 1 hour and then attempts to
820
        // shutdown tokio runtime via shutdown_timeout() with a 1 sec timeout.
821
        //
822
        // any async tasks should be aborted quickly.
823
        // any sync tasks will continue to run to completion.
824
        //
825
        // shutdown_timeout() will wait for tasks to abort for 1 sec and then
826
        // returns.  Any un-aborted tasks/threads become ignored/detached.
827
        // The OS can cleanup such threads when the process exits.
828
        //
829
        // the test checks that the shutdown completes in under 2 secs.
830
        //
831
        // the test demonstrates that shutdown_timeout() can be used to shutdown
832
        // tokio runtime even if sync (spawn_blocking) tasks/threads are still running
833
        // in the blocking threadpool.
834
        //
835
        // when called with is_async=true, it demonstrates that shutdown_timeout() also
836
        // aborts async jobs, as one would expect.
837
        pub(super) fn runtime_shutdown_timeout_force_cancels_job(
838
            is_async: bool,
839
        ) -> anyhow::Result<()> {
840
            let rt = tokio::runtime::Runtime::new()?;
841
            let result = rt.block_on(async {
842
                // create a job queue
843
                let job_queue = JobQueue::start();
844
                // start a 1 hour job.
845
                let duration = std::time::Duration::from_secs(3600); // 1 hour job.
846

847
                let job = Box::new(DoubleJob {
848
                    data: 10,
849
                    duration,
850
                    is_async,
851
                });
852
                let _rx = job_queue.add_job(job, DoubleJobPriority::Low)?;
853

854
                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
855
                println!("finished scope");
856

857
                Ok(())
858
            });
859

860
            let start = std::time::Instant::now();
861

862
            println!("waiting 1 second for job before shutdown runtime");
863
            rt.shutdown_timeout(tokio::time::Duration::from_secs(1));
864

865
            assert!(start.elapsed() < std::time::Duration::from_secs(2));
866

867
            result
868
        }
869

870
        // note: creates own tokio runtime.  caller must not use [tokio::test]
871
        //
872
        // this test starts a job that runs for 5 secs and then attempts to
873
        // shutdown tokio runtime normally by dropping it.
874
        //
875
        // any async tasks should be aborted quickly.
876
        // any sync tasks will continue to run to completion.
877
        //
878
        // the tokio runtime does not complete the drop() until all tasks
879
        // have completed/aborted.
880
        //
881
        // the test checks that the job finishes in less than the 5 secs
882
        // required for full completion.  In other words, that it aborts.
883
        //
884
        // the test is expected to succeed for async jobs but fail for sync jobs.
885
        pub(super) fn runtime_shutdown_cancels_job(is_async: bool) -> anyhow::Result<()> {
886
            let rt = tokio::runtime::Runtime::new()?;
887
            let start = tokio::time::Instant::now();
888

889
            let result = rt.block_on(async {
890
                // create a job queue
891
                let job_queue = JobQueue::start();
892

893
                // this job takes at least 5 secs to complete.
894
                let duration = std::time::Duration::from_secs(5);
895

896
                let job = Box::new(DoubleJob {
897
                    data: 10,
898
                    duration,
899
                    is_async,
900
                });
901

902
                let rx_handle = job_queue.add_job(job, DoubleJobPriority::Low)?;
903
                drop(rx_handle);
904

905
                // sleep 50 ms to let job get started.
906
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
907

908
                Ok(())
909
            });
910

911
            // drop the tokio runtime. It will attempt to abort tasks.
912
            //   - async tasks can normally be aborted
913
            //   - spawn_blocking (sync) tasks cannot normally be aborted.
914
            drop(rt);
915

916
            // if test is successful, elapsed time should be less than the 5 secs
917
            // it takes for the job to complete.  (should be around 0.5 ms)
918

919
            // however it is expected/normal that sync tasks will not be aborted
920
            // and will run for full 5 secs.  thus this assert will fail for them.
921

922
            assert!(start.elapsed() < std::time::Duration::from_secs(5));
923

924
            result
925
        }
926

927
        // this test attempts to verify that the task spawned by the JobQueue
928
        // continues running until the JobQueue is dropped after the tokio
929
        // runtime is dropped.
930
        //
931
        // If the tasks are cencelled before JobQueue is dropped then a subsequent
932
        // api call that sends a msg will result in a "channel closed" error, which
933
        // is what the test checks for.
934
        //
935
        // note that the test has to do some tricky stuff to setup conditions
936
        // where the "channel closed" error can occur. It's a subtle issue.
937
        //
938
        // see description at:
939
        // https://github.com/tokio-rs/tokio/discussions/6961
940
        pub(super) fn spawned_tasks_live_as_long_as_jobqueue(is_async: bool) -> anyhow::Result<()> {
941
            let rt = tokio::runtime::Runtime::new()?;
942

943
            let result_ok: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
944

945
            let result_ok_clone = result_ok.clone();
946
            rt.block_on(async {
947
                // create a job queue
948
                let job_queue = Arc::new(JobQueue::start());
949

950
                // spawns background task that adds job
951
                let job_queue_cloned = job_queue.clone();
952
                let jh = tokio::spawn(async move {
953
                    // sleep 200 ms to let runtime finish.
954
                    // ie ensure drop(rt) will be reached and wait for us.
955
                    // note that we use std sleep.  if tokio sleep is used
956
                    // the test will always succeed due to the await point.
957
                    std::thread::sleep(std::time::Duration::from_millis(200));
958

959
                    let job = Box::new(DoubleJob {
960
                        data: 10,
961
                        duration: std::time::Duration::from_secs(1),
962
                        is_async,
963
                    });
964

965
                    let result = job_queue_cloned.add_job(job, DoubleJobPriority::Low);
966

967
                    // an assert on result.is_ok() would panic, but that panic would be
968
                    // printed and swallowed by tokio runtime, so the test would succeed
969
                    // despite the panic. instead we pass the result in a mutex so it
970
                    // can be asserted where it will be caught by the test runner.
971
                    *result_ok_clone.lock().unwrap() = result.is_ok();
972
                });
973

974
                // sleep 50 ms to let job get started.
975
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
976

977
                // note; awaiting the joinhandle makes the test succeed.
978

979
                jh.abort();
980
                let _ = jh.await;
981
            });
982

983
            // drop the tokio runtime. It will abort tasks.
984
            drop(rt);
985

986
            assert!(*result_ok.lock().unwrap());
987

988
            Ok(())
989
        }
990

991
        pub mod panics {
992
            use super::*;
993

994
            const PANIC_STR: &str = "job panics unexpectedly";
995

996
            struct PanicJob {
997
                is_async: bool,
998
            }
999

1000
            #[async_trait::async_trait]
1001
            impl Job for PanicJob {
1002
                fn is_async(&self) -> bool {
1003
                    self.is_async
1004
                }
1005

1006
                fn run(&self, _cancel_rx: JobCancelReceiver) -> JobCompletion {
1007
                    panic!("{}", PANIC_STR);
1008
                }
1009

1010
                async fn run_async_cancellable(
1011
                    &self,
1012
                    _cancel_rx: JobCancelReceiver,
1013
                ) -> JobCompletion {
1014
                    panic!("{}", PANIC_STR);
1015
                }
1016
            }
1017

1018
            /// verifies that a job that panics will be ended properly.
1019
            ///
1020
            /// Properly means that:
1021
            /// 1. an error is returned from JobCompletion::result() indicating job panicked.
1022
            /// 2. caller is able to obtain panic info, which matches job's panic msg.
1023
            /// 3. the job-queue continues accepting new jobs.
1024
            /// 4. the job-queue continues processing jobs.
1025
            ///
1026
            /// async_job == true --> test an async job
1027
            /// async_job == false --> test a blocking job
1028
            pub async fn panic_in_job_ends_job_cleanly(async_job: bool) -> anyhow::Result<()> {
1029
                // create a job queue
1030
                let job_queue = JobQueue::start();
1031

1032
                let job = PanicJob {
1033
                    is_async: async_job,
1034
                };
1035
                let job_handle = job_queue.add_job(Box::new(job), DoubleJobPriority::Low)?;
1036

1037
                let job_result = job_handle.await.map_err(|e| e.into_sync())?.result();
1038

1039
                println!("job_result: {:#?}", job_result);
1040

1041
                // verify that job_queue channels are still open
1042
                assert!(!job_queue.tx_job_added.is_closed());
1043
                assert!(!job_queue.tx_stop.is_closed());
1044

1045
                // verify that we get an error with the job's panic msg.
1046
                assert!(matches!(
1047
                    job_result.map_err(|e| e.into_sync()),
1048
                    Err(JobHandleErrorSync::JobPanicked(e)) if e == *PANIC_STR
1049
                ));
1050

1051
                // ensure we can still run another job afterwards.
1052
                let newjob = Box::new(DoubleJob {
1053
                    data: 10,
1054
                    duration: std::time::Duration::from_millis(50),
1055
                    is_async: false,
1056
                });
1057

1058
                // ensure we can add another job.
1059
                let new_job_handle = job_queue.add_job(newjob, DoubleJobPriority::Low)?;
1060

1061
                // ensure job processes and returns a result without error.
1062
                assert!(new_job_handle
1063
                    .await
1064
                    .map_err(|e| e.into_sync())?
1065
                    .result()
1066
                    .is_ok());
1067

1068
                Ok(())
1069
            }
1070
        }
1071
    }
1072
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc