• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 20062212828

09 Dec 2025 11:40AM UTC coverage: 75.081% (-0.01%) from 75.094%
20062212828

push

github

aszepieniec
refactor: Drop fork reconciliation memory limit

There is already a memory limit being enforced implicitly, namely through:

1. Requiring that every block have a size less than or equal to
   `MAX_BLOCK_SIZE_IN_FORK_RECONCILIATION`.

2. Requiring that the number of blocks being reconciled does not exceed
   `self.global_state_lock.cli().sync_mode_threshold`.

This commit removes the memory limit in `try_ensure_path` as well as the state
variable used to track memory use for fork reconciliation.

26162 of 34845 relevant lines covered (75.08%)

662454.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.12
/neptune-core/src/application/job_queue/queue.rs
1
use std::collections::VecDeque;
2
use std::fmt;
3
use std::sync::Arc;
4
use std::sync::Mutex;
5

6
use tokio::sync::mpsc;
7
use tokio::sync::oneshot;
8
use tokio::sync::watch;
9
use tokio::task::JoinHandle;
10

11
use super::channels::JobCancelReceiver;
12
use super::channels::JobCancelSender;
13
use super::channels::JobResultSender;
14
use super::errors::AddJobError;
15
use super::errors::StopQueueError;
16
use super::job_completion::JobCompletion;
17
use super::job_handle::JobHandle;
18
use super::job_id::JobId;
19
use super::traits::Job;
20

21
/// implements a job queue that sends result of each job to a listener.
22
#[derive(Debug)]
23
pub struct JobQueue<P: Ord + Send + Sync + 'static> {
24
    /// holds job-queue which is shared between tokio tasks
25
    shared_queue: Arc<Mutex<SharedQueue<P>>>,
26

27
    /// channel to inform process_jobs task that a job has been added
28
    tx_job_added: mpsc::UnboundedSender<()>,
29

30
    /// channel to inform process_jobs task to stop processing.
31
    tx_stop: tokio::sync::watch::Sender<()>,
32

33
    /// JoinHandle of process_jobs task
34
    process_jobs_task_handle: Option<JoinHandle<()>>,
35
}
36

37
// we implement Drop so we can send stop message to process_jobs task
38
impl<P: Ord + Send + Sync + 'static> Drop for JobQueue<P> {
39
    fn drop(&mut self) {
17✔
40
        tracing::debug!("in JobQueue::drop()");
17✔
41

42
        if !self.tx_stop.is_closed() {
17✔
43
            if let Err(e) = self.tx_stop.send(()) {
16✔
44
                tracing::error!("{}", e);
×
45
            }
16✔
46
        }
1✔
47
    }
17✔
48
}
49

50
impl<P: Ord + Send + Sync + 'static> JobQueue<P> {
51
    /// creates job queue and starts it processing.
52
    ///
53
    /// returns immediately.
54
    pub fn start() -> Self {
183✔
55
        // create a SharedQueue that is shared between tokio tasks.
56
        let shared_queue = SharedQueue {
183✔
57
            jobs: VecDeque::new(),
183✔
58
            current_job: None,
183✔
59
        };
183✔
60
        let shared_queue: Arc<Mutex<SharedQueue<P>>> = Arc::new(Mutex::new(shared_queue));
183✔
61

62
        // create 'job_added' and 'stop' channels for signalling to process_jobs task
63
        let (tx_job_added, rx_job_added) = mpsc::unbounded_channel();
183✔
64
        let (tx_stop, rx_stop) = watch::channel(());
183✔
65

66
        // spawn the process_jobs task
67
        let process_jobs_task_handle =
183✔
68
            tokio::spawn(process_jobs(shared_queue.clone(), rx_stop, rx_job_added));
183✔
69

70
        tracing::debug!("JobQueue: started new queue.");
183✔
71

72
        // construct and return JobQueue
73
        Self {
183✔
74
            tx_job_added,
183✔
75
            tx_stop,
183✔
76
            shared_queue,
183✔
77
            process_jobs_task_handle: Some(process_jobs_task_handle),
183✔
78
        }
183✔
79
    }
183✔
80

81
    /// stop the job-queue, and drop it.
82
    ///
83
    /// this method sends a message to the spawned job-queue task
84
    /// to stop and then waits for it to complete.
85
    ///
86
    /// Comparison with drop():
87
    ///
88
    /// if JobQueue is dropped:
89
    ///  1. the stop message will be sent, but any error is ignored.
90
    ///  2. the spawned task is not awaited.
91
    pub async fn stop(mut self) -> Result<(), StopQueueError> {
1✔
92
        tracing::info!("JobQueue: stopping.");
1✔
93

94
        // send stop message to process_jobs task
95
        self.tx_stop.send(())?;
1✔
96

97
        // wait for process_jobs task to finish
98
        if let Some(jh) = self.process_jobs_task_handle.take() {
1✔
99
            jh.await?;
1✔
100
        }
×
101

102
        Ok(())
1✔
103
    }
1✔
104

105
    /// adds job to job-queue (with interior mutability)
106
    ///
107
    /// returns a [`JobHandle`] that can be used to await or cancel the job.
108
    ///
109
    /// note that this method utilizes interior mutability. Consider calling
110
    /// [`Self::add_job_mut()`] instead to make the mutation explicit.
111
    pub fn add_job(
2,189✔
112
        &self,
2,189✔
113
        job: impl Into<Box<dyn Job>>,
2,189✔
114
        priority: P,
2,189✔
115
    ) -> Result<JobHandle, AddJobError> {
2,189✔
116
        let (result_tx, result_rx) = oneshot::channel();
2,189✔
117
        let (cancel_tx, cancel_rx) = watch::channel::<()>(());
2,189✔
118

119
        // each job gets a random JobId
120
        let job_id = JobId::random();
2,189✔
121

122
        // represent a job in the queue
123
        let m = QueuedJob {
2,189✔
124
            job: job.into(),
2,189✔
125
            job_id,
2,189✔
126
            result_tx,
2,189✔
127
            cancel_tx: cancel_tx.clone(),
2,189✔
128
            cancel_rx,
2,189✔
129
            priority,
2,189✔
130
        };
2,189✔
131

132
        // add job to queue and obtain number of jobs in queue and current-job (if any)
133
        let (num_jobs, job_running) = {
2,189✔
134
            // acquire mutex lock
135
            let mut guard = self.shared_queue.lock().unwrap();
2,189✔
136

137
            // add job to job-queue
138
            guard.jobs.push_back(m);
2,189✔
139

140
            let job_running = match &guard.current_job {
2,189✔
141
                Some(j) => format!("#{} - {}", j.job_num, j.job_id),
47✔
142
                None => "none".to_string(),
2,142✔
143
            };
144
            (guard.jobs.len(), job_running)
2,189✔
145
        }; // mutex lock released on drop
146

147
        // notify process_jobs task that a job was added.
148
        self.tx_job_added.send(())?;
2,189✔
149

150
        // log that job is added to the queue
151
        tracing::debug!(
2,189✔
152
            "JobQueue: job added - {}  {} queued job(s).  job running: {}",
153
            job_id,
154
            num_jobs,
155
            job_running
156
        );
157

158
        // create and return JobHandle
159
        Ok(JobHandle::new(job_id, result_rx, cancel_tx))
2,189✔
160
    }
2,189✔
161

162
    /// Adds a job to the queue (with explicit mutability).
163
    ///
164
    /// returns a [`JobHandle`] that can be used to await or cancel the job.
165
    ///
166
    /// job-results can be obtained by via JobHandle::results().await
167
    /// The job can be cancelled by JobHandle::cancel()
168
    ///
169
    /// Unlike [`Self::add_job()`], this method takes `&mut self`, explicitly
170
    /// signaling to the compiler that the `JobQueue` internal state is being
171
    /// modified.
172
    ///
173
    /// This explicit mutability encourages callers to use correct function
174
    /// signatures and avoids hidden interior mutability, which can be a source
175
    /// of confusion and potentially subtle borrow checker issues when reasoning
176
    /// about a given codebase/architecture.
177
    ///
178
    /// Explicit mutability generally leads to improved compiler optimizations
179
    /// and stronger borrow checker guarantees by enforcing exclusive access.
180
    pub fn add_job_mut(
89✔
181
        &mut self,
89✔
182
        job: impl Into<Box<dyn Job>>,
89✔
183
        priority: P,
89✔
184
    ) -> Result<JobHandle, AddJobError> {
89✔
185
        self.add_job(job, priority)
89✔
186
    }
89✔
187

188
    /// returns total number of jobs, queued plus running.
189
    pub fn num_jobs(&self) -> usize {
4✔
190
        let guard = self.shared_queue.lock().unwrap();
4✔
191
        guard.jobs.len() + guard.current_job.as_ref().map(|_| 1).unwrap_or(0)
4✔
192
    }
4✔
193

194
    /// returns number of queued jobs
195
    pub fn num_queued_jobs(&self) -> usize {
4✔
196
        self.shared_queue.lock().unwrap().jobs.len()
4✔
197
    }
4✔
198
}
199

200
/// implements the process_jobs task, spawned by JobQueue::start().
201
///
202
/// this fn calls tokio::select!{} in a loop.  The select has two branches:
203
/// 1. receive 'job_added' message over mpsc channel (unbounded)
204
/// 2. receive 'stop' message over watch channel
205
///
206
/// job_added:
207
///
208
/// When a 'job_added' msg is received, the highest priority queued job is picked
209
/// to run next.  We await the job, and then send results to the JobHandle.
210
///
211
/// Note that jobs can take a long time to run and thus msgs can pile up in the
212
/// job_added channel, which is unbounded. These messages are of type "()" so
213
/// are as small as possible.
214
///
215
/// stop:
216
///
217
/// When a 'stop' msg is received we send a cancel msg to the current job (if any) and
218
/// wait for it to complete. Then we exit the loop and return.
219
async fn process_jobs<P: Ord + Send + Sync + 'static>(
183✔
220
    shared_queue: Arc<Mutex<SharedQueue<P>>>,
183✔
221
    mut rx_stop: watch::Receiver<()>,
183✔
222
    mut rx_job_added: mpsc::UnboundedReceiver<()>,
183✔
223
) {
183✔
224
    // job number starts at 1 and increments with each job that is processed.
225
    // note that processing order may be different than order in which jobs
226
    // are added due to job priorities.
227
    let mut job_num: usize = 1;
182✔
228

229
    // loop until 'stop' msg is received or job_added channel is closed.
230
    //
231
    // note:  this unbounded channel will grow in size as new job(s) are
232
    // added while an existing job is running.  ie, we read from the
233
    // channel after each job completes.
234
    while rx_job_added.recv().await.is_some() {
2,367✔
235
        // Find the next job to run, and the number of jobs left in queue
236
        tracing::debug!("task process_jobs received JobAdded message.");
2,188✔
237
        let (next_job, num_pending) = {
2,188✔
238
            // acquire mutex lock
239
            let mut guard = shared_queue.lock().unwrap();
2,188✔
240

241
            // pick the highest priority job
242
            guard
2,188✔
243
                .jobs
2,188✔
244
                .make_contiguous()
2,188✔
245
                .sort_by(|a, b| b.priority.cmp(&a.priority));
2,188✔
246
            let job = guard.jobs.pop_front().unwrap();
2,188✔
247

248
            // set highest priority job as the current job
249
            guard.current_job = Some(CurrentJob {
2,188✔
250
                job_num,
2,188✔
251
                job_id: job.job_id,
2,188✔
252
                cancel_tx: job.cancel_tx.clone(),
2,188✔
253
            });
2,188✔
254

255
            (job, guard.jobs.len())
2,188✔
256
        }; // mutex lock is released when guard drops.
257

258
        // log that we are starting a job
259
        tracing::debug!(
2,188✔
260
            "  *** JobQueue: begin job #{} - {} - {} queued job(s) ***",
261
            job_num,
262
            next_job.job_id,
263
            num_pending
264
        );
265

266
        // record time that job starts
267
        let timer = tokio::time::Instant::now();
2,188✔
268

269
        // spawn task that performs the job, either async or blocking.
270
        let job_task_handle = if next_job.job.is_async() {
2,188✔
271
            tokio::spawn(
2,144✔
272
                async move { next_job.job.run_async_cancellable(next_job.cancel_rx).await },
2,143✔
273
            )
274
        } else {
275
            tokio::task::spawn_blocking(move || next_job.job.run(next_job.cancel_rx))
44✔
276
        };
277

278
        // execute job task and simultaneously listen for a 'stop' message.
279
        let job_task_result = tokio::select! {
2,188✔
280
            // execute the job task
281
            job_task_result = job_task_handle => job_task_result,
2,188✔
282

283
            // handle msg over 'stop' channel which indicates we must exit the loop.
284
            _ = rx_stop.changed() => {
2,188✔
285

286
                handle_stop_signal(&shared_queue).await;
3✔
287

288
                // exit loop, processing ends.
289
                break;
2✔
290
            },
291
        };
292

293
        // create JobCompletion from task results
294
        let job_completion = match job_task_result {
2,185✔
295
            Ok(jc) => jc,
2,183✔
296
            Err(e) => {
2✔
297
                if e.is_panic() {
2✔
298
                    JobCompletion::Panicked(e.into_panic())
2✔
299
                } else if e.is_cancelled() {
×
300
                    JobCompletion::Cancelled
×
301
                } else {
302
                    unreachable!()
×
303
                }
304
            }
305
        };
306

307
        // log that job has ended.
308
        tracing::debug!(
2,185✔
309
            "  *** JobQueue: ended job #{} - {} - Completion: {} - {} secs ***",
310
            job_num,
311
            next_job.job_id,
312
            job_completion,
313
            timer.elapsed().as_secs_f32()
×
314
        );
315
        job_num += 1;
2,185✔
316

317
        // obtain mutex lock and set current-job to None
318
        shared_queue.lock().unwrap().current_job = None;
2,185✔
319

320
        // send job results to the JobHandle receiver
321
        if let Err(e) = next_job.result_tx.send(job_completion) {
2,185✔
322
            tracing::warn!("job-handle dropped? {}", e);
3✔
323
        }
2,182✔
324
    }
325
    tracing::debug!("task process_jobs exiting");
14✔
326
}
14✔
327

328
/// handles the 'stop' branch of tokio::select!{} in process_job() task
329
async fn handle_stop_signal<P: Ord + Send + Sync + 'static>(
3✔
330
    shared_queue: &Arc<Mutex<SharedQueue<P>>>,
3✔
331
) {
3✔
332
    tracing::debug!("task process_jobs received Stop message.");
3✔
333

334
    // acquire mutex lock and obtain current_job info, if any.
335
    let maybe_info = shared_queue
3✔
336
        .lock()
3✔
337
        .unwrap()
3✔
338
        .current_job
3✔
339
        .as_ref()
3✔
340
        .map(|cj| (cj.job_id, cj.cancel_tx.clone()));
3✔
341

342
    // if there is a presently executing job we need to cancel it
343
    // and wait for it to complete.
344
    if let Some((job_id, cancel_tx)) = maybe_info {
3✔
345
        match cancel_tx.send(()) {
3✔
346
            Ok(()) => {
347
                // wait for channel to close, indicating job has cancelled (or otherwise completed)
348
                tracing::debug!(
3✔
349
                    "JobQueue: notified current job {} to cancel.  waiting...",
350
                    job_id
351
                );
352
                cancel_tx.closed().await;
3✔
353
                tracing::debug!("JobQueue: current job {} has cancelled.", job_id);
2✔
354
            }
355
            Err(e) => {
×
356
                tracing::warn!(
×
357
                    "could not send cancellation msg to current job {}. {}",
358
                    job_id,
359
                    e
360
                )
361
            }
362
        }
363
    }
×
364
}
2✔
365

366
/// represents a job in the queue.
367
pub(super) struct QueuedJob<P> {
368
    job: Box<dyn Job>,
369
    job_id: JobId,
370
    result_tx: JobResultSender,
371
    cancel_tx: JobCancelSender,
372
    cancel_rx: JobCancelReceiver,
373
    priority: P,
374
}
375

376
impl<P: fmt::Debug> fmt::Debug for QueuedJob<P> {
377
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2✔
378
        f.debug_struct("QueuedJob")
2✔
379
            .field("job", &"Box<dyn Job>")
2✔
380
            .field("job_id", &self.job_id)
2✔
381
            .field("result_tx", &"JobResultSender")
2✔
382
            .field("cancel_tx", &"JobCancelSender")
2✔
383
            .field("cancel_rx", &"JobCancelReceiver")
2✔
384
            .field("priority", &self.priority)
2✔
385
            .finish()
2✔
386
    }
2✔
387
}
388

389
/// represents the currently executing job
390
#[derive(Debug)]
391
pub(super) struct CurrentJob {
392
    job_num: usize,
393
    job_id: JobId,
394
    cancel_tx: JobCancelSender,
395
}
396

397
/// represents data shared between tasks/threads
398
#[derive(Debug)]
399
pub(super) struct SharedQueue<P: Ord> {
400
    jobs: VecDeque<QueuedJob<P>>,
401
    current_job: Option<CurrentJob>,
402
}
403

404
#[cfg(test)]
405
#[cfg_attr(coverage_nightly, coverage(off))]
406
mod tests {
407
    use std::time::Instant;
408

409
    use tracing_test::traced_test;
410

411
    use super::*;
412

413
    #[tokio::test(flavor = "multi_thread")]
414
    #[traced_test]
415
    async fn run_sync_jobs_by_priority() -> anyhow::Result<()> {
416
        workers::run_jobs_by_priority(false).await
417
    }
418

419
    #[tokio::test(flavor = "multi_thread")]
420
    #[traced_test]
421
    async fn run_async_jobs_by_priority() -> anyhow::Result<()> {
422
        workers::run_jobs_by_priority(true).await
423
    }
424

425
    #[tokio::test(flavor = "multi_thread")]
426
    #[traced_test]
427
    async fn get_sync_job_result() -> anyhow::Result<()> {
428
        workers::get_job_result(false).await
429
    }
430

431
    #[tokio::test(flavor = "multi_thread")]
432
    #[traced_test]
433
    async fn get_async_job_result() -> anyhow::Result<()> {
434
        workers::get_job_result(true).await
435
    }
436

437
    #[tokio::test(flavor = "multi_thread")]
438
    #[traced_test]
439
    async fn cancel_sync_job() -> anyhow::Result<()> {
440
        workers::cancel_job(false).await
441
    }
442

443
    #[tokio::test(flavor = "multi_thread")]
444
    #[traced_test]
445
    async fn cancel_async_job() -> anyhow::Result<()> {
446
        workers::cancel_job(true).await
447
    }
448

449
    #[tokio::test(flavor = "multi_thread")]
450
    #[traced_test]
451
    async fn cancel_sync_job_in_select() -> anyhow::Result<()> {
452
        workers::cancel_job_in_select(false).await
453
    }
454

455
    #[tokio::test(flavor = "multi_thread")]
456
    #[traced_test]
457
    async fn cancel_async_job_in_select() -> anyhow::Result<()> {
458
        workers::cancel_job_in_select(true).await
459
    }
460

461
    #[test]
462
    #[traced_test]
463
    fn runtime_shutdown_timeout_force_cancels_sync_job() -> anyhow::Result<()> {
464
        workers::runtime_shutdown_timeout_force_cancels_job(false)
465
    }
466

467
    #[test]
468
    #[traced_test]
469
    fn runtime_shutdown_timeout_force_cancels_async_job() -> anyhow::Result<()> {
470
        workers::runtime_shutdown_timeout_force_cancels_job(true)
471
    }
472

473
    #[test]
474
    #[traced_test]
475
    fn runtime_shutdown_cancels_sync_job() {
476
        let _ = workers::runtime_shutdown_cancels_job(false);
477
    }
478

479
    #[test]
480
    #[traced_test]
481
    fn runtime_shutdown_cancels_async_job() -> anyhow::Result<()> {
482
        workers::runtime_shutdown_cancels_job(true)
483
    }
484

485
    #[test]
486
    #[traced_test]
487
    fn spawned_tasks_live_as_long_as_jobqueue() -> anyhow::Result<()> {
488
        workers::spawned_tasks_live_as_long_as_jobqueue(true)
489
    }
490

491
    #[tokio::test(flavor = "multi_thread")]
492
    #[traced_test]
493
    async fn panic_in_async_job_ends_job_cleanly() -> anyhow::Result<()> {
494
        workers::panics::panic_in_job_ends_job_cleanly(true).await
495
    }
496

497
    #[tokio::test(flavor = "multi_thread")]
498
    #[traced_test]
499
    async fn panic_in_blocking_job_ends_job_cleanly() -> anyhow::Result<()> {
500
        workers::panics::panic_in_job_ends_job_cleanly(false).await
501
    }
502

503
    #[tokio::test(flavor = "multi_thread")]
504
    #[traced_test]
505
    async fn stop_queue() -> anyhow::Result<()> {
506
        workers::stop_queue().await
507
    }
508

509
    #[tokio::test(flavor = "multi_thread")]
510
    #[traced_test]
511
    async fn job_result_wrapper() -> anyhow::Result<()> {
512
        workers::job_result_wrapper().await
513
    }
514

515
    mod workers {
516
        use super::*;
517
        use crate::application::job_queue::errors::JobHandleError;
518
        use crate::application::job_queue::traits::JobResult;
519
        use crate::application::job_queue::JobResultWrapper;
520

521
        #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
522
        pub enum DoubleJobPriority {
523
            Low = 1,
524
            Medium = 2,
525
            High = 3,
526
        }
527

528
        type DoubleJobResult = JobResultWrapper<(u64, u64, Instant)>;
529

530
        // represents a prover job.  implements Job.
531
        #[derive(Debug)]
532
        struct DoubleJob {
533
            data: u64,
534
            duration: std::time::Duration,
535
            is_async: bool,
536
        }
537

538
        #[async_trait::async_trait]
539
        impl Job for DoubleJob {
540
            fn is_async(&self) -> bool {
541
                self.is_async
542
            }
543

544
            fn run(&self, cancel_rx: JobCancelReceiver) -> JobCompletion {
545
                let start = Instant::now();
546
                let sleep_time =
547
                    std::cmp::min(std::time::Duration::from_micros(100), self.duration);
548

549
                let r = loop {
550
                    if start.elapsed() < self.duration {
551
                        match cancel_rx.has_changed() {
552
                            Ok(changed) if changed => break JobCompletion::Cancelled,
553
                            Err(_) => break JobCompletion::Cancelled,
554
                            _ => {}
555
                        }
556

557
                        std::thread::sleep(sleep_time);
558
                    } else {
559
                        break JobCompletion::Finished(
560
                            DoubleJobResult::new((self.data, self.data * 2, Instant::now())).into(),
561
                        );
562
                    }
563
                };
564

565
                tracing::info!("results: {:?}", r);
566
                r
567
            }
568

569
            async fn run_async(&self) -> Box<dyn JobResult> {
570
                tokio::time::sleep(self.duration).await;
571
                let r = DoubleJobResult::new((self.data, self.data * 2, Instant::now()));
572
                tracing::info!("results: {:?}", r);
573
                r.into()
574
            }
575
        }
576

577
        // this test demonstrates/verifies that:
578
        //  1. jobs are run in priority order, highest priority first.
579
        //  2. when multiple jobs have the same priority, they run in FIFO order.
580
        pub(super) async fn run_jobs_by_priority(is_async: bool) -> anyhow::Result<()> {
581
            let start_of_test = Instant::now();
582

583
            // create a job queue
584
            let mut job_queue = JobQueue::start();
585

586
            let mut handles = vec![];
587
            let duration = std::time::Duration::from_millis(20);
588

589
            // create 27 jobs, 9 at each priority level.
590
            for i in (1..10).rev() {
591
                let job1 = DoubleJob {
592
                    data: i,
593
                    duration,
594
                    is_async,
595
                };
596
                let job2 = DoubleJob {
597
                    data: i * 100,
598
                    duration,
599
                    is_async,
600
                };
601
                let job3 = DoubleJob {
602
                    data: i * 1000,
603
                    duration,
604
                    is_async,
605
                };
606

607
                // process job and print results.
608
                handles.push(job_queue.add_job_mut(job1, DoubleJobPriority::Low)?);
609
                handles.push(job_queue.add_job_mut(job2, DoubleJobPriority::Medium)?);
610
                handles.push(job_queue.add_job_mut(job3, DoubleJobPriority::High)?);
611
            }
612

613
            // we can't know exact number of jobs in queue because it is already processing.
614
            assert!(job_queue.num_jobs() > 0);
615
            assert!(job_queue.num_queued_jobs() > 0);
616

617
            // wait for all jobs to complete.
618
            let mut results = futures::future::join_all(handles).await;
619

620
            assert_eq!(0, job_queue.num_jobs());
621
            assert_eq!(0, job_queue.num_queued_jobs());
622

623
            // the results are in the same order as handles passed to join_all.
624
            // we sort them by the timestamp in job result, ascending.
625
            results.sort_by(|a_completion, b_completion| {
626
                let a = <&DoubleJobResult>::try_from(a_completion.as_ref().unwrap())
627
                    .unwrap()
628
                    .2;
629
                let b = <&DoubleJobResult>::try_from(b_completion.as_ref().unwrap())
630
                    .unwrap()
631
                    .2;
632

633
                a.cmp(&b)
634
            });
635

636
            // iterate job results and verify that:
637
            //   timestamp of each is greater than prev.
638
            //   input value of each is less than prev (apart from the `i != 1` exception below),
639
            //     because jobs run highest priority first and inputs descend within each level.
640
            let mut prev = DoubleJobResult::new((9999, 0, start_of_test));
641
            for (i, c) in results.into_iter().enumerate() {
642
                let job_result = DoubleJobResult::try_from(c?)?;
643

644
                assert!(job_result.2 > prev.2);
645

646
                // we don't do the assertion for the 2nd job because the job-queue starts
647
                // processing immediately and so a race condition is setup where it is possible
648
                // for either the Low priority or High job to start processing first.
649
                if i != 1 {
650
                    assert!(job_result.0 < prev.0);
651
                }
652

653
                prev = job_result;
654
            }
655

656
            Ok(())
657
        }
658

659
        // this test demonstrates/verifies that a job can return a result back to
660
        // the job initiator.
661
        pub(super) async fn get_job_result(is_async: bool) -> anyhow::Result<()> {
662
            // create a job queue
663
            let mut job_queue = JobQueue::start();
664
            let duration = std::time::Duration::from_millis(20);
665

666
            // create 10 jobs
667
            for i in 0..10 {
668
                let job = DoubleJob {
669
                    data: i,
670
                    duration,
671
                    is_async,
672
                };
673

674
                let completion = job_queue.add_job_mut(job, DoubleJobPriority::Low)?.await?;
675

676
                let job_result = DoubleJobResult::try_from(completion)?;
677

678
                assert_eq!(i, job_result.0);
679
                assert_eq!(i * 2, job_result.1);
680
            }
681

682
            Ok(())
683
        }
684

685
        // tests that stopping job_queue also cancels presently running job
686
        // and queued job(s)
687
        pub(super) async fn stop_queue() -> anyhow::Result<()> {
688
            // create a job queue
689
            let mut job_queue = JobQueue::start();
690
            // start a 1 hour job.
691
            let duration = std::time::Duration::from_secs(3600); // 1 hour job.
692

693
            let job = DoubleJob {
694
                data: 10,
695
                duration,
696
                is_async: true,
697
            };
698
            let job2 = DoubleJob {
699
                data: 10,
700
                duration,
701
                is_async: true,
702
            };
703
            let job_handle = job_queue.add_job_mut(job, DoubleJobPriority::Low)?;
704
            let job2_handle = job_queue.add_job_mut(job2, DoubleJobPriority::Low)?;
705

706
            // so we have some test coverage for debug impls.
707
            println!("job-queue: {:?}", job_queue);
708

709
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
710

711
            job_queue.stop().await?;
712

713
            assert!(job_handle.is_finished());
714
            assert!(job2_handle.is_finished());
715

716
            assert!(matches!(
717
                job_handle.await,
718
                Err(JobHandleError::JobResultError(_))
719
            ));
720
            assert!(matches!(
721
                job2_handle.await,
722
                Err(JobHandleError::JobResultError(_))
723
            ));
724

725
            Ok(())
726
        }
727

728
        // tests/demonstrates that a long running job can be cancelled early.
729
        pub(super) async fn cancel_job(is_async: bool) -> anyhow::Result<()> {
730
            // create a job queue
731
            let mut job_queue = JobQueue::start();
732
            // start a 1 hour job.
733
            let duration = std::time::Duration::from_secs(3600); // 1 hour job.
734

735
            let job = DoubleJob {
736
                data: 10,
737
                duration,
738
                is_async,
739
            };
740
            let job_handle = job_queue.add_job_mut(job, DoubleJobPriority::Low)?;
741

742
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
743

744
            job_handle.cancel().unwrap();
745
            let completion = job_handle.await.unwrap();
746
            assert!(matches!(completion, JobCompletion::Cancelled));
747

748
            Ok(())
749
        }
750

751
        // this test demonstrates how to listen for a cancellation message
752
        // and cancel a job when it is received.
753
        //
754
        // The key concepts demonstrated are:
755
        //  1. using tokio::select!{} to execute the job and listen for a
756
        //     cancellation message simultaneously.
757
        //  2. using tokio::pin!() to avoid borrow-checker complaints in the select.
758
        //  3. obtaining the job result.
759
        pub async fn cancel_job_in_select(is_async: bool) -> anyhow::Result<()> {
760
            async fn do_some_work(
761
                is_async: bool,
762
                cancel_work_rx: tokio::sync::oneshot::Receiver<()>,
763
            ) -> Result<DoubleJobResult, JobHandleError> {
764
                // create a job queue.  (this could be done elsewhere)
765
                let mut job_queue = JobQueue::start();
766

767
                // start a 1 hour job.
768
                let duration = std::time::Duration::from_secs(3600); // 1 hour job.
769

770
                let job = DoubleJob {
771
                    data: 10,
772
                    duration,
773
                    is_async,
774
                };
775

776
                // add the job to queue
777
                let job_handle = job_queue.add_job_mut(job, DoubleJobPriority::Low).unwrap();
778

779
                // pin job_handle, so borrow checker knows the address can't change
780
                // and it is safe to use in both select branches
781
                tokio::pin!(job_handle);
782

783
                // execute job and simultaneously listen for cancel msg from elsewhere
784
                let completion = tokio::select! {
785
                    // case: job completion.
786
                    completion = &mut job_handle => completion,
787

788
                    // case: sender cancelled, or sender dropped.
789
                    _ = cancel_work_rx => {
790
                        job_handle.cancel()?;
791
                        job_handle.await
792
                    }
793
                };
794

795
                println!("job_completion: {:#?}", completion);
796

797
                // obtain job result (via downcast)
798
                let result = DoubleJobResult::try_from(completion?)?;
799

800
                println!("job_result: {:#?}", result);
801

802
                Ok(result)
803
            }
804

805
            // create cancellation channel for the worker task
806
            let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
807

808
            // create the worker task, that will create and run the job
809
            let worker_task = async move { do_some_work(is_async, cancel_rx).await };
810

811
            // spawn the worker task
812
            let jh = tokio::task::spawn(worker_task);
813

814
            // send cancel message to the worker task
815
            cancel_tx.send(()).unwrap();
816

817
            // wait for worker task to finish (with an error)
818
            let job_handle_error = jh.await?.unwrap_err();
819

820
            // ensure the error indicates JobCancelled
821
            assert!(matches!(job_handle_error, JobHandleError::JobCancelled));
822

823
            Ok(())
824
        }
825

826
        // note: creates own tokio runtime.  caller must not use [tokio::test]
827
        //
828
        // this test starts a job that runs for 1 hour and then attempts to
829
        // shutdown tokio runtime via shutdown_timeout() with a 1 sec timeout.
830
        //
831
        // any async tasks should be aborted quickly.
832
        // any sync tasks will continue to run to completion.
833
        //
834
        // shutdown_timeout() will wait for tasks to abort for 1 sec and then
835
        // returns.  Any un-aborted tasks/threads become ignored/detached.
836
        // The OS can cleanup such threads when the process exits.
837
        //
838
        // the test checks that the shutdown completes in under 2 secs.
839
        //
840
        // the test demonstrates that shutdown_timeout() can be used to shutdown
841
        // tokio runtime even if sync (spawn_blocking) tasks/threads are still running
842
        // in the blocking threadpool.
843
        //
844
        // when called with is_async=true, it demonstrates that shutdown_timeout() also
845
        // aborts async jobs, as one would expect.
846
        pub(super) fn runtime_shutdown_timeout_force_cancels_job(
847
            is_async: bool,
848
        ) -> anyhow::Result<()> {
849
            let rt = tokio::runtime::Runtime::new()?;
850
            let result = rt.block_on(async {
851
                // create a job queue
852
                let mut job_queue = JobQueue::start();
853
                // start a 1 hour job.
854
                let duration = std::time::Duration::from_secs(3600); // 1 hour job.
855

856
                let job = DoubleJob {
857
                    data: 10,
858
                    duration,
859
                    is_async,
860
                };
861
                let _rx = job_queue.add_job_mut(job, DoubleJobPriority::Low)?;
862

863
                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
864
                println!("finished scope");
865

866
                Ok(())
867
            });
868

869
            let start = std::time::Instant::now();
870

871
            println!("waiting 1 second for job before shutdown runtime");
872
            rt.shutdown_timeout(tokio::time::Duration::from_secs(1));
873

874
            assert!(start.elapsed() < std::time::Duration::from_secs(2));
875

876
            result
877
        }
878

879
        // note: creates own tokio runtime.  caller must not use [tokio::test]
880
        //
881
        // this test starts a job that runs for 5 secs and then attempts to
882
        // shutdown tokio runtime normally by dropping it.
883
        //
884
        // any async tasks should be aborted quickly.
885
        // any sync tasks will continue to run to completion.
886
        //
887
        // the tokio runtime does not complete the drop() until all tasks
888
        // have completed/aborted.
889
        //
890
        // the test checks that the job finishes in less than the 5 secs
891
        // required for full completion.  In other words, that it aborts.
892
        //
893
        // the test is expected to succeed for async jobs but fail for sync jobs.
894
        pub(super) fn runtime_shutdown_cancels_job(is_async: bool) -> anyhow::Result<()> {
895
            let rt = tokio::runtime::Runtime::new()?;
896
            let start = tokio::time::Instant::now();
897

898
            let result = rt.block_on(async {
899
                // create a job queue
900
                let mut job_queue = JobQueue::start();
901

902
                // this job takes at least 5 secs to complete.
903
                let duration = std::time::Duration::from_secs(5);
904

905
                let job = DoubleJob {
906
                    data: 10,
907
                    duration,
908
                    is_async,
909
                };
910

911
                let rx_handle = job_queue.add_job_mut(job, DoubleJobPriority::Low)?;
912
                drop(rx_handle);
913

914
                // sleep 50 ms to let job get started.
915
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
916

917
                Ok(())
918
            });
919

920
            // drop the tokio runtime. It will attempt to abort tasks.
921
            //   - async tasks can normally be aborted
922
            //   - spawn_blocking (sync) tasks cannot normally be aborted.
923
            drop(rt);
924

925
            // if test is successful, elapsed time should be less than the 5 secs
926
            // it takes for the job to complete.  (should be around 0.5 ms)
927

928
            // however it is expected/normal that sync tasks will not be aborted
929
            // and will run for full 5 secs.  thus this assert will fail for them.
930

931
            assert!(start.elapsed() < std::time::Duration::from_secs(5));
932

933
            result
934
        }
935

936
        // this test attempts to verify that the task spawned by the JobQueue
937
        // continues running until the JobQueue is dropped after the tokio
938
        // runtime is dropped.
939
        //
940
        // If the tasks are cancelled before JobQueue is dropped then a subsequent
941
        // api call that sends a msg will result in a "channel closed" error, which
942
        // is what the test checks for.
943
        //
944
        // note that the test has to do some tricky stuff to setup conditions
945
        // where the "channel closed" error can occur. It's a subtle issue.
946
        //
947
        // see description at:
948
        // https://github.com/tokio-rs/tokio/discussions/6961
949
        pub(super) fn spawned_tasks_live_as_long_as_jobqueue(is_async: bool) -> anyhow::Result<()> {
950
            let rt = tokio::runtime::Runtime::new()?;
951

952
            let result_ok: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
953

954
            let result_ok_clone = result_ok.clone();
955
            rt.block_on(async {
956
                // create a job queue (not mutable)
957
                let job_queue = Arc::new(JobQueue::start());
958

959
                // spawns background task that adds job
960
                let job_queue_cloned = job_queue.clone();
961
                let jh = tokio::spawn(async move {
962
                    // sleep 200 ms to let runtime finish.
963
                    // ie ensure drop(rt) will be reached and wait for us.
964
                    // note that we use std sleep.  if tokio sleep is used
965
                    // the test will always succeed due to the await point.
966
                    std::thread::sleep(std::time::Duration::from_millis(200));
967

968
                    let job = DoubleJob {
969
                        data: 10,
970
                        duration: std::time::Duration::from_secs(1),
971
                        is_async,
972
                    };
973

974
                    // add job (with JobQueue interior mutability).
975
                    let result = job_queue_cloned.add_job(job, DoubleJobPriority::Low);
976

977
                    // an assert on result.is_ok() would panic, but that panic would be
978
                    // printed and swallowed by tokio runtime, so the test would succeed
979
                    // despite the panic. instead we pass the result in a mutex so it
980
                    // can be asserted where it will be caught by the test runner.
981
                    *result_ok_clone.lock().unwrap() = result.is_ok();
982
                });
983

984
                // sleep 50 ms to let job get started.
985
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
986

987
                // note; awaiting the joinhandle makes the test succeed.
988

989
                jh.abort();
990
                let _ = jh.await;
991
            });
992

993
            // drop the tokio runtime. It will abort tasks.
994
            drop(rt);
995

996
            assert!(*result_ok.lock().unwrap());
997

998
            Ok(())
999
        }
1000

1001
        pub mod panics {
1002
            use super::*;
1003

1004
            const PANIC_STR: &str = "job panics unexpectedly";
1005

1006
            struct PanicJob {
1007
                is_async: bool,
1008
            }
1009

1010
            #[async_trait::async_trait]
1011
            impl Job for PanicJob {
1012
                fn is_async(&self) -> bool {
1013
                    self.is_async
1014
                }
1015

1016
                fn run(&self, _cancel_rx: JobCancelReceiver) -> JobCompletion {
1017
                    panic!("{}", PANIC_STR);
1018
                }
1019

1020
                async fn run_async_cancellable(
1021
                    &self,
1022
                    _cancel_rx: JobCancelReceiver,
1023
                ) -> JobCompletion {
1024
                    panic!("{}", PANIC_STR);
1025
                }
1026
            }
1027

1028
            /// verifies that a job that panics will be ended properly.
1029
            ///
1030
            /// Properly means that:
1031
            /// 1. an error is returned from JobCompletion::result() indicating job panicked.
1032
            /// 2. caller is able to obtain panic info, which matches job's panic msg.
1033
            /// 3. the job-queue continues accepting new jobs.
1034
            /// 4. the job-queue continues processing jobs.
1035
            ///
1036
            /// async_job == true --> test an async job
1037
            /// async_job == false --> test a blocking job
1038
            pub async fn panic_in_job_ends_job_cleanly(async_job: bool) -> anyhow::Result<()> {
1039
                // create a job queue
1040
                let mut job_queue = JobQueue::start();
1041

1042
                let job = PanicJob {
1043
                    is_async: async_job,
1044
                };
1045
                let job_handle = job_queue.add_job_mut(job, DoubleJobPriority::Low)?;
1046

1047
                let job_result = job_handle.await?.result();
1048

1049
                // verify that job_queue channels are still open
1050
                assert!(!job_queue.tx_job_added.is_closed());
1051
                assert!(!job_queue.tx_stop.is_closed());
1052

1053
                // verify that we get an error with the job's panic msg.
1054
                assert!(matches!(
1055
                    job_result,
1056
                    Err(e) if e.panic_message() == Some((*PANIC_STR).to_string())
1057
                ));
1058

1059
                // ensure we can still run another job afterwards.
1060
                let newjob = DoubleJob {
1061
                    data: 10,
1062
                    duration: std::time::Duration::from_millis(50),
1063
                    is_async: false,
1064
                };
1065

1066
                // ensure we can add another job.
1067
                let new_job_handle = job_queue.add_job_mut(newjob, DoubleJobPriority::Low)?;
1068

1069
                // ensure job processes and returns a result without error.
1070
                assert!(new_job_handle.await?.result().is_ok());
1071

1072
                Ok(())
1073
            }
1074
        }
1075

1076
        // demonstrates/tests usage of JobResultWrapper
1077
        pub(super) async fn job_result_wrapper() -> anyhow::Result<()> {
1078
            type MyJobResult = JobResultWrapper<(u64, u64, Instant)>;
1079

1080
            // represents a custom job.  implements Job.
1081
            #[derive(Debug)]
1082
            struct MyJob {
1083
                data: u64,
1084
                duration: std::time::Duration,
1085
            }
1086

1087
            #[async_trait::async_trait]
1088
            impl Job for MyJob {
1089
                fn is_async(&self) -> bool {
1090
                    true
1091
                }
1092

1093
                async fn run_async(&self) -> Box<dyn JobResult> {
1094
                    tokio::time::sleep(self.duration).await;
1095
                    MyJobResult::new((self.data, self.data * 2, Instant::now())).into()
1096
                }
1097
            }
1098

1099
            let mut job_queue = JobQueue::start();
1100
            let job = MyJob {
1101
                data: 15,
1102
                duration: std::time::Duration::from_secs(5),
1103
            };
1104
            let job_handle = job_queue.add_job_mut(job, 10usize)?;
1105
            let completion = job_handle.await?;
1106
            let job_result = MyJobResult::try_from(completion)?;
1107
            let answer = job_result.into_inner();
1108

1109
            assert_eq!(answer.0 * 2, answer.1);
1110

1111
            Ok(())
1112
        }
1113
    }
1114
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc