• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 16325438891

16 Jul 2025 04:39PM UTC coverage: 72.479% (-0.002%) from 72.481%
16325438891

push

github

aszepieniec
fix(`WalletState`): Do attempt to resync MUTXOs marked spent

After all, the resync may be triggered by a reorganization that
reverts the expenditure. In this case we want to have a valid
membership proof for the now-spendable-again UTXO.

Attempting to resync MUTXOs that are marked as spent should be
expected to fail most of the time, though, for the simple reason
that the spending transaction was *not* reverted by the reorg,
or because there was no reorg to begin with. For this reason, the
warning log message is suppressed when resyncing fails and the
UTXO was marked as spent.

Co-authored-by: Thorkil Schmidiger <thor@neptune.cash>

2 of 3 new or added lines in 1 file covered. (66.67%)

2 existing lines in 1 file now uncovered.

20676 of 28527 relevant lines covered (72.48%)

497066.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.14
/src/job_queue/queue.rs
1
use std::collections::VecDeque;
2
use std::fmt;
3
use std::sync::Arc;
4
use std::sync::Mutex;
5

6
use tokio::sync::mpsc;
7
use tokio::sync::oneshot;
8
use tokio::sync::watch;
9
use tokio::task::JoinHandle;
10

11
use super::channels::JobCancelReceiver;
12
use super::channels::JobCancelSender;
13
use super::channels::JobResultSender;
14
use super::errors::AddJobError;
15
use super::errors::StopQueueError;
16
use super::job_completion::JobCompletion;
17
use super::job_handle::JobHandle;
18
use super::job_id::JobId;
19
use super::traits::Job;
20

21
/// implements a job queue that sends result of each job to a listener.
22
#[derive(Debug)]
23
pub struct JobQueue<P: Ord + Send + Sync + 'static> {
24
    /// holds job-queue which is shared between tokio tasks
25
    shared_queue: Arc<Mutex<SharedQueue<P>>>,
26

27
    /// channel to inform process_jobs task that a job has been added
28
    tx_job_added: mpsc::UnboundedSender<()>,
29

30
    /// channel to inform process_jobs task to stop processing.
31
    tx_stop: tokio::sync::watch::Sender<()>,
32

33
    /// JoinHandle of process_jobs task
34
    process_jobs_task_handle: Option<JoinHandle<()>>,
35
}
36

37
// we implement Drop so we can send stop message to process_jobs task
38
impl<P: Ord + Send + Sync + 'static> Drop for JobQueue<P> {
39
    fn drop(&mut self) {
17✔
40
        tracing::debug!("in JobQueue::drop()");
17✔
41

42
        if !self.tx_stop.is_closed() {
17✔
43
            if let Err(e) = self.tx_stop.send(()) {
16✔
44
                tracing::error!("{}", e);
×
45
            }
16✔
46
        }
1✔
47
    }
17✔
48
}
49

50
impl<P: Ord + Send + Sync + 'static> JobQueue<P> {
51
    /// creates job queue and starts it processing.
52
    ///
53
    /// returns immediately.
54
    pub fn start() -> Self {
161✔
55
        // create a SharedQueue that is shared between tokio tasks.
56
        let shared_queue = SharedQueue {
161✔
57
            jobs: VecDeque::new(),
161✔
58
            current_job: None,
161✔
59
        };
161✔
60
        let shared_queue: Arc<Mutex<SharedQueue<P>>> = Arc::new(Mutex::new(shared_queue));
161✔
61

62
        // create 'job_added' and 'stop' channels for signalling to process_jobs task
63
        let (tx_job_added, rx_job_added) = mpsc::unbounded_channel();
161✔
64
        let (tx_stop, rx_stop) = watch::channel(());
161✔
65

66
        // spawn the process_jobs task
67
        let process_jobs_task_handle =
161✔
68
            tokio::spawn(process_jobs(shared_queue.clone(), rx_stop, rx_job_added));
161✔
69

70
        tracing::info!("JobQueue: started new queue.");
161✔
71

72
        // construct and return JobQueue
73
        Self {
161✔
74
            tx_job_added,
161✔
75
            tx_stop,
161✔
76
            shared_queue,
161✔
77
            process_jobs_task_handle: Some(process_jobs_task_handle),
161✔
78
        }
161✔
79
    }
161✔
80

81
    /// stop the job-queue, and drop it.
82
    ///
83
    /// this method sends a message to the spawned job-queue task
84
    /// to stop and then waits for it to complete.
85
    ///
86
    /// Comparison with drop():
87
    ///
88
    /// if JobQueue is dropped:
89
    ///  1. the stop message will be sent, but any error is ignored.
90
    ///  2. the spawned task is not awaited.
91
    pub async fn stop(mut self) -> Result<(), StopQueueError> {
1✔
92
        tracing::info!("JobQueue: stopping.");
1✔
93

94
        // send stop message to process_jobs task
95
        self.tx_stop.send(())?;
1✔
96

97
        // wait for process_jobs task to finish
98
        if let Some(jh) = self.process_jobs_task_handle.take() {
1✔
99
            jh.await?;
1✔
100
        }
×
101

102
        Ok(())
1✔
103
    }
1✔
104

105
    /// adds job to job-queue (with interior mutability)
106
    ///
107
    /// returns a [`JobHandle`] that can be used to await or cancel the job.
108
    ///
109
    /// note that this method utilizes interior mutability. Consider calling
110
    /// [`Self::add_job_mut()`] instead to make the mutation explicit.
111
    pub fn add_job(
1,527✔
112
        &self,
1,527✔
113
        job: impl Into<Box<dyn Job>>,
1,527✔
114
        priority: P,
1,527✔
115
    ) -> Result<JobHandle, AddJobError> {
1,527✔
116
        let (result_tx, result_rx) = oneshot::channel();
1,527✔
117
        let (cancel_tx, cancel_rx) = watch::channel::<()>(());
1,527✔
118

119
        // each job gets a random JobId
120
        let job_id = JobId::random();
1,527✔
121

122
        // represent a job in the queue
123
        let m = QueuedJob {
1,527✔
124
            job: job.into(),
1,527✔
125
            job_id,
1,527✔
126
            result_tx,
1,527✔
127
            cancel_tx: cancel_tx.clone(),
1,527✔
128
            cancel_rx,
1,527✔
129
            priority,
1,527✔
130
        };
1,527✔
131

132
        // add job to queue and obtain number of jobs in queue and current-job (if any)
133
        let (num_jobs, job_running) = {
1,527✔
134
            // acquire mutex lock
135
            let mut guard = self.shared_queue.lock().unwrap();
1,527✔
136

137
            // add job to job-queue
138
            guard.jobs.push_back(m);
1,527✔
139

140
            let job_running = match &guard.current_job {
1,527✔
141
                Some(j) => format!("#{} - {}", j.job_num, j.job_id),
47✔
142
                None => "none".to_string(),
1,480✔
143
            };
144
            (guard.jobs.len(), job_running)
1,527✔
145
        }; // mutex lock released on drop
146

147
        // notify process_jobs task that a job was added.
148
        self.tx_job_added.send(())?;
1,527✔
149

150
        // log that job is added to the queue
151
        tracing::info!(
1,527✔
152
            "JobQueue: job added - {}  {} queued job(s).  job running: {}",
×
153
            job_id,
154
            num_jobs,
155
            job_running
156
        );
157

158
        // create and return JobHandle
159
        Ok(JobHandle::new(job_id, result_rx, cancel_tx))
1,527✔
160
    }
1,527✔
161

162
    /// Adds a job to the queue (with explicit mutability).
163
    ///
164
    /// returns a [`JobHandle`] that can be used to await or cancel the job.
165
    ///
166
    /// job-results can be obtained by via JobHandle::results().await
167
    /// The job can be cancelled by JobHandle::cancel()
168
    ///
169
    /// Unlike [`Self::add_job()`], this method takes `&mut self`, explicitly
170
    /// signaling to the compiler that the `JobQueue` internal state is being
171
    /// modified.
172
    ///
173
    /// This explicit mutability encourages callers to use correct function
174
    /// signatures and avoids hidden interior mutability, which can be a source
175
    /// of confusion and potentially subtle borrow checker issues when reasoning
176
    /// about a given codebase/architecture.
177
    ///
178
    /// Explicit mutability generally leads to improved compiler optimizations
179
    /// and stronger borrow checker guarantees by enforcing exclusive access.
180
    pub fn add_job_mut(
89✔
181
        &mut self,
89✔
182
        job: impl Into<Box<dyn Job>>,
89✔
183
        priority: P,
89✔
184
    ) -> Result<JobHandle, AddJobError> {
89✔
185
        self.add_job(job, priority)
89✔
186
    }
89✔
187

188
    /// returns total number of jobs, queued plus running.
189
    pub fn num_jobs(&self) -> usize {
4✔
190
        let guard = self.shared_queue.lock().unwrap();
4✔
191
        guard.jobs.len() + guard.current_job.as_ref().map(|_| 1).unwrap_or(0)
4✔
192
    }
4✔
193

194
    /// returns number of queued jobs
195
    pub fn num_queued_jobs(&self) -> usize {
4✔
196
        self.shared_queue.lock().unwrap().jobs.len()
4✔
197
    }
4✔
198
}
199

200
/// implements the process_jobs task, spawned by JobQueue::start().
201
///
202
/// this fn calls tokio::select!{} in a loop.  The select has two branches:
203
/// 1. receive 'job_added' message over mpsc channel (unbounded)
204
/// 2. receive 'stop' message over watch channel
205
///
206
/// job_added:
207
///
208
/// When a 'job_added' msg is received, the highest priority queued job is picked
209
/// to run next.  We await the job, and then send results to the JobHandle.
210
///
211
/// Note that jobs can take a long time to run and thus msgs can pile up in the
212
/// job_added channel, which is unbounded. These messages are of type "()" so
213
/// are as small as possible.
214
///
215
/// stop:
216
///
217
/// When a 'stop' msg is received we send a cancel msg to the current job (if any) and
218
/// wait for it to complete. Then we exit the loop and return.
219
async fn process_jobs<P: Ord + Send + Sync + 'static>(
161✔
220
    shared_queue: Arc<Mutex<SharedQueue<P>>>,
161✔
221
    mut rx_stop: watch::Receiver<()>,
161✔
222
    mut rx_job_added: mpsc::UnboundedReceiver<()>,
161✔
223
) {
161✔
224
    // job number starts at 1 and increments with each job that is processed.
225
    // note that processing order may be different than order in which jobs
226
    // are added due to job priorities.
227
    let mut job_num: usize = 1;
161✔
228

229
    // loop until 'stop' msg is received or job_added channel is closed.
230
    //
231
    // note:  this unbounded channel will grow in size as new job(s) are
232
    // added while an existing job is running.  ie, we read from the
233
    // channel after each job completes.
234
    while rx_job_added.recv().await.is_some() {
1,684✔
235
        // Find the next job to run, and the number of jobs left in queue
236
        tracing::debug!("task process_jobs received JobAdded message.");
1,526✔
237
        let (next_job, num_pending) = {
1,526✔
238
            // acquire mutex lock
239
            let mut guard = shared_queue.lock().unwrap();
1,526✔
240

241
            // pick the highest priority job
242
            guard
1,526✔
243
                .jobs
1,526✔
244
                .make_contiguous()
1,526✔
245
                .sort_by(|a, b| b.priority.cmp(&a.priority));
1,526✔
246
            let job = guard.jobs.pop_front().unwrap();
1,526✔
247

248
            // set highest priority job as the current job
249
            guard.current_job = Some(CurrentJob {
1,526✔
250
                job_num,
1,526✔
251
                job_id: job.job_id,
1,526✔
252
                cancel_tx: job.cancel_tx.clone(),
1,526✔
253
            });
1,526✔
254

255
            (job, guard.jobs.len())
1,526✔
256
        }; // mutex lock is released when guard drops.
257

258
        // log that we are starting a job
259
        tracing::info!(
1,526✔
260
            "  *** JobQueue: begin job #{} - {} - {} queued job(s) ***",
×
261
            job_num,
262
            next_job.job_id,
263
            num_pending
264
        );
265

266
        // record time that job starts
267
        let timer = tokio::time::Instant::now();
1,526✔
268

269
        // spawn task that performs the job, either async or blocking.
270
        let job_task_handle = if next_job.job.is_async() {
1,526✔
271
            tokio::spawn(
1,482✔
272
                async move { next_job.job.run_async_cancellable(next_job.cancel_rx).await },
1,482✔
273
            )
274
        } else {
275
            tokio::task::spawn_blocking(move || next_job.job.run(next_job.cancel_rx))
44✔
276
        };
277

278
        // execute job task and simultaneously listen for a 'stop' message.
279
        let job_task_result = tokio::select! {
1,526✔
280
            // execute the job task
281
            job_task_result = job_task_handle => job_task_result,
1,526✔
282

283
            // handle msg over 'stop' channel which indicates we must exit the loop.
284
            _ = rx_stop.changed() => {
1,526✔
285

286
                handle_stop_signal(&shared_queue).await;
3✔
287

288
                // exit loop, processing ends.
289
                break;
1✔
290
            },
291
        };
292

293
        // create JobCompletion from task results
294
        let job_completion = match job_task_result {
1,523✔
295
            Ok(jc) => jc,
1,521✔
296
            Err(e) => {
2✔
297
                if e.is_panic() {
2✔
298
                    JobCompletion::Panicked(e.into_panic())
2✔
299
                } else if e.is_cancelled() {
×
300
                    JobCompletion::Cancelled
×
301
                } else {
302
                    unreachable!()
×
303
                }
304
            }
305
        };
306

307
        // log that job has ended.
308
        tracing::info!(
1,523✔
309
            "  *** JobQueue: ended job #{} - {} - Completion: {} - {} secs ***",
×
310
            job_num,
311
            next_job.job_id,
312
            job_completion,
313
            timer.elapsed().as_secs_f32()
×
314
        );
315
        job_num += 1;
1,523✔
316

317
        // obtain mutex lock and set current-job to None
318
        shared_queue.lock().unwrap().current_job = None;
1,523✔
319

320
        // send job results to the JobHandle receiver
321
        if let Err(e) = next_job.result_tx.send(job_completion) {
1,523✔
322
            tracing::warn!("job-handle dropped? {}", e);
3✔
323
        }
1,520✔
324
    }
325
    tracing::debug!("task process_jobs exiting");
10✔
326
}
10✔
327

328
/// handles the 'stop' branch of tokio::select!{} in process_job() task
329
async fn handle_stop_signal<P: Ord + Send + Sync + 'static>(
3✔
330
    shared_queue: &Arc<Mutex<SharedQueue<P>>>,
3✔
331
) {
3✔
332
    tracing::debug!("task process_jobs received Stop message.");
3✔
333

334
    // acquire mutex lock and obtain current_job info, if any.
335
    let maybe_info = shared_queue
3✔
336
        .lock()
3✔
337
        .unwrap()
3✔
338
        .current_job
3✔
339
        .as_ref()
3✔
340
        .map(|cj| (cj.job_id, cj.cancel_tx.clone()));
3✔
341

342
    // if there is a presently executing job we need to cancel it
343
    // and wait for it to complete.
344
    if let Some((job_id, cancel_tx)) = maybe_info {
3✔
345
        match cancel_tx.send(()) {
3✔
346
            Ok(()) => {
347
                // wait for channel to close, indicating job has cancelled (or otherwise completed)
348
                tracing::debug!(
3✔
349
                    "JobQueue: notified current job {} to cancel.  waiting...",
×
350
                    job_id
351
                );
352
                cancel_tx.closed().await;
3✔
353
                tracing::debug!("JobQueue: current job {} has cancelled.", job_id);
1✔
354
            }
UNCOV
355
            Err(e) => {
×
UNCOV
356
                tracing::warn!(
×
357
                    "could not send cancellation msg to current job {}. {}",
×
358
                    job_id,
359
                    e
360
                )
361
            }
362
        }
363
    }
×
364
}
1✔
365

366
/// represents a job in the queue.
///
/// Bundles the job itself with its id, its priority and the channel
/// endpoints created for it in `JobQueue::add_job()`.
367
pub(super) struct QueuedJob<P> {
368
    /// the work to perform; run by the process_jobs task
    job: Box<dyn Job>,
369
    /// identifier assigned when the job was added
    job_id: JobId,
370
    /// used by the process_jobs task to send the job's completion to the JobHandle
    result_tx: JobResultSender,
371
    /// sender side of the job's cancel channel; clones are held by the
    /// JobHandle and, while the job runs, by `CurrentJob`
    cancel_tx: JobCancelSender,
372
    /// receiver side of the job's cancel channel; handed to the job when it runs
    cancel_rx: JobCancelReceiver,
373
    /// jobs with a greater priority are picked to run first
    priority: P,
374
}
375

376
impl<P: fmt::Debug> fmt::Debug for QueuedJob<P> {
377
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2✔
378
        f.debug_struct("QueuedJob")
2✔
379
            .field("job", &"Box<dyn Job>")
2✔
380
            .field("job_id", &self.job_id)
2✔
381
            .field("result_tx", &"JobResultSender")
2✔
382
            .field("cancel_tx", &"JobCancelSender")
2✔
383
            .field("cancel_rx", &"JobCancelReceiver")
2✔
384
            .field("priority", &self.priority)
2✔
385
            .finish()
2✔
386
    }
2✔
387
}
388

389
/// represents the currently executing job
///
/// Set by the process_jobs task when it picks a job to run and cleared
/// (set to `None` in `SharedQueue`) once that job has ended.
390
#[derive(Debug)]
391
pub(super) struct CurrentJob {
392
    /// sequence number of the job in processing order; starts at 1
    job_num: usize,
393
    /// id of the executing job
    job_id: JobId,
394
    /// clone of the job's cancel sender, used to cancel it when the queue is stopped
    cancel_tx: JobCancelSender,
395
}
396

397
/// represents data shared between tasks/threads
///
/// Held inside an `Arc<Mutex<..>>` by both the `JobQueue` handle and the
/// process_jobs task.
398
#[derive(Debug)]
399
pub(super) struct SharedQueue<P: Ord> {
400
    /// jobs waiting to run; sorted by descending priority right before the next job is picked
    jobs: VecDeque<QueuedJob<P>>,
401
    /// the job that is presently executing, if any
    current_job: Option<CurrentJob>,
402
}
403

404
#[cfg(test)]
405
#[cfg_attr(coverage_nightly, coverage(off))]
406
mod tests {
407
    use std::time::Instant;
408

409
    use tracing_test::traced_test;
410

411
    use super::*;
412

413
    #[tokio::test(flavor = "multi_thread")]
414
    #[traced_test]
415
    async fn run_sync_jobs_by_priority() -> anyhow::Result<()> {
416
        workers::run_jobs_by_priority(false).await
417
    }
418

419
    #[tokio::test(flavor = "multi_thread")]
420
    #[traced_test]
421
    async fn run_async_jobs_by_priority() -> anyhow::Result<()> {
422
        workers::run_jobs_by_priority(true).await
423
    }
424

425
    #[tokio::test(flavor = "multi_thread")]
426
    #[traced_test]
427
    async fn get_sync_job_result() -> anyhow::Result<()> {
428
        workers::get_job_result(false).await
429
    }
430

431
    #[tokio::test(flavor = "multi_thread")]
432
    #[traced_test]
433
    async fn get_async_job_result() -> anyhow::Result<()> {
434
        workers::get_job_result(true).await
435
    }
436

437
    #[tokio::test(flavor = "multi_thread")]
438
    #[traced_test]
439
    async fn cancel_sync_job() -> anyhow::Result<()> {
440
        workers::cancel_job(false).await
441
    }
442

443
    #[tokio::test(flavor = "multi_thread")]
444
    #[traced_test]
445
    async fn cancel_async_job() -> anyhow::Result<()> {
446
        workers::cancel_job(true).await
447
    }
448

449
    #[tokio::test(flavor = "multi_thread")]
450
    #[traced_test]
451
    async fn cancel_sync_job_in_select() -> anyhow::Result<()> {
452
        workers::cancel_job_in_select(false).await
453
    }
454

455
    #[tokio::test(flavor = "multi_thread")]
456
    #[traced_test]
457
    async fn cancel_async_job_in_select() -> anyhow::Result<()> {
458
        workers::cancel_job_in_select(true).await
459
    }
460

461
    #[test]
462
    #[traced_test]
463
    fn runtime_shutdown_timeout_force_cancels_sync_job() -> anyhow::Result<()> {
464
        workers::runtime_shutdown_timeout_force_cancels_job(false)
465
    }
466

467
    #[test]
468
    #[traced_test]
469
    fn runtime_shutdown_timeout_force_cancels_async_job() -> anyhow::Result<()> {
470
        workers::runtime_shutdown_timeout_force_cancels_job(true)
471
    }
472

473
    #[test]
474
    #[traced_test]
475
    fn runtime_shutdown_cancels_sync_job() {
476
        let _ = workers::runtime_shutdown_cancels_job(false);
477
    }
478

479
    #[test]
480
    #[traced_test]
481
    fn runtime_shutdown_cancels_async_job() -> anyhow::Result<()> {
482
        workers::runtime_shutdown_cancels_job(true)
483
    }
484

485
    #[test]
486
    #[traced_test]
487
    fn spawned_tasks_live_as_long_as_jobqueue() -> anyhow::Result<()> {
488
        workers::spawned_tasks_live_as_long_as_jobqueue(true)
489
    }
490

491
    #[tokio::test(flavor = "multi_thread")]
492
    #[traced_test]
493
    async fn panic_in_async_job_ends_job_cleanly() -> anyhow::Result<()> {
494
        workers::panics::panic_in_job_ends_job_cleanly(true).await
495
    }
496

497
    #[tokio::test(flavor = "multi_thread")]
498
    #[traced_test]
499
    async fn panic_in_blocking_job_ends_job_cleanly() -> anyhow::Result<()> {
500
        workers::panics::panic_in_job_ends_job_cleanly(false).await
501
    }
502

503
    #[tokio::test(flavor = "multi_thread")]
504
    #[traced_test]
505
    async fn stop_queue() -> anyhow::Result<()> {
506
        workers::stop_queue().await
507
    }
508

509
    #[tokio::test(flavor = "multi_thread")]
510
    #[traced_test]
511
    async fn job_result_wrapper() -> anyhow::Result<()> {
512
        workers::job_result_wrapper().await
513
    }
514

515
    mod workers {
516
        use super::*;
517
        use crate::job_queue::errors::JobHandleError;
518
        use crate::job_queue::traits::JobResult;
519
        use crate::job_queue::JobResultWrapper;
520

521
        #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
522
        pub enum DoubleJobPriority {
523
            Low = 1,
524
            Medium = 2,
525
            High = 3,
526
        }
527

528
        type DoubleJobResult = JobResultWrapper<(u64, u64, Instant)>;
529

530
        // represents a prover job.  implements Job.
531
        #[derive(Debug)]
532
        struct DoubleJob {
533
            data: u64,
534
            duration: std::time::Duration,
535
            is_async: bool,
536
        }
537

538
        #[async_trait::async_trait]
539
        impl Job for DoubleJob {
540
            fn is_async(&self) -> bool {
541
                self.is_async
542
            }
543

544
            fn run(&self, cancel_rx: JobCancelReceiver) -> JobCompletion {
545
                let start = Instant::now();
546
                let sleep_time =
547
                    std::cmp::min(std::time::Duration::from_micros(100), self.duration);
548

549
                let r = loop {
550
                    if start.elapsed() < self.duration {
551
                        match cancel_rx.has_changed() {
552
                            Ok(changed) if changed => break JobCompletion::Cancelled,
553
                            Err(_) => break JobCompletion::Cancelled,
554
                            _ => {}
555
                        }
556

557
                        std::thread::sleep(sleep_time);
558
                    } else {
559
                        break JobCompletion::Finished(
560
                            DoubleJobResult::new((self.data, self.data * 2, Instant::now())).into(),
561
                        );
562
                    }
563
                };
564

565
                tracing::info!("results: {:?}", r);
566
                r
567
            }
568

569
            async fn run_async(&self) -> Box<dyn JobResult> {
570
                tokio::time::sleep(self.duration).await;
571
                let r = DoubleJobResult::new((self.data, self.data * 2, Instant::now()));
572
                tracing::info!("results: {:?}", r);
573
                r.into()
574
            }
575
        }
576

577
        // this test demonstrates/verifies that:
578
        //  1. jobs are run in priority order, highest priority first.
579
        //  2. when multiple jobs have the same priority, they run in FIFO order.
580
        pub(super) async fn run_jobs_by_priority(is_async: bool) -> anyhow::Result<()> {
581
            let start_of_test = Instant::now();
582

583
            // create a job queue
584
            let mut job_queue = JobQueue::start();
585

586
            let mut handles = vec![];
587
            let duration = std::time::Duration::from_millis(20);
588

589
            // create 27 jobs, 9 at each priority level (`1..10` yields 1 through 9).
590
            for i in (1..10).rev() {
591
                let job1 = DoubleJob {
592
                    data: i,
593
                    duration,
594
                    is_async,
595
                };
596
                let job2 = DoubleJob {
597
                    data: i * 100,
598
                    duration,
599
                    is_async,
600
                };
601
                let job3 = DoubleJob {
602
                    data: i * 1000,
603
                    duration,
604
                    is_async,
605
                };
606

607
                // process job and print results.
608
                handles.push(job_queue.add_job_mut(job1, DoubleJobPriority::Low)?);
609
                handles.push(job_queue.add_job_mut(job2, DoubleJobPriority::Medium)?);
610
                handles.push(job_queue.add_job_mut(job3, DoubleJobPriority::High)?);
611
            }
612

613
            // we can't know exact number of jobs in queue because it is already processing.
614
            assert!(job_queue.num_jobs() > 0);
615
            assert!(job_queue.num_queued_jobs() > 0);
616

617
            // wait for all jobs to complete.
618
            let mut results = futures::future::join_all(handles).await;
619

620
            assert_eq!(0, job_queue.num_jobs());
621
            assert_eq!(0, job_queue.num_queued_jobs());
622

623
            // the results are in the same order as handles passed to join_all.
624
            // we sort them by the timestamp in job result, ascending.
625
            results.sort_by(|a_completion, b_completion| {
626
                let a = <&DoubleJobResult>::try_from(a_completion.as_ref().unwrap())
627
                    .unwrap()
628
                    .2;
629
                let b = <&DoubleJobResult>::try_from(b_completion.as_ref().unwrap())
630
                    .unwrap()
631
                    .2;
632

633
                a.cmp(&b)
634
            });
635

636
            // iterate job results and verify that:
637
            //   timestamp of each is greater than prev.
638
            //   input value of each is less than prev: within a priority level the inputs
639
            //     descend, and each lower level uses smaller inputs (nine jobs per level).
640
            let mut prev = DoubleJobResult::new((9999, 0, start_of_test));
641
            for (i, c) in results.into_iter().enumerate() {
642
                let job_result = DoubleJobResult::try_from(c?)?;
643

644
                assert!(job_result.2 > prev.2);
645

646
                // we don't do the assertion for the 2nd job because the job-queue starts
647
                // processing immediately and so a race condition is setup where it is possible
648
                // for either the Low priority or High job to start processing first.
649
                if i != 1 {
650
                    assert!(job_result.0 < prev.0);
651
                }
652

653
                prev = job_result;
654
            }
655

656
            Ok(())
657
        }
658

659
        // this test demonstrates/verifies that a job can return a result back to
660
        // the job initiator.
661
        pub(super) async fn get_job_result(is_async: bool) -> anyhow::Result<()> {
662
            // create a job queue
663
            let mut job_queue = JobQueue::start();
664
            let duration = std::time::Duration::from_millis(20);
665

666
            // create 10 jobs
667
            for i in 0..10 {
668
                let job = DoubleJob {
669
                    data: i,
670
                    duration,
671
                    is_async,
672
                };
673

674
                let completion = job_queue.add_job_mut(job, DoubleJobPriority::Low)?.await?;
675

676
                let job_result = DoubleJobResult::try_from(completion)?;
677

678
                assert_eq!(i, job_result.0);
679
                assert_eq!(i * 2, job_result.1);
680
            }
681

682
            Ok(())
683
        }
684

685
        // tests that stopping job_queue also cancels presently running job
686
        // and queued job(s)
687
        pub(super) async fn stop_queue() -> anyhow::Result<()> {
688
            // create a job queue
689
            let mut job_queue = JobQueue::start();
690
            // start a 1 hour job.
691
            let duration = std::time::Duration::from_secs(3600); // 1 hour job.
692

693
            let job = DoubleJob {
694
                data: 10,
695
                duration,
696
                is_async: true,
697
            };
698
            let job2 = DoubleJob {
699
                data: 10,
700
                duration,
701
                is_async: true,
702
            };
703
            let job_handle = job_queue.add_job_mut(job, DoubleJobPriority::Low)?;
704
            let job2_handle = job_queue.add_job_mut(job2, DoubleJobPriority::Low)?;
705

706
            // so we have some test coverage for debug impls.
707
            println!("job-queue: {:?}", job_queue);
708

709
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
710

711
            job_queue.stop().await?;
712

713
            assert!(job_handle.is_finished());
714
            assert!(job2_handle.is_finished());
715

716
            assert!(matches!(
717
                job_handle.await,
718
                Err(JobHandleError::JobResultError(_))
719
            ));
720
            assert!(matches!(
721
                job2_handle.await,
722
                Err(JobHandleError::JobResultError(_))
723
            ));
724

725
            Ok(())
726
        }
727

728
        // tests/demonstrates that a long running job can be cancelled early.
729
        pub(super) async fn cancel_job(is_async: bool) -> anyhow::Result<()> {
730
            // create a job queue
731
            let mut job_queue = JobQueue::start();
732
            // start a 1 hour job.
733
            let duration = std::time::Duration::from_secs(3600); // 1 hour job.
734

735
            let job = DoubleJob {
736
                data: 10,
737
                duration,
738
                is_async,
739
            };
740
            let job_handle = job_queue.add_job_mut(job, DoubleJobPriority::Low)?;
741

742
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
743

744
            job_handle.cancel().unwrap();
745
            let completion = job_handle.await.unwrap();
746
            assert!(matches!(completion, JobCompletion::Cancelled));
747

748
            Ok(())
749
        }
750

751
        // this test demonstrates how to listen for a cancellation message
752
        // and cancel a job when it is received.
753
        //
754
        // The key concepts demonstrated are:
755
        //  1. using tokio::select!{} to execute the job and listen for a
756
        //     cancellation message simultaneously.
757
        //  2. using tokio::pin!() to avoid borrow-checker complaints in the select.
758
        //  3. obtaining the job result.
759
        pub async fn cancel_job_in_select(is_async: bool) -> anyhow::Result<()> {
760
            async fn do_some_work(
761
                is_async: bool,
762
                cancel_work_rx: tokio::sync::oneshot::Receiver<()>,
763
            ) -> Result<DoubleJobResult, JobHandleError> {
764
                // create a job queue.  (this could be done elsewhere)
765
                let mut job_queue = JobQueue::start();
766

767
                // start a 1 hour job.
768
                let duration = std::time::Duration::from_secs(3600); // 1 hour job.
769

770
                let job = DoubleJob {
771
                    data: 10,
772
                    duration,
773
                    is_async,
774
                };
775

776
                // add the job to queue
777
                let job_handle = job_queue.add_job_mut(job, DoubleJobPriority::Low).unwrap();
778

779
                // pin job_handle, so borrow checker knows the address can't change
780
                // and it is safe to use in both select branches
781
                tokio::pin!(job_handle);
782

783
                // execute job and simultaneously listen for cancel msg from elsewhere
784
                let completion = tokio::select! {
785
                    // case: job completion.
786
                    completion = &mut job_handle => completion,
787

788
                    // case: sender cancelled, or sender dropped.
789
                    _ = cancel_work_rx => {
790
                        job_handle.cancel()?;
791
                        job_handle.await
792
                    }
793
                };
794

795
                println!("job_completion: {:#?}", completion);
796

797
                // obtain job result (via downcast)
798
                let result = DoubleJobResult::try_from(completion?)?;
799

800
                println!("job_result: {:#?}", result);
801

802
                Ok(result)
803
            }
804

805
            // create cancellation channel for the worker task
806
            let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
807

808
            // create the worker task, that will create and run the job
809
            let worker_task = async move { do_some_work(is_async, cancel_rx).await };
810

811
            // spawn the worker task
812
            let jh = tokio::task::spawn(worker_task);
813

814
            // send cancel message to the worker task
815
            cancel_tx.send(()).unwrap();
816

817
            // wait for worker task to finish (with an error)
818
            let job_handle_error = jh.await?.unwrap_err();
819

820
            // ensure the error indicates JobCancelled
821
            assert!(matches!(job_handle_error, JobHandleError::JobCancelled));
822

823
            Ok(())
824
        }
825

826
        // note: creates own tokio runtime.  caller must not use [tokio::test]
827
        //
828
        // this test starts a job that runs for 1 hour and then attempts to
829
        // shutdown tokio runtime via shutdown_timeout() with a 1 sec timeout.
830
        //
831
        // any async tasks should be aborted quickly.
832
        // any sync tasks will continue to run to completion.
833
        //
834
        // shutdown_timeout() will wait for tasks to abort for 1 sec and then
835
        // returns.  Any un-aborted tasks/threads become ignored/detached.
836
        // The OS can cleanup such threads when the process exits.
837
        //
838
        // the test checks that the shutdown completes in under 2 secs.
839
        //
840
        // the test demonstrates that shutdown_timeout() can be used to shutdown
841
        // tokio runtime even if sync (spawn_blocking) tasks/threads are still running
842
        // in the blocking threadpool.
843
        //
844
        // when called with is_async=true, it demonstrates that shutdown_timeout() also
845
        // aborts async jobs, as one would expect.
846
        pub(super) fn runtime_shutdown_timeout_force_cancels_job(
847
            is_async: bool,
848
        ) -> anyhow::Result<()> {
849
            let rt = tokio::runtime::Runtime::new()?;
850
            let result = rt.block_on(async {
851
                // create a job queue
852
                let mut job_queue = JobQueue::start();
853
                // start a 1 hour job.
854
                let duration = std::time::Duration::from_secs(3600); // 1 hour job.
855

856
                let job = DoubleJob {
857
                    data: 10,
858
                    duration,
859
                    is_async,
860
                };
861
                let _rx = job_queue.add_job_mut(job, DoubleJobPriority::Low)?;
862

863
                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
864
                println!("finished scope");
865

866
                Ok(())
867
            });
868

869
            let start = std::time::Instant::now();
870

871
            println!("waiting 1 second for job before shutdown runtime");
872
            rt.shutdown_timeout(tokio::time::Duration::from_secs(1));
873

874
            assert!(start.elapsed() < std::time::Duration::from_secs(2));
875

876
            result
877
        }
878

879
        // note: creates own tokio runtime.  caller must not use [tokio::test]
880
        //
881
        // this test starts a job that runs for 5 secs and then attempts to
882
        // shutdown tokio runtime normally by dropping it.
883
        //
884
        // any async tasks should be aborted quickly.
885
        // any sync tasks will continue to run to completion.
886
        //
887
        // the tokio runtime does not complete the drop() until all tasks
888
        // have completed/aborted.
889
        //
890
        // the test checks that the job finishes in less than the 5 secs
891
        // required for full completion.  In other words, that it aborts.
892
        //
893
        // the test is expected to succeed for async jobs but fail for sync jobs.
894
        pub(super) fn runtime_shutdown_cancels_job(is_async: bool) -> anyhow::Result<()> {
895
            let rt = tokio::runtime::Runtime::new()?;
896
            let start = tokio::time::Instant::now();
897

898
            let result = rt.block_on(async {
899
                // create a job queue
900
                let mut job_queue = JobQueue::start();
901

902
                // this job takes at least 5 secs to complete.
903
                let duration = std::time::Duration::from_secs(5);
904

905
                let job = DoubleJob {
906
                    data: 10,
907
                    duration,
908
                    is_async,
909
                };
910

911
                let rx_handle = job_queue.add_job_mut(job, DoubleJobPriority::Low)?;
912
                drop(rx_handle);
913

914
                // sleep 50 ms to let job get started.
915
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
916

917
                Ok(())
918
            });
919

920
            // drop the tokio runtime. It will attempt to abort tasks.
921
            //   - async tasks can normally be aborted
922
            //   - spawn_blocking (sync) tasks cannot normally be aborted.
923
            drop(rt);
924

925
            // if test is successful, elapsed time should be less than the 5 secs
926
            // it takes for the job to complete.  (should be around 0.5 ms)
927

928
            // however it is expected/normal that sync tasks will not be aborted
929
            // and will run for full 5 secs.  thus this assert will fail for them.
930

931
            assert!(start.elapsed() < std::time::Duration::from_secs(5));
932

933
            result
934
        }
935

936
        // this test attempts to verify that the task spawned by the JobQueue
937
        // continues running until the JobQueue is dropped after the tokio
938
        // runtime is dropped.
939
        //
940
        // If the tasks are cencelled before JobQueue is dropped then a subsequent
941
        // api call that sends a msg will result in a "channel closed" error, which
942
        // is what the test checks for.
943
        //
944
        // note that the test has to do some tricky stuff to setup conditions
945
        // where the "channel closed" error can occur. It's a subtle issue.
946
        //
947
        // see description at:
948
        // https://github.com/tokio-rs/tokio/discussions/6961
949
        pub(super) fn spawned_tasks_live_as_long_as_jobqueue(is_async: bool) -> anyhow::Result<()> {
950
            let rt = tokio::runtime::Runtime::new()?;
951

952
            let result_ok: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
953

954
            let result_ok_clone = result_ok.clone();
955
            rt.block_on(async {
956
                // create a job queue (not mutable)
957
                let job_queue = Arc::new(JobQueue::start());
958

959
                // spawns background task that adds job
960
                let job_queue_cloned = job_queue.clone();
961
                let jh = tokio::spawn(async move {
962
                    // sleep 200 ms to let runtime finish.
963
                    // ie ensure drop(rt) will be reached and wait for us.
964
                    // note that we use std sleep.  if tokio sleep is used
965
                    // the test will always succeed due to the await point.
966
                    std::thread::sleep(std::time::Duration::from_millis(200));
967

968
                    let job = DoubleJob {
969
                        data: 10,
970
                        duration: std::time::Duration::from_secs(1),
971
                        is_async,
972
                    };
973

974
                    // add job (with JobQueue interior mutability).
975
                    let result = job_queue_cloned.add_job(job, DoubleJobPriority::Low);
976

977
                    // an assert on result.is_ok() would panic, but that panic would be
978
                    // printed and swallowed by tokio runtime, so the test would succeed
979
                    // despite the panic. instead we pass the result in a mutex so it
980
                    // can be asserted where it will be caught by the test runner.
981
                    *result_ok_clone.lock().unwrap() = result.is_ok();
982
                });
983

984
                // sleep 50 ms to let job get started.
985
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
986

987
                // note; awaiting the joinhandle makes the test succeed.
988

989
                jh.abort();
990
                let _ = jh.await;
991
            });
992

993
            // drop the tokio runtime. It will abort tasks.
994
            drop(rt);
995

996
            assert!(*result_ok.lock().unwrap());
997

998
            Ok(())
999
        }
1000

1001
        pub mod panics {
1002
            use super::*;
1003

1004
            const PANIC_STR: &str = "job panics unexpectedly";
1005

1006
            struct PanicJob {
1007
                is_async: bool,
1008
            }
1009

1010
            #[async_trait::async_trait]
1011
            impl Job for PanicJob {
1012
                fn is_async(&self) -> bool {
1013
                    self.is_async
1014
                }
1015

1016
                fn run(&self, _cancel_rx: JobCancelReceiver) -> JobCompletion {
1017
                    panic!("{}", PANIC_STR);
1018
                }
1019

1020
                async fn run_async_cancellable(
1021
                    &self,
1022
                    _cancel_rx: JobCancelReceiver,
1023
                ) -> JobCompletion {
1024
                    panic!("{}", PANIC_STR);
1025
                }
1026
            }
1027

1028
            /// verifies that a job that panics will be ended properly.
1029
            ///
1030
            /// Properly means that:
1031
            /// 1. an error is returned from JobCompletion::result() indicating job panicked.
1032
            /// 2. caller is able to obtain panic info, which matches job's panic msg.
1033
            /// 3. the job-queue continues accepting new jobs.
1034
            /// 4. the job-queue continues processing jobs.
1035
            ///
1036
            /// async_job == true --> test an async job
1037
            /// async_job == false --> test a blocking job
1038
            pub async fn panic_in_job_ends_job_cleanly(async_job: bool) -> anyhow::Result<()> {
1039
                // create a job queue
1040
                let mut job_queue = JobQueue::start();
1041

1042
                let job = PanicJob {
1043
                    is_async: async_job,
1044
                };
1045
                let job_handle = job_queue.add_job_mut(job, DoubleJobPriority::Low)?;
1046

1047
                let job_result = job_handle.await?.result();
1048

1049
                // verify that job_queue channels are still open
1050
                assert!(!job_queue.tx_job_added.is_closed());
1051
                assert!(!job_queue.tx_stop.is_closed());
1052

1053
                // verify that we get an error with the job's panic msg.
1054
                assert!(matches!(
1055
                    job_result,
1056
                    Err(e) if e.panic_message() == Some((*PANIC_STR).to_string())
1057
                ));
1058

1059
                // ensure we can still run another job afterwards.
1060
                let newjob = DoubleJob {
1061
                    data: 10,
1062
                    duration: std::time::Duration::from_millis(50),
1063
                    is_async: false,
1064
                };
1065

1066
                // ensure we can add another job.
1067
                let new_job_handle = job_queue.add_job_mut(newjob, DoubleJobPriority::Low)?;
1068

1069
                // ensure job processes and returns a result without error.
1070
                assert!(new_job_handle.await?.result().is_ok());
1071

1072
                Ok(())
1073
            }
1074
        }
1075

1076
        // demonstrates/tests usage of JobResultWrapper
1077
        pub(super) async fn job_result_wrapper() -> anyhow::Result<()> {
1078
            type MyJobResult = JobResultWrapper<(u64, u64, Instant)>;
1079

1080
            // represents a custom job.  implements Job.
1081
            #[derive(Debug)]
1082
            struct MyJob {
1083
                data: u64,
1084
                duration: std::time::Duration,
1085
            }
1086

1087
            #[async_trait::async_trait]
1088
            impl Job for MyJob {
1089
                fn is_async(&self) -> bool {
1090
                    true
1091
                }
1092

1093
                async fn run_async(&self) -> Box<dyn JobResult> {
1094
                    tokio::time::sleep(self.duration).await;
1095
                    MyJobResult::new((self.data, self.data * 2, Instant::now())).into()
1096
                }
1097
            }
1098

1099
            let mut job_queue = JobQueue::start();
1100
            let job = MyJob {
1101
                data: 15,
1102
                duration: std::time::Duration::from_secs(5),
1103
            };
1104
            let job_handle = job_queue.add_job_mut(job, 10usize)?;
1105
            let completion = job_handle.await?;
1106
            let job_result = MyJobResult::try_from(completion)?;
1107
            let answer = job_result.into_inner();
1108

1109
            assert_eq!(answer.0 * 2, answer.1);
1110

1111
            Ok(())
1112
        }
1113
    }
1114
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc