• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 14743851146

30 Apr 2025 12:15AM UTC coverage: 74.966% (-4.7%) from 79.634%
14743851146

Pull #573

github

web-flow
Merge 66b171787 into 0112461e0
Pull Request #573: docs: document release candidate testnet policy

26065 of 34769 relevant lines covered (74.97%)

295956.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.86
/src/job_queue/queue.rs
1
use std::collections::VecDeque;
2
use std::fmt;
3
use std::sync::Arc;
4
use std::sync::Mutex;
5

6
use tokio::sync::oneshot;
7
use tokio::sync::watch;
8
use tokio::task::JoinHandle;
9

10
use super::errors::AddJobError;
11
use super::errors::JobHandleError;
12
use super::errors::StopQueueError;
13
use super::traits::Job;
14
use super::traits::JobCancelReceiver;
15
use super::traits::JobCancelSender;
16
use super::traits::JobCompletion;
17
use super::traits::JobResult;
18
use super::traits::JobResultReceiver;
19
use super::traits::JobResultSender;
20

21
/// a randomly generated Job identifier
///
/// The identifier is 12 opaque bytes.  Its `Display` form renders those
/// bytes, in order, as 24 lowercase hexadecimal digits.
#[derive(Debug, Clone, Copy)]
pub struct JobId([u8; 12]);

impl std::fmt::Display for JobId {
    /// writes each byte as two zero-padded lowercase hex digits.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for byte in &self.0 {
            write!(f, "{byte:02x}")?;
        }
        Ok(())
    }
}
33

34
impl JobId {
35
    fn random() -> Self {
1,382✔
36
        Self(rand::random())
1,382✔
37
    }
1,382✔
38
}
39

40
/// A job-handle enables cancelling a job and awaiting results
41
#[derive(Debug)]
42
pub struct JobHandle {
43
    job_id: JobId,
44
    result_rx: JobResultReceiver,
45
    cancel_tx: JobCancelSender,
46
}
47
impl JobHandle {
48
    /// wait for job to complete
49
    ///
50
    /// a completed job may either be finished, cancelled, or panicked.
51
    pub async fn complete(self) -> Result<JobCompletion, JobHandleError> {
1,323✔
52
        Ok(self.result_rx.await?)
1,323✔
53
    }
1,323✔
54

55
    /// wait for job result, or err if cancelled or a panic occurred within job.
56
    pub async fn result(self) -> Result<Box<dyn JobResult>, JobHandleError> {
1,321✔
57
        match self.complete().await? {
1,321✔
58
            JobCompletion::Finished(r) => Ok(r),
1,319✔
59
            JobCompletion::Cancelled => Err(JobHandleError::JobCancelled),
×
60
            JobCompletion::Panicked(e) => Err(JobHandleError::JobPanicked(e)),
2✔
61
        }
62
    }
1,321✔
63

64
    /// cancel job and return immediately.
65
    pub fn cancel(&self) -> Result<(), JobHandleError> {
×
66
        Ok(self.cancel_tx.send(())?)
×
67
    }
×
68

69
    /// cancel job and wait for it to complete.
70
    pub async fn cancel_and_await(self) -> Result<JobCompletion, JobHandleError> {
2✔
71
        self.cancel_tx.send(())?;
2✔
72
        self.complete().await
2✔
73
    }
2✔
74

75
    /// channel receiver for job results
76
    pub fn result_rx(self) -> JobResultReceiver {
54✔
77
        self.result_rx
54✔
78
    }
54✔
79

80
    /// channel sender for cancelling job.
81
    pub fn cancel_tx(&self) -> &JobCancelSender {
1,297✔
82
        &self.cancel_tx
1,297✔
83
    }
1,297✔
84

85
    pub fn job_id(&self) -> JobId {
×
86
        self.job_id
×
87
    }
×
88
}
89

90
/// represents a job in the queue.
91
struct QueuedJob<P> {
92
    job: Box<dyn Job>,
93
    job_id: JobId,
94
    result_tx: JobResultSender,
95
    cancel_tx: JobCancelSender,
96
    cancel_rx: JobCancelReceiver,
97
    priority: P,
98
}
99

100
impl<P: fmt::Debug> fmt::Debug for QueuedJob<P> {
101
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
102
        f.debug_struct("QueuedJob")
×
103
            .field("job", &"Box<dyn Job>")
×
104
            .field("job_id", &self.job_id)
×
105
            .field("result_tx", &"JobResultSender")
×
106
            .field("cancel_tx", &"JobCancelSender")
×
107
            .field("cancel_rx", &"JobCancelReceiver")
×
108
            .field("priority", &self.priority)
×
109
            .finish()
×
110
    }
×
111
}
112

113
/// represents the currently executing job
114
#[derive(Debug)]
115
struct CurrentJob {
116
    job_num: usize,
117
    job_id: JobId,
118
    cancel_tx: JobCancelSender,
119
}
120

121
/// represents data shared between tasks/threads
122
#[derive(Debug)]
123
struct Shared<P: Ord> {
124
    jobs: VecDeque<QueuedJob<P>>,
125
    current_job: Option<CurrentJob>,
126
}
127

128
/// implements a job queue that sends result of each job to a listener.
129
#[derive(Debug)]
130
pub struct JobQueue<P: Ord + Send + Sync + 'static> {
131
    shared: Arc<Mutex<Shared<P>>>,
132

133
    tx_job_added: tokio::sync::mpsc::UnboundedSender<()>,
134
    tx_stop: tokio::sync::watch::Sender<()>,
135

136
    process_jobs_task_handle: Option<JoinHandle<()>>, // Store the job processing task handle
137
}
138

139
impl<P: Ord + Send + Sync + 'static> Drop for JobQueue<P> {
140
    fn drop(&mut self) {
13✔
141
        tracing::debug!("in JobQueue::drop()");
13✔
142

143
        if !self.tx_stop.is_closed() {
13✔
144
            if let Err(e) = self.tx_stop.send(()) {
13✔
145
                tracing::error!("{}", e);
×
146
            }
13✔
147
        }
×
148
    }
13✔
149
}
150

151
impl<P: Ord + Send + Sync + 'static> JobQueue<P> {
152
    /// creates job queue and starts it processing.  returns immediately.
153
    pub fn start() -> Self {
145✔
154
        let shared = Shared {
145✔
155
            jobs: VecDeque::new(),
145✔
156
            current_job: None,
145✔
157
        };
145✔
158
        let shared: Arc<Mutex<Shared<P>>> = Arc::new(Mutex::new(shared));
145✔
159

145✔
160
        let (tx_job_added, mut rx_job_added) = tokio::sync::mpsc::unbounded_channel();
145✔
161
        let (tx_stop, mut rx_stop) = tokio::sync::watch::channel(());
145✔
162

145✔
163
        // spawns background task that processes job queue and runs jobs.
145✔
164
        let shared2 = shared.clone();
145✔
165
        let process_jobs_task_handle = tokio::spawn(async move {
145✔
166
            let mut job_num: usize = 1;
145✔
167

168
            loop {
169
                tokio::select!(
1,522✔
170
                _ = rx_stop.changed() => {
1,522✔
171
                    tracing::debug!("task process_jobs received Stop message.");
×
172

173
                    // if there is a presently executing job we need to cancel it
174
                    // and wait for it to complete.
175
                    let maybe_info = shared2.lock().unwrap().current_job.as_ref().map(|cj| (cj.job_id, cj.cancel_tx.clone()) );
×
176
                    if let Some((job_id, cancel_tx)) = maybe_info {
×
177
                        match cancel_tx.send(()) {
×
178
                            Ok(()) => {
179
                                // wait for channel to close, indicating job has cancelled (or otherwise completed)
180
                                tracing::debug!("JobQueue: notified current job {} to cancel.  waiting...", job_id);
×
181
                                cancel_tx.closed().await;
×
182
                                tracing::debug!("JobQueue: current job {} has cancelled.", job_id);
×
183
                            }
184
                            Err(e) => {
×
185
                                tracing::warn!("could not send cancellation msg to current job {}. {}", job_id, e)
×
186
                            }
187
                        }
188
                    }
×
189

190
                    break;
×
191
                }
192
                _ = rx_job_added.recv() => {
1,522✔
193
                    tracing::debug!("task process_jobs received JobAdded message.");
1,385✔
194
                    let (next_job, num_pending) = {
1,385✔
195
                        let mut guard = shared2.lock().unwrap();
1,385✔
196

1,385✔
197
                        // This is where we pick the highest priority job
1,385✔
198
                        guard
1,385✔
199
                            .jobs
1,385✔
200
                            .make_contiguous()
1,385✔
201
                            .sort_by(|a, b| b.priority.cmp(&a.priority));
1,385✔
202
                        let job = guard.jobs.pop_front().unwrap();
1,385✔
203

1,385✔
204
                        guard.current_job = Some(CurrentJob {
1,385✔
205
                            job_num,
1,385✔
206
                            job_id: job.job_id,
1,385✔
207
                            cancel_tx: job.cancel_tx.clone(),
1,385✔
208
                        });
1,385✔
209
                        (job, guard.jobs.len())
1,385✔
210
                    };
1,385✔
211

1,385✔
212
                    tracing::info!(
1,385✔
213
                        "  *** JobQueue: begin job #{} - {} - {} queued job(s) ***",
×
214
                        job_num,
215
                        next_job.job_id,
216
                        num_pending
217
                    );
218
                    let timer = tokio::time::Instant::now();
1,385✔
219
                    let task_handle = if next_job.job.is_async() {
1,385✔
220
                        tokio::spawn(async move {
1,339✔
221
                            next_job.job.run_async_cancellable(next_job.cancel_rx).await
1,339✔
222
                        })
1,336✔
223
                    } else {
224
                        tokio::task::spawn_blocking(move || next_job.job.run(next_job.cancel_rx))
46✔
225
                    };
226

227
                    let job_completion = match task_handle.await {
1,385✔
228
                        Ok(jc) => jc,
1,375✔
229
                        Err(e) => {
2✔
230
                            if e.is_panic() {
2✔
231
                                JobCompletion::Panicked(e.into_panic())
2✔
232
                            } else if e.is_cancelled() {
×
233
                                JobCompletion::Cancelled
×
234
                            } else {
235
                                unreachable!()
×
236
                            }
237
                        }
238
                    };
239

240
                    tracing::info!(
1,377✔
241
                        "  *** JobQueue: ended job #{} - {} - Completion: {} - {} secs ***",
×
242
                        job_num,
×
243
                        next_job.job_id,
×
244
                        job_completion,
×
245
                        timer.elapsed().as_secs_f32()
×
246
                    );
247
                    job_num += 1;
1,377✔
248

1,377✔
249
                    shared2.lock().unwrap().current_job = None;
1,377✔
250

251
                    if let Err(e) = next_job.result_tx.send(job_completion) {
1,377✔
252
                        tracing::warn!("job-handle dropped? {}", e);
×
253
                    }
1,377✔
254
                });
255
            }
256
            tracing::debug!("task process_jobs exiting");
×
257
        });
×
258

259
        tracing::info!("JobQueue: started new queue.");
145✔
260

261
        Self {
145✔
262
            tx_job_added,
145✔
263
            tx_stop,
145✔
264
            shared,
145✔
265
            process_jobs_task_handle: Some(process_jobs_task_handle),
145✔
266
        }
145✔
267
    }
145✔
268

269
    /// stop the job-queue, and drop it.
270
    ///
271
    /// this method sends a message to the spawned job-queue task
272
    /// to stop and then waits for it to complete.
273
    ///
274
    /// Comparison with drop():
275
    ///
276
    /// if JobQueue is dropped:
277
    ///  1. the stop message will be sent, but any error is ignored.
278
    ///  2. the spawned task is not awaited.
279
    pub async fn stop(mut self) -> Result<(), StopQueueError> {
×
280
        tracing::info!("JobQueue: stopping.");
×
281

282
        self.tx_stop.send(())?;
×
283

284
        if let Some(jh) = self.process_jobs_task_handle.take() {
×
285
            jh.await?;
×
286
        }
×
287

288
        Ok(())
×
289
    }
×
290

291
    /// adds job to job-queue and returns immediately.
292
    ///
293
    /// job-results can be obtained by via JobHandle::results().await
294
    /// The job can be cancelled by JobHandle::cancel()
295
    pub fn add_job(&self, job: Box<dyn Job>, priority: P) -> Result<JobHandle, AddJobError> {
1,382✔
296
        let (result_tx, result_rx) = oneshot::channel();
1,382✔
297
        let (cancel_tx, cancel_rx) = watch::channel::<()>(());
1,382✔
298

1,382✔
299
        let job_id = JobId::random();
1,382✔
300

1,382✔
301
        let m = QueuedJob {
1,382✔
302
            job,
1,382✔
303
            job_id,
1,382✔
304
            result_tx,
1,382✔
305
            cancel_tx: cancel_tx.clone(),
1,382✔
306
            cancel_rx,
1,382✔
307
            priority,
1,382✔
308
        };
1,382✔
309

310
        let (num_jobs, job_running) = {
1,382✔
311
            let mut guard = self.shared.lock().unwrap();
1,382✔
312
            guard.jobs.push_back(m);
1,382✔
313
            let job_running = match &guard.current_job {
1,382✔
314
                Some(j) => format!("#{} - {}", j.job_num, j.job_id),
45✔
315
                None => "none".to_string(),
1,337✔
316
            };
317
            (guard.jobs.len(), job_running)
1,382✔
318
        };
1,382✔
319
        tracing::info!(
1,382✔
320
            "JobQueue: job added - {}  {} queued job(s).  job running: {}",
×
321
            job_id,
322
            num_jobs,
323
            job_running
324
        );
325

326
        self.tx_job_added.send(())?;
1,382✔
327

328
        Ok(JobHandle {
1,382✔
329
            job_id,
1,382✔
330
            result_rx,
1,382✔
331
            cancel_tx,
1,382✔
332
        })
1,382✔
333
    }
1,382✔
334
}
335

336
#[cfg(test)]
337
#[cfg_attr(coverage_nightly, coverage(off))]
338
mod tests {
339
    use std::time::Instant;
340

341
    use tracing_test::traced_test;
342

343
    use super::*;
344

345
    #[tokio::test(flavor = "multi_thread")]
346
    #[traced_test]
347
    async fn run_sync_jobs_by_priority() -> anyhow::Result<()> {
348
        workers::run_jobs_by_priority(false).await
349
    }
350

351
    #[tokio::test(flavor = "multi_thread")]
352
    #[traced_test]
353
    async fn run_async_jobs_by_priority() -> anyhow::Result<()> {
354
        workers::run_jobs_by_priority(true).await
355
    }
356

357
    #[tokio::test(flavor = "multi_thread")]
358
    #[traced_test]
359
    async fn get_sync_job_result() -> anyhow::Result<()> {
360
        workers::get_job_result(false).await
361
    }
362

363
    #[tokio::test(flavor = "multi_thread")]
364
    #[traced_test]
365
    async fn get_async_job_result() -> anyhow::Result<()> {
366
        workers::get_job_result(true).await
367
    }
368

369
    #[tokio::test(flavor = "multi_thread")]
370
    #[traced_test]
371
    async fn cancel_sync_job() -> anyhow::Result<()> {
372
        workers::cancel_job(false).await
373
    }
374

375
    #[tokio::test(flavor = "multi_thread")]
376
    #[traced_test]
377
    async fn cancel_async_job() -> anyhow::Result<()> {
378
        workers::cancel_job(true).await
379
    }
380

381
    #[test]
382
    #[traced_test]
383
    fn runtime_shutdown_timeout_force_cancels_sync_job() -> anyhow::Result<()> {
384
        workers::runtime_shutdown_timeout_force_cancels_job(false)
385
    }
386

387
    #[test]
388
    #[traced_test]
389
    fn runtime_shutdown_timeout_force_cancels_async_job() -> anyhow::Result<()> {
390
        workers::runtime_shutdown_timeout_force_cancels_job(true)
391
    }
392

393
    #[test]
394
    #[traced_test]
395
    fn runtime_shutdown_cancels_sync_job() {
396
        let _ = workers::runtime_shutdown_cancels_job(false);
397
    }
398

399
    #[test]
400
    #[traced_test]
401
    fn runtime_shutdown_cancels_async_job() -> anyhow::Result<()> {
402
        workers::runtime_shutdown_cancels_job(true)
403
    }
404

405
    #[test]
406
    #[traced_test]
407
    fn spawned_tasks_live_as_long_as_jobqueue() -> anyhow::Result<()> {
408
        workers::spawned_tasks_live_as_long_as_jobqueue(true)
409
    }
410

411
    #[tokio::test(flavor = "multi_thread")]
412
    #[traced_test]
413
    async fn panic_in_async_job_ends_job_cleanly() -> anyhow::Result<()> {
414
        workers::panics::panic_in_job_ends_job_cleanly(true).await
415
    }
416

417
    #[tokio::test(flavor = "multi_thread")]
418
    #[traced_test]
419
    async fn panic_in_blocking_job_ends_job_cleanly() -> anyhow::Result<()> {
420
        workers::panics::panic_in_job_ends_job_cleanly(false).await
421
    }
422

423
    mod workers {
424
        use std::any::Any;
425

426
        use super::*;
427
        use crate::job_queue::errors::JobHandleErrorSync;
428

429
        #[derive(PartialEq, Eq, PartialOrd, Ord)]
430
        pub enum DoubleJobPriority {
431
            Low = 1,
432
            Medium = 2,
433
            High = 3,
434
        }
435

436
        #[derive(PartialEq, Debug, Clone)]
437
        struct DoubleJobResult(u64, u64, Instant);
438
        impl JobResult for DoubleJobResult {
439
            fn as_any(&self) -> &dyn Any {
440
                self
441
            }
442
            fn into_any(self: Box<Self>) -> Box<dyn Any> {
443
                self
444
            }
445
        }
446

447
        // represents a prover job.  implements Job.
448
        struct DoubleJob {
449
            data: u64,
450
            duration: std::time::Duration,
451
            is_async: bool,
452
        }
453

454
        #[async_trait::async_trait]
455
        impl Job for DoubleJob {
456
            fn is_async(&self) -> bool {
457
                self.is_async
458
            }
459

460
            fn run(&self, cancel_rx: JobCancelReceiver) -> JobCompletion {
461
                let start = Instant::now();
462
                let sleep_time =
463
                    std::cmp::min(std::time::Duration::from_micros(100), self.duration);
464

465
                let r = loop {
466
                    if start.elapsed() < self.duration {
467
                        match cancel_rx.has_changed() {
468
                            Ok(changed) if changed => break JobCompletion::Cancelled,
469
                            Err(_) => break JobCompletion::Cancelled,
470
                            _ => {}
471
                        }
472

473
                        std::thread::sleep(sleep_time);
474
                    } else {
475
                        break JobCompletion::Finished(Box::new(DoubleJobResult(
476
                            self.data,
477
                            self.data * 2,
478
                            Instant::now(),
479
                        )));
480
                    }
481
                };
482

483
                tracing::info!("results: {:?}", r);
484
                r
485
            }
486

487
            async fn run_async(&self) -> Box<dyn JobResult> {
488
                tokio::time::sleep(self.duration).await;
489
                let r = DoubleJobResult(self.data, self.data * 2, Instant::now());
490

491
                tracing::info!("results: {:?}", r);
492
                Box::new(r)
493
            }
494
        }
495

496
        // this test demonstrates/verifies that:
497
        //  1. jobs are run in priority order, highest priority first.
498
        //  2. when multiple jobs have the same priority, they run in FIFO order.
499
        pub(super) async fn run_jobs_by_priority(is_async: bool) -> anyhow::Result<()> {
500
            let start_of_test = Instant::now();
501

502
            // create a job queue
503
            let job_queue = JobQueue::start();
504

505
            let mut handles = vec![];
506
            let duration = std::time::Duration::from_millis(20);
507

508
            // create 27 jobs, 9 at each priority level.
509
            for i in (1..10).rev() {
510
                let job1 = Box::new(DoubleJob {
511
                    data: i,
512
                    duration,
513
                    is_async,
514
                });
515
                let job2 = Box::new(DoubleJob {
516
                    data: i * 100,
517
                    duration,
518
                    is_async,
519
                });
520
                let job3 = Box::new(DoubleJob {
521
                    data: i * 1000,
522
                    duration,
523
                    is_async,
524
                });
525

526
                // process job and print results.
527
                handles.push(job_queue.add_job(job1, DoubleJobPriority::Low)?.result_rx());
528
                handles.push(
529
                    job_queue
530
                        .add_job(job2, DoubleJobPriority::Medium)?
531
                        .result_rx(),
532
                );
533
                handles.push(
534
                    job_queue
535
                        .add_job(job3, DoubleJobPriority::High)?
536
                        .result_rx(),
537
                );
538
            }
539

540
            // wait for all jobs to complete.
541
            let mut results = futures::future::join_all(handles).await;
542

543
            // the results are in the same order as handles passed to join_all.
544
            // we sort them by the timestamp in job result, ascending.
545
            results.sort_by(
546
                |a_completion, b_completion| match (a_completion, b_completion) {
547
                    (Ok(JobCompletion::Finished(a_dyn)), Ok(JobCompletion::Finished(b_dyn))) => {
548
                        let a = a_dyn.as_any().downcast_ref::<DoubleJobResult>().unwrap().2;
549

550
                        let b = b_dyn.as_any().downcast_ref::<DoubleJobResult>().unwrap().2;
551

552
                        a.cmp(&b)
553
                    }
554
                    _ => panic!("at least one job did not finish"),
555
                },
556
            );
557

558
            // iterate job results and verify that:
559
            //   timestamp of each is greater than prev.
560
            //   input value of each is less than prev (except the 2nd item, see below)
561
            //     because each level is queued in descending order and higher levels use larger values.
562
            let mut prev = Box::new(DoubleJobResult(9999, 0, start_of_test));
563
            for (i, c) in results.into_iter().enumerate() {
564
                let Ok(JobCompletion::Finished(dyn_result)) = c else {
565
                    panic!("A job did not finish");
566
                };
567

568
                let job_result = dyn_result.into_any().downcast::<DoubleJobResult>().unwrap();
569

570
                assert!(job_result.2 > prev.2);
571

572
                // we don't do the assertion for the 2nd job because the job-queue starts
573
                // processing immediately and so a race condition is setup where it is possible
574
                // for either the Low priority or High job to start processing first.
575
                if i != 1 {
576
                    assert!(job_result.0 < prev.0);
577
                }
578

579
                prev = job_result;
580
            }
581

582
            Ok(())
583
        }
584

585
        // this test demonstrates/verifies that a job can return a result back to
586
        // the job initiator.
587
        pub(super) async fn get_job_result(is_async: bool) -> anyhow::Result<()> {
588
            // create a job queue
589
            let job_queue = JobQueue::start();
590
            let duration = std::time::Duration::from_millis(20);
591

592
            // create 10 jobs
593
            for i in 0..10 {
594
                let job = Box::new(DoubleJob {
595
                    data: i,
596
                    duration,
597
                    is_async,
598
                });
599

600
                let result = job_queue
601
                    .add_job(job, DoubleJobPriority::Low)?
602
                    .result()
603
                    .await
604
                    .map_err(|e| e.into_sync())?;
605

606
                let job_result = result.into_any().downcast::<DoubleJobResult>().unwrap();
607

608
                assert_eq!(i, job_result.0);
609
                assert_eq!(i * 2, job_result.1);
610
            }
611

612
            Ok(())
613
        }
614

615
        // tests/demonstrates that a long running job can be cancelled early.
616
        pub(super) async fn cancel_job(is_async: bool) -> anyhow::Result<()> {
617
            // create a job queue
618
            let job_queue = JobQueue::start();
619
            // start a 1 hour job.
620
            let duration = std::time::Duration::from_secs(3600); // 1 hour job.
621

622
            let job = Box::new(DoubleJob {
623
                data: 10,
624
                duration,
625
                is_async,
626
            });
627
            let job_handle = job_queue.add_job(job, DoubleJobPriority::Low)?;
628

629
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
630

631
            let completion = job_handle.cancel_and_await().await.unwrap();
632
            assert!(matches!(completion, JobCompletion::Cancelled));
633

634
            Ok(())
635
        }
636

637
        // note: creates own tokio runtime.  caller must not use [tokio::test]
638
        //
639
        // this test starts a job that runs for 1 hour and then attempts to
640
        // shutdown tokio runtime via shutdown_timeout() with a 1 sec timeout.
641
        //
642
        // any async tasks should be aborted quickly.
643
        // any sync tasks will continue to run to completion.
644
        //
645
        // shutdown_timeout() will wait for tasks to abort for 1 sec and then
646
        // returns.  Any un-aborted tasks/threads become ignored/detached.
647
        // The OS can cleanup such threads when the process exits.
648
        //
649
        // the test checks that the shutdown completes in under 2 secs.
650
        //
651
        // the test demonstrates that shutdown_timeout() can be used to shutdown
652
        // tokio runtime even if sync (spawn_blocking) tasks/threads are still running
653
        // in the blocking threadpool.
654
        //
655
        // when called with is_async=true, it demonstrates that shutdown_timeout() also
656
        // aborts async jobs, as one would expect.
657
        pub(super) fn runtime_shutdown_timeout_force_cancels_job(
658
            is_async: bool,
659
        ) -> anyhow::Result<()> {
660
            let rt = tokio::runtime::Runtime::new()?;
661
            let result = rt.block_on(async {
662
                // create a job queue
663
                let job_queue = JobQueue::start();
664
                // start a 1 hour job.
665
                let duration = std::time::Duration::from_secs(3600); // 1 hour job.
666

667
                let job = Box::new(DoubleJob {
668
                    data: 10,
669
                    duration,
670
                    is_async,
671
                });
672
                let _rx = job_queue.add_job(job, DoubleJobPriority::Low)?;
673

674
                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
675
                println!("finished scope");
676

677
                Ok(())
678
            });
679

680
            let start = std::time::Instant::now();
681

682
            println!("waiting 1 second for job before shutdown runtime");
683
            rt.shutdown_timeout(tokio::time::Duration::from_secs(1));
684

685
            assert!(start.elapsed() < std::time::Duration::from_secs(2));
686

687
            result
688
        }
689

690
        // note: creates own tokio runtime.  caller must not use [tokio::test]
691
        //
692
        // this test starts a job that runs for 5 secs and then attempts to
693
        // shutdown tokio runtime normally by dropping it.
694
        //
695
        // any async tasks should be aborted quickly.
696
        // any sync tasks will continue to run to completion.
697
        //
698
        // the tokio runtime does not complete the drop() until all tasks
699
        // have completed/aborted.
700
        //
701
        // the test checks that the job finishes in less than the 5 secs
702
        // required for full completion.  In other words, that it aborts.
703
        //
704
        // the test is expected to succeed for async jobs but fail for sync jobs.
705
        pub(super) fn runtime_shutdown_cancels_job(is_async: bool) -> anyhow::Result<()> {
706
            let rt = tokio::runtime::Runtime::new()?;
707
            let start = tokio::time::Instant::now();
708

709
            let result = rt.block_on(async {
710
                // create a job queue
711
                let job_queue = JobQueue::start();
712

713
                // this job takes at least 5 secs to complete.
714
                let duration = std::time::Duration::from_secs(5);
715

716
                let job = Box::new(DoubleJob {
717
                    data: 10,
718
                    duration,
719
                    is_async,
720
                });
721

722
                let rx_handle = job_queue.add_job(job, DoubleJobPriority::Low)?;
723
                drop(rx_handle);
724

725
                // sleep 50 ms to let job get started.
726
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
727

728
                Ok(())
729
            });
730

731
            // drop the tokio runtime. It will attempt to abort tasks.
732
            //   - async tasks can normally be aborted
733
            //   - spawn_blocking (sync) tasks cannot normally be aborted.
734
            drop(rt);
735

736
            // if test is successful, elapsed time should be less than the 5 secs
737
            // it takes for the job to complete.  (should be around 50 ms)
738

739
            // however it is expected/normal that sync tasks will not be aborted
740
            // and will run for full 5 secs.  thus this assert will fail for them.
741

742
            assert!(start.elapsed() < std::time::Duration::from_secs(5));
743

744
            result
745
        }
746

747
        // this test attempts to verify that the task spawned by the JobQueue
748
        // continues running until the JobQueue is dropped after the tokio
749
        // runtime is dropped.
750
        //
751
        // If the tasks are cancelled before JobQueue is dropped then a subsequent
752
        // api call that sends a msg will result in a "channel closed" error, which
753
        // is what the test checks for.
754
        //
755
        // note that the test has to do some tricky stuff to setup conditions
756
        // where the "channel closed" error can occur. It's a subtle issue.
757
        //
758
        // see description at:
759
        // https://github.com/tokio-rs/tokio/discussions/6961
760
        pub(super) fn spawned_tasks_live_as_long_as_jobqueue(is_async: bool) -> anyhow::Result<()> {
761
            let rt = tokio::runtime::Runtime::new()?;
762

763
            let result_ok: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
764

765
            let result_ok_clone = result_ok.clone();
766
            rt.block_on(async {
767
                // create a job queue
768
                let job_queue = Arc::new(JobQueue::start());
769

770
                // spawns background task that adds job
771
                let job_queue_cloned = job_queue.clone();
772
                let jh = tokio::spawn(async move {
773
                    // sleep 200 ms to let runtime finish.
774
                    // ie ensure drop(rt) will be reached and wait for us.
775
                    // note that we use std sleep.  if tokio sleep is used
776
                    // the test will always succeed due to the await point.
777
                    std::thread::sleep(std::time::Duration::from_millis(200));
778

779
                    let job = Box::new(DoubleJob {
780
                        data: 10,
781
                        duration: std::time::Duration::from_secs(1),
782
                        is_async,
783
                    });
784

785
                    let result = job_queue_cloned.add_job(job, DoubleJobPriority::Low);
786

787
                    // an assert on result.is_ok() would panic, but that panic would be
788
                    // printed and swallowed by tokio runtime, so the test would succeed
789
                    // despite the panic. instead we pass the result in a mutex so it
790
                    // can be asserted where it will be caught by the test runner.
791
                    *result_ok_clone.lock().unwrap() = result.is_ok();
792
                });
793

794
                // sleep 50 ms to let job get started.
795
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
796

797
                // note; awaiting the joinhandle makes the test succeed.
798

799
                jh.abort();
800
                let _ = jh.await;
801
            });
802

803
            // drop the tokio runtime. It will abort tasks.
804
            drop(rt);
805

806
            assert!(*result_ok.lock().unwrap());
807

808
            Ok(())
809
        }
810

811
        pub mod panics {
812
            use super::*;
813

814
            const PANIC_STR: &str = "job panics unexpectedly";
815

816
            struct PanicJob {
817
                is_async: bool,
818
            }
819

820
            #[async_trait::async_trait]
821
            impl Job for PanicJob {
822
                fn is_async(&self) -> bool {
823
                    self.is_async
824
                }
825

826
                fn run(&self, _cancel_rx: JobCancelReceiver) -> JobCompletion {
827
                    panic!("{}", PANIC_STR);
828
                }
829

830
                async fn run_async_cancellable(
831
                    &self,
832
                    _cancel_rx: JobCancelReceiver,
833
                ) -> JobCompletion {
834
                    panic!("{}", PANIC_STR);
835
                }
836
            }
837

838
            /// verifies that a job that panics will be ended properly.
839
            ///
840
            /// Properly means that:
841
            /// 1. an error is returned from job_handle.result() indicating job panicked.
842
            /// 2. caller is able to obtain panic info, which matches job's panic msg.
843
            /// 3. the job-queue continues accepting new jobs.
844
            /// 4. the job-queue continues processing jobs.
845
            ///
846
            /// async_job == true --> test an async job
847
            /// async_job == false --> test a blocking job
848
            pub async fn panic_in_job_ends_job_cleanly(async_job: bool) -> anyhow::Result<()> {
849
                // create a job queue
850
                let job_queue = JobQueue::start();
851

852
                let job = PanicJob {
853
                    is_async: async_job,
854
                };
855
                let job_handle = job_queue.add_job(Box::new(job), DoubleJobPriority::Low)?;
856

857
                let job_result = job_handle.result().await;
858

859
                println!("job_result: {:#?}", job_result);
860

861
                // verify that job_queue channels are still open
862
                assert!(!job_queue.tx_job_added.is_closed());
863
                assert!(!job_queue.tx_stop.is_closed());
864

865
                // verify that we get an error with the job's panic msg.
866
                assert!(matches!(
867
                    job_result.map_err(|e| e.into_sync()),
868
                    Err(JobHandleErrorSync::JobPanicked(e)) if e == *PANIC_STR
869
                ));
870

871
                // ensure we can still run another job afterwards.
872
                let newjob = Box::new(DoubleJob {
873
                    data: 10,
874
                    duration: std::time::Duration::from_millis(50),
875
                    is_async: false,
876
                });
877

878
                // ensure we can add another job.
879
                let new_job_handle = job_queue.add_job(newjob, DoubleJobPriority::Low)?;
880

881
                // ensure job processes and returns a result without error.
882
                assert!(new_job_handle.result().await.is_ok());
883

884
                Ok(())
885
            }
886
        }
887
    }
888
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc