• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 13811054614

12 Mar 2025 12:12PM UTC coverage: 84.19% (+0.05%) from 84.138%
13811054614

push

github

Sword-Smith
test(Timestamp): Add tests that constructors are sane and correct

These constructors are used to define consensus. So they better be
correct. Also: They're defined in unintuitive ways, so these tests help
establish their correctness to the next programmer getting a heart
attack (like I did) when seeing e.g. the definition of `years` or
`months`.

27 of 27 new or added lines in 1 file covered. (100.0%)

6 existing lines in 3 files now uncovered.

50574 of 60071 relevant lines covered (84.19%)

175299.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.63
/src/job_queue/queue.rs
1
use std::collections::VecDeque;
2
use std::sync::Arc;
3
use std::sync::Mutex;
4

5
use tokio::sync::mpsc;
6
use tokio::sync::oneshot;
7
use tokio::sync::watch;
8
use tokio_util::task::TaskTracker;
9

10
use super::errors::JobHandleError;
11
use super::errors::JobQueueError;
12
use super::traits::Job;
13
use super::traits::JobCancelReceiver;
14
use super::traits::JobCancelSender;
15
use super::traits::JobCompletion;
16
use super::traits::JobResult;
17
use super::traits::JobResultReceiver;
18
use super::traits::JobResultSender;
19

20
/// A job-handle enables cancelling a job and awaiting results
21
#[derive(Debug)]
22
pub struct JobHandle {
23
    result_rx: JobResultReceiver,
24
    cancel_tx: JobCancelSender,
25
}
26
impl JobHandle {
27
    /// wait for job to complete
28
    ///
29
    /// a completed job may either be finished or cancelled.
30
    pub async fn complete(self) -> Result<JobCompletion, JobHandleError> {
1,102✔
31
        Ok(self.result_rx.await?)
1,102✔
32
    }
1,102✔
33

34
    /// wait for job result, or err if cancelled.
35
    pub async fn result(self) -> Result<Box<dyn JobResult>, JobHandleError> {
1,099✔
36
        match self.complete().await? {
1,099✔
37
            JobCompletion::Finished(r) => Ok(r),
1,099✔
38
            JobCompletion::Cancelled => Err(JobHandleError::JobCancelled),
×
39
        }
40
    }
1,099✔
41

42
    /// cancel job and return immediately.
43
    pub fn cancel(&self) -> Result<(), JobHandleError> {
×
44
        Ok(self.cancel_tx.send(())?)
×
45
    }
×
46

47
    /// cancel job and wait for it to complete.
48
    pub async fn cancel_and_await(self) -> Result<JobCompletion, JobHandleError> {
2✔
49
        self.cancel_tx.send(())?;
2✔
50
        self.complete().await
2✔
51
    }
2✔
52

53
    /// channel receiver for job results
54
    pub fn result_rx(self) -> JobResultReceiver {
54✔
55
        self.result_rx
54✔
56
    }
54✔
57

58
    /// channel sender for cancelling job.
59
    pub fn cancel_tx(&self) -> &JobCancelSender {
1,079✔
60
        &self.cancel_tx
1,079✔
61
    }
1,079✔
62
}
63

64
/// messages that can be sent to job-queue inner task.
65
enum JobQueueMsg<P> {
66
    AddJob(AddJobMsg<P>),
67
    Stop,
68
}
69

70
/// represents a msg to add a job to the queue.
71
struct AddJobMsg<P> {
72
    job: Box<dyn Job>,
73
    result_tx: JobResultSender,
74
    cancel_tx: JobCancelSender,
75
    cancel_rx: JobCancelReceiver,
76
    priority: P,
77
}
78

79
/// implements a job queue that sends result of each job to a listener.
80
pub struct JobQueue<P> {
81
    tx: mpsc::UnboundedSender<JobQueueMsg<P>>,
82
    tracker: TaskTracker,
83
    refs: Arc<()>,
84
}
85

86
impl<P> std::fmt::Debug for JobQueue<P> {
87
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
88
        f.debug_struct("JobQueue")
×
89
            .field("tx", &"mpsc::Sender")
×
90
            .finish()
×
91
    }
×
92
}
93

94
impl<P> Clone for JobQueue<P> {
95
    fn clone(&self) -> Self {
253✔
96
        Self {
253✔
97
            tx: self.tx.clone(),
253✔
98
            tracker: self.tracker.clone(),
253✔
99
            refs: self.refs.clone(),
253✔
100
        }
253✔
101
    }
253✔
102
}
103

104
impl<P> Drop for JobQueue<P> {
105
    // since JobQueue has impl Clone there can be other instances.
106
    // we must only send the Stop message when dropping the last instance.
107
    // if upgrade of a Weak Arc fails then we are the last one.
108
    // the refs struct member exists only for this purpose.
109
    //
110
    // test stop_only_called_once_when_cloned exists to check
111
    // it is working.
112
    fn drop(&mut self) {
1,583✔
113
        let refs_weak = Arc::downgrade(&self.refs);
1,583✔
114
        self.refs = Arc::new(());
1,583✔
115

1,583✔
116
        // if upgrade fails, then this is the last reference.
1,583✔
117
        if refs_weak.upgrade().is_none() {
1,583✔
118
            let _ = self.tx.send(JobQueueMsg::Stop);
1,330✔
119
        }
1,330✔
120
    }
1,583✔
121
}
122

123
impl<P: Ord + Send + Sync + 'static> JobQueue<P> {
124
    /// creates job queue and starts it processing.  returns immediately.
125
    pub fn start() -> Self {
1,330✔
126
        struct CurrentJob {
127
            job_num: usize,
128
            cancel_tx: JobCancelSender,
129
        }
130
        struct Shared<P: Ord> {
131
            jobs: VecDeque<AddJobMsg<P>>,
132
            current_job: Option<CurrentJob>,
133
        }
134

135
        let (tx, mut rx) = mpsc::unbounded_channel::<JobQueueMsg<P>>();
1,330✔
136

1,330✔
137
        let shared = Shared {
1,330✔
138
            jobs: VecDeque::new(),
1,330✔
139
            current_job: None,
1,330✔
140
        };
1,330✔
141
        let jobs: Arc<Mutex<Shared<P>>> = Arc::new(Mutex::new(shared));
1,330✔
142

1,330✔
143
        let (tx_deque, mut rx_deque) = tokio::sync::mpsc::unbounded_channel();
1,330✔
144

1,330✔
145
        let tracker = TaskTracker::new();
1,330✔
146

1,330✔
147
        // spawns background task that adds incoming jobs to job-queue
1,330✔
148
        let jobs_rc1 = jobs.clone();
1,330✔
149
        tracker.spawn(async move {
1,330✔
150
            while let Some(msg) = rx.recv().await {
2,184✔
151
                match msg {
2,033✔
152
                    JobQueueMsg::AddJob(m) => {
1,161✔
153
                        let (num_jobs, job_running) = {
1,161✔
154
                            let mut guard = jobs_rc1.lock().unwrap();
1,161✔
155
                            guard.jobs.push_back(m);
1,161✔
156
                            let job_running = match &guard.current_job {
1,161✔
157
                                Some(j) => format!("#{}", j.job_num),
×
158
                                None => "none".to_string(),
1,161✔
159
                            };
160
                            (guard.jobs.len(), job_running)
1,161✔
161
                        };
1,161✔
162
                        tracing::info!(
1,161✔
163
                            "JobQueue: job added.  {} queued job(s).  job running: {}",
×
164
                            num_jobs,
165
                            job_running
166
                        );
167
                        let _ = tx_deque.send(());
1,161✔
168
                    }
169
                    JobQueueMsg::Stop => {
170
                        tracing::info!("JobQueue: received stop message.  stopping.");
872✔
171
                        drop(tx_deque); // close queue
872✔
172

872✔
173
                        // if there is a presently executing job we need to cancel it.
872✔
174
                        let guard = jobs_rc1.lock().unwrap();
872✔
175
                        if let Some(current_job) = &guard.current_job {
872✔
176
                            if !current_job.cancel_tx.is_closed() {
4✔
177
                                current_job.cancel_tx.send(()).unwrap();
4✔
178
                                tracing::info!("JobQueue: notified current job to stop.");
4✔
UNCOV
179
                            }
×
180
                        }
868✔
181
                        break;
872✔
182
                    }
183
                }
184
            }
185
        });
1,330✔
186

1,330✔
187
        // spawns background task that processes job queue and runs jobs.
1,330✔
188
        let jobs_rc2 = jobs.clone();
1,330✔
189
        tracker.spawn(async move {
1,330✔
190
            let mut job_num: usize = 1;
1,023✔
191

192
            while rx_deque.recv().await.is_some() {
2,179✔
193
                let (msg, pending) = {
1,161✔
194
                    let mut guard = jobs_rc2.lock().unwrap();
1,161✔
195
                    guard
1,161✔
196
                        .jobs
1,161✔
197
                        .make_contiguous()
1,161✔
198
                        .sort_by(|a, b| b.priority.cmp(&a.priority));
1,161✔
199
                    let job = guard.jobs.pop_front().unwrap();
1,161✔
200
                    guard.current_job = Some(CurrentJob {
1,161✔
201
                        job_num,
1,161✔
202
                        cancel_tx: job.cancel_tx.clone(),
1,161✔
203
                    });
1,161✔
204
                    (job, guard.jobs.len())
1,161✔
205
                };
1,161✔
206

1,161✔
207
                tracing::info!(
1,161✔
208
                    "  *** JobQueue: begin job #{} - {} queued job(s) ***",
×
209
                    job_num,
210
                    pending
211
                );
212
                let timer = tokio::time::Instant::now();
1,161✔
213
                let job_completion = match msg.job.is_async() {
1,161✔
214
                    true => msg.job.run_async_cancellable(msg.cancel_rx).await,
1,121✔
215
                    false => tokio::task::spawn_blocking(move || msg.job.run(msg.cancel_rx))
40✔
216
                        .await
40✔
217
                        .unwrap(),
38✔
218
                };
219
                tracing::info!(
1,156✔
220
                    "  *** JobQueue: ended job #{} - Completion: {} - {} secs ***",
×
221
                    job_num,
×
222
                    job_completion,
×
223
                    timer.elapsed().as_secs_f32()
×
224
                );
225
                job_num += 1;
1,156✔
226

1,156✔
227
                jobs_rc2.lock().unwrap().current_job = None;
1,156✔
228

1,156✔
229
                let _ = msg.result_tx.send(job_completion);
1,156✔
230
            }
231
        });
1,330✔
232
        tracker.close();
1,330✔
233

1,330✔
234
        Self {
1,330✔
235
            tx,
1,330✔
236
            tracker,
1,330✔
237
            refs: Arc::new(()),
1,330✔
238
        }
1,330✔
239
    }
1,330✔
240

241
    /// alias of Self::start().
242
    /// here for two reasons:
243
    ///  1. backwards compat with existing tests
244
    ///  2. if tests call dummy() instead of start(), then it is easier
245
    ///     to find where start() is called for real.
246
    #[cfg(test)]
247
    pub fn dummy() -> Self {
1,148✔
248
        Self::start()
1,148✔
249
    }
1,148✔
250

251
    /// adds job to job-queue and returns immediately.
252
    ///
253
    /// job-results can be obtained by via JobHandle::results().await
254
    /// The job can be cancelled by JobHandle::cancel()
255
    pub async fn add_job(
1,161✔
256
        &self,
1,161✔
257
        job: Box<dyn Job>,
1,161✔
258
        priority: P,
1,161✔
259
    ) -> Result<JobHandle, JobQueueError> {
1,161✔
260
        let (result_tx, result_rx) = oneshot::channel();
1,161✔
261
        let (cancel_tx, cancel_rx) = watch::channel::<()>(());
1,161✔
262

1,161✔
263
        let msg = JobQueueMsg::AddJob(AddJobMsg {
1,161✔
264
            job,
1,161✔
265
            result_tx,
1,161✔
266
            cancel_tx: cancel_tx.clone(),
1,161✔
267
            cancel_rx: cancel_rx.clone(),
1,161✔
268
            priority,
1,161✔
269
        });
1,161✔
270
        self.tx
1,161✔
271
            .send(msg)
1,161✔
272
            .map_err(|e| JobQueueError::AddJobError(e.to_string()))?;
1,161✔
273

274
        Ok(JobHandle {
1,161✔
275
            result_rx,
1,161✔
276
            cancel_tx,
1,161✔
277
        })
1,161✔
278
    }
1,161✔
279
}
280

281
#[cfg(test)]
282
mod tests {
283
    use std::time::Instant;
284

285
    use tracing_test::traced_test;
286

287
    use super::*;
288

289
    #[tokio::test(flavor = "multi_thread")]
290
    #[traced_test]
×
291
    async fn run_sync_jobs_by_priority() -> anyhow::Result<()> {
1✔
292
        workers::run_jobs_by_priority(false).await
1✔
293
    }
1✔
294

295
    #[tokio::test(flavor = "multi_thread")]
296
    #[traced_test]
×
297
    async fn run_async_jobs_by_priority() -> anyhow::Result<()> {
1✔
298
        workers::run_jobs_by_priority(true).await
1✔
299
    }
1✔
300

301
    #[tokio::test(flavor = "multi_thread")]
302
    #[traced_test]
×
303
    async fn get_sync_job_result() -> anyhow::Result<()> {
1✔
304
        workers::get_job_result(false).await
1✔
305
    }
1✔
306

307
    #[tokio::test(flavor = "multi_thread")]
308
    #[traced_test]
×
309
    async fn get_async_job_result() -> anyhow::Result<()> {
1✔
310
        workers::get_job_result(true).await
1✔
311
    }
1✔
312

313
    #[tokio::test(flavor = "multi_thread")]
314
    #[traced_test]
×
315
    async fn cancel_sync_job() -> anyhow::Result<()> {
1✔
316
        workers::cancel_job(false).await
1✔
317
    }
1✔
318

319
    #[tokio::test(flavor = "multi_thread")]
320
    #[traced_test]
×
321
    async fn cancel_async_job() -> anyhow::Result<()> {
1✔
322
        workers::cancel_job(true).await
1✔
323
    }
1✔
324

325
    #[test]
326
    #[traced_test]
×
327
    fn runtime_shutdown_timeout_force_cancels_sync_job() -> anyhow::Result<()> {
1✔
328
        workers::runtime_shutdown_timeout_force_cancels_job(false)
1✔
329
    }
1✔
330

331
    #[test]
332
    #[traced_test]
×
333
    fn runtime_shutdown_timeout_force_cancels_async_job() -> anyhow::Result<()> {
1✔
334
        workers::runtime_shutdown_timeout_force_cancels_job(true)
1✔
335
    }
1✔
336

337
    #[test]
338
    #[traced_test]
×
339
    fn runtime_shutdown_cancels_sync_job() {
1✔
340
        let _ = workers::runtime_shutdown_cancels_job(false);
1✔
341
    }
1✔
342

343
    #[test]
344
    #[traced_test]
×
345
    fn runtime_shutdown_cancels_async_job() -> anyhow::Result<()> {
1✔
346
        workers::runtime_shutdown_cancels_job(true)
1✔
347
    }
1✔
348

349
    #[test]
350
    #[traced_test]
×
351
    fn spawned_tasks_live_as_long_as_jobqueue() -> anyhow::Result<()> {
1✔
352
        workers::spawned_tasks_live_as_long_as_jobqueue(true)
1✔
353
    }
1✔
354

355
    #[tokio::test]
356
    #[traced_test]
×
357
    async fn stop_only_called_once_when_cloned() -> anyhow::Result<()> {
1✔
358
        workers::stop_only_called_once_when_cloned().await
1✔
359
    }
1✔
360

361
    mod workers {
362
        use std::any::Any;
363

364
        use super::*;
365

366
        #[derive(PartialEq, Eq, PartialOrd, Ord)]
367
        pub enum DoubleJobPriority {
368
            Low = 1,
369
            Medium = 2,
370
            High = 3,
371
        }
372

373
        #[derive(PartialEq, Debug, Clone)]
374
        struct DoubleJobResult(u64, u64, Instant);
375
        impl JobResult for DoubleJobResult {
376
            fn as_any(&self) -> &dyn Any {
472✔
377
                self
472✔
378
            }
472✔
379
            fn into_any(self: Box<Self>) -> Box<dyn Any> {
74✔
380
                self
74✔
381
            }
74✔
382
        }
383

384
        // represents a prover job.  implements Job.
385
        struct DoubleJob {
386
            data: u64,
387
            duration: std::time::Duration,
388
            is_async: bool,
389
        }
390

391
        #[async_trait::async_trait]
392
        impl Job for DoubleJob {
393
            fn is_async(&self) -> bool {
82✔
394
                self.is_async
82✔
395
            }
82✔
396

397
            fn run(&self, cancel_rx: JobCancelReceiver) -> JobCompletion {
40✔
398
                let start = Instant::now();
40✔
399
                let sleep_time =
40✔
400
                    std::cmp::min(std::time::Duration::from_micros(100), self.duration);
40✔
401

402
                let r = loop {
40✔
403
                    if start.elapsed() < self.duration {
5,258✔
404
                        match cancel_rx.has_changed() {
5,221✔
405
                            Ok(changed) if changed => break JobCompletion::Cancelled,
1✔
406
                            Err(_) => break JobCompletion::Cancelled,
2✔
407
                            _ => {}
5,218✔
408
                        }
5,218✔
409

5,218✔
410
                        std::thread::sleep(sleep_time);
5,218✔
411
                    } else {
412
                        break JobCompletion::Finished(Box::new(DoubleJobResult(
37✔
413
                            self.data,
37✔
414
                            self.data * 2,
37✔
415
                            Instant::now(),
37✔
416
                        )));
37✔
417
                    }
418
                };
419

420
                tracing::info!("results: {:?}", r);
40✔
421
                r
40✔
422
            }
40✔
423

424
            async fn run_async(&self) -> Box<dyn JobResult> {
42✔
425
                tokio::time::sleep(self.duration).await;
42✔
426
                let r = DoubleJobResult(self.data, self.data * 2, Instant::now());
37✔
427

37✔
428
                tracing::info!("results: {:?}", r);
37✔
429
                Box::new(r)
37✔
430
            }
79✔
431
        }
432

433
        // this test demonstrates/verifies that:
434
        //  1. jobs are run in priority order, highest priority first.
435
        //  2. when multiple jobs have the same priority, they run in FIFO order.
436
        pub(super) async fn run_jobs_by_priority(is_async: bool) -> anyhow::Result<()> {
2✔
437
            let start_of_test = Instant::now();
2✔
438

2✔
439
            // create a job queue
2✔
440
            let job_queue = JobQueue::start();
2✔
441

2✔
442
            let mut handles = vec![];
2✔
443
            let duration = std::time::Duration::from_millis(20);
2✔
444

445
            // create 30 jobs, 10 at each priority level.
446
            for i in (1..10).rev() {
18✔
447
                let job1 = Box::new(DoubleJob {
18✔
448
                    data: i,
18✔
449
                    duration,
18✔
450
                    is_async,
18✔
451
                });
18✔
452
                let job2 = Box::new(DoubleJob {
18✔
453
                    data: i * 100,
18✔
454
                    duration,
18✔
455
                    is_async,
18✔
456
                });
18✔
457
                let job3 = Box::new(DoubleJob {
18✔
458
                    data: i * 1000,
18✔
459
                    duration,
18✔
460
                    is_async,
18✔
461
                });
18✔
462

18✔
463
                // process job and print results.
18✔
464
                handles.push(
18✔
465
                    job_queue
18✔
466
                        .add_job(job1, DoubleJobPriority::Low)
18✔
467
                        .await?
18✔
468
                        .result_rx(),
18✔
469
                );
18✔
470
                handles.push(
18✔
471
                    job_queue
18✔
472
                        .add_job(job2, DoubleJobPriority::Medium)
18✔
473
                        .await?
18✔
474
                        .result_rx(),
18✔
475
                );
18✔
476
                handles.push(
18✔
477
                    job_queue
18✔
478
                        .add_job(job3, DoubleJobPriority::High)
18✔
479
                        .await?
18✔
480
                        .result_rx(),
18✔
481
                );
482
            }
483

484
            // wait for all jobs to complete.
485
            let mut results = futures::future::join_all(handles).await;
2✔
486

487
            // the results are in the same order as handles passed to join_all.
488
            // we sort them by the timestamp in job result, ascending.
489
            results.sort_by(
2✔
490
                |a_completion, b_completion| match (a_completion, b_completion) {
236✔
491
                    (Ok(JobCompletion::Finished(a_dyn)), Ok(JobCompletion::Finished(b_dyn))) => {
236✔
492
                        let a = a_dyn.as_any().downcast_ref::<DoubleJobResult>().unwrap().2;
236✔
493

236✔
494
                        let b = b_dyn.as_any().downcast_ref::<DoubleJobResult>().unwrap().2;
236✔
495

236✔
496
                        a.cmp(&b)
236✔
497
                    }
498
                    _ => panic!("at least one job did not finish"),
×
499
                },
236✔
500
            );
2✔
501

2✔
502
            // iterate job results and verify that:
2✔
503
            //   timestamp of each is greater than prev.
2✔
504
            //   input value of each is greater than prev, except every 9th item which should be < prev
2✔
505
            //     because there are nine jobs per level.
2✔
506
            let mut prev = Box::new(DoubleJobResult(9999, 0, start_of_test));
2✔
507
            for (i, c) in results.into_iter().enumerate() {
54✔
508
                let dyn_result = match c {
54✔
509
                    Ok(JobCompletion::Finished(r)) => r,
54✔
510
                    _ => panic!("A job did not finish"),
×
511
                };
512

513
                let job_result = dyn_result.into_any().downcast::<DoubleJobResult>().unwrap();
54✔
514

54✔
515
                assert!(job_result.2 > prev.2);
54✔
516

517
                // we don't do the assertion for the 2nd job because the job-queue starts
518
                // processing immediately and so a race condition is setup where it is possible
519
                // for either the Low priority or High job to start processing first.
520
                if i != 1 {
54✔
521
                    assert!(job_result.0 < prev.0);
52✔
522
                }
2✔
523

524
                prev = job_result;
54✔
525
            }
526

527
            Ok(())
2✔
528
        }
2✔
529

530
        // this test demonstrates/verifies that a job can return a result back to
531
        // the job initiator.
532
        pub(super) async fn get_job_result(is_async: bool) -> anyhow::Result<()> {
2✔
533
            // create a job queue
2✔
534
            let job_queue = JobQueue::start();
2✔
535
            let duration = std::time::Duration::from_millis(20);
2✔
536

537
            // create 10 jobs
538
            for i in 0..10 {
22✔
539
                let job = Box::new(DoubleJob {
20✔
540
                    data: i,
20✔
541
                    duration,
20✔
542
                    is_async,
20✔
543
                });
20✔
544

545
                let result = job_queue
20✔
546
                    .add_job(job, DoubleJobPriority::Low)
20✔
547
                    .await?
20✔
548
                    .result()
20✔
549
                    .await?;
20✔
550

551
                let job_result = result.into_any().downcast::<DoubleJobResult>().unwrap();
20✔
552

20✔
553
                assert_eq!(i, job_result.0);
20✔
554
                assert_eq!(i * 2, job_result.1);
20✔
555
            }
556

557
            Ok(())
2✔
558
        }
2✔
559

560
        // tests/demonstrates that a long running job can be cancelled early.
561
        pub(super) async fn cancel_job(is_async: bool) -> anyhow::Result<()> {
2✔
562
            // create a job queue
2✔
563
            let job_queue = JobQueue::start();
2✔
564
            // start a 1 hour job.
2✔
565
            let duration = std::time::Duration::from_secs(3600); // 1 hour job.
2✔
566

2✔
567
            let job = Box::new(DoubleJob {
2✔
568
                data: 10,
2✔
569
                duration,
2✔
570
                is_async,
2✔
571
            });
2✔
572
            let job_handle = job_queue.add_job(job, DoubleJobPriority::Low).await?;
2✔
573

574
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2✔
575

576
            let completion = job_handle.cancel_and_await().await.unwrap();
2✔
577
            assert!(matches!(completion, JobCompletion::Cancelled));
2✔
578

579
            Ok(())
2✔
580
        }
2✔
581

582
        // note: creates own tokio runtime.  caller must not use [tokio::test]
583
        //
584
        // this test starts a job that runs for 1 hour and then attempts to
585
        // shutdown tokio runtime via shutdown_timeout() with a 1 sec timeout.
586
        //
587
        // any async tasks should be aborted quickly.
588
        // any sync tasks will continue to run to completion.
589
        //
590
        // shutdown_timeout() will wait for tasks to abort for 1 sec and then
591
        // returns.  Any un-aborted tasks/threads become ignored/detached.
592
        // The OS can cleanup such threads when the process exits.
593
        //
594
        // the test checks that the shutdown completes in under 2 secs.
595
        //
596
        // the test demonstrates that shutdown_timeout() can be used to shutdown
597
        // tokio runtime even if sync (spawn_blocking) tasks/threads are still running
598
        // in the blocking threadpool.
599
        //
600
        // when called with is_async=true, it demonstrates that shutdown_timeout() also
601
        // aborts async jobs, as one would expect.
602
        pub(super) fn runtime_shutdown_timeout_force_cancels_job(
2✔
603
            is_async: bool,
2✔
604
        ) -> anyhow::Result<()> {
2✔
605
            let rt = tokio::runtime::Runtime::new()?;
2✔
606
            let result = rt.block_on(async {
2✔
607
                // create a job queue
2✔
608
                let job_queue = JobQueue::start();
2✔
609
                // start a 1 hour job.
2✔
610
                let duration = std::time::Duration::from_secs(3600); // 1 hour job.
2✔
611

2✔
612
                let job = Box::new(DoubleJob {
2✔
613
                    data: 10,
2✔
614
                    duration,
2✔
615
                    is_async,
2✔
616
                });
2✔
617
                let _rx = job_queue.add_job(job, DoubleJobPriority::Low).await?;
2✔
618

619
                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2✔
620
                println!("finished scope");
2✔
621

2✔
622
                Ok(())
2✔
623
            });
2✔
624

2✔
625
            let start = std::time::Instant::now();
2✔
626

2✔
627
            println!("waiting 1 second for job before shutdown runtime");
2✔
628
            rt.shutdown_timeout(tokio::time::Duration::from_secs(1));
2✔
629

2✔
630
            assert!(start.elapsed() < std::time::Duration::from_secs(2));
2✔
631

632
            result
2✔
633
        }
2✔
634

635
        // note: creates own tokio runtime.  caller must not use [tokio::test]
636
        //
637
        // this test starts a job that runs for 5 secs and then attempts to
638
        // shutdown tokio runtime normally by dropping it.
639
        //
640
        // any async tasks should be aborted quickly.
641
        // any sync tasks will continue to run to completion.
642
        //
643
        // the tokio runtime does not complete the drop() until all tasks
644
        // have completed/aborted.
645
        //
646
        // the test checks that the job finishes in less than the 5 secs
647
        // required for full completion.  In other words, that it aborts.
648
        //
649
        // the test is expected to succeed for async jobs but fail for sync jobs.
650
        pub(super) fn runtime_shutdown_cancels_job(is_async: bool) -> anyhow::Result<()> {
2✔
651
            let rt = tokio::runtime::Runtime::new()?;
2✔
652
            let start = tokio::time::Instant::now();
2✔
653

2✔
654
            let result = rt.block_on(async {
2✔
655
                // create a job queue
2✔
656
                let job_queue = JobQueue::start();
2✔
657

2✔
658
                // this job takes at least 5 secs to complete.
2✔
659
                let duration = std::time::Duration::from_secs(5);
2✔
660

2✔
661
                let job = Box::new(DoubleJob {
2✔
662
                    data: 10,
2✔
663
                    duration,
2✔
664
                    is_async,
2✔
665
                });
2✔
666

667
                let rx_handle = job_queue.add_job(job, DoubleJobPriority::Low).await?;
2✔
668
                drop(rx_handle);
2✔
669

2✔
670
                // sleep 50 ms to let job get started.
2✔
671
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2✔
672

673
                Ok(())
2✔
674
            });
2✔
675

2✔
676
            // drop the tokio runtime. It will attempt to abort tasks.
2✔
677
            //   - async tasks can normally be aborted
2✔
678
            //   - spawn_blocking (sync) tasks cannot normally be aborted.
2✔
679
            drop(rt);
2✔
680

2✔
681
            // if test is successful, elapsed time should be less than the 5 secs
2✔
682
            // it takes for the job to complete.  (should be around 0.5 ms)
2✔
683

2✔
684
            // however it is expected/normal that sync tasks will not be aborted
2✔
685
            // and will run for full 5 secs.  thus this assert will fail for them.
2✔
686

2✔
687
            assert!(start.elapsed() < std::time::Duration::from_secs(5));
2✔
688

689
            result
2✔
690
        }
2✔
691

692
        // this test attempts to verify that the tasks spawned by the JobQueue
693
        // continue running until the JobQueue is dropped after the tokio
694
        // runtime is dropped.
695
        //
696
        // If the tasks are cencelled before JobQueue is dropped then a subsequent
697
        // api call that sends a msg will result in a "channel closed" error, which
698
        // is what the test checks for.
699
        //
700
        // note that the test has to do some tricky stuff to setup conditions
701
        // where the "channel closed" error can occur. It's a subtle issue.
702
        //
703
        // see description at:
704
        // https://github.com/tokio-rs/tokio/discussions/6961
705
        pub(super) fn spawned_tasks_live_as_long_as_jobqueue(is_async: bool) -> anyhow::Result<()> {
1✔
706
            let rt = tokio::runtime::Runtime::new()?;
1✔
707

708
            let result_ok: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
1✔
709

1✔
710
            let result_ok_clone = result_ok.clone();
1✔
711
            rt.block_on(async {
1✔
712
                // create a job queue
1✔
713
                let job_queue = JobQueue::start();
1✔
714

1✔
715
                // spawns background task that adds job
1✔
716
                let job_queue_cloned = job_queue.clone();
1✔
717
                let jh = tokio::spawn(async move {
1✔
718
                    // sleep 200 ms to let runtime finish.
1✔
719
                    // ie ensure drop(rt) will be reached and wait for us.
1✔
720
                    // note that we use std sleep.  if tokio sleep is used
1✔
721
                    // the test will always succeed due to the await point.
1✔
722
                    std::thread::sleep(std::time::Duration::from_millis(200));
1✔
723

1✔
724
                    let job = Box::new(DoubleJob {
1✔
725
                        data: 10,
1✔
726
                        duration: std::time::Duration::from_secs(1),
1✔
727
                        is_async,
1✔
728
                    });
1✔
729

730
                    let result = job_queue_cloned.add_job(job, DoubleJobPriority::Low).await;
1✔
731

732
                    // an assert on result.is_ok() would panic, but that panic would be
733
                    // printed and swallowed by tokio runtime, so the test would succeed
734
                    // despite the panic. instead we pass the result in a mutex so it
735
                    // can be asserted where it will be caught by the test runner.
736
                    *result_ok_clone.lock().unwrap() = result.is_ok();
1✔
737
                });
1✔
738

1✔
739
                // sleep 50 ms to let job get started.
1✔
740
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1✔
741

742
                // note; awaiting the joinhandle makes the test succeed.
743

744
                jh.abort();
1✔
745
                let _ = jh.await;
1✔
746
            });
1✔
747

1✔
748
            // drop the tokio runtime. It will abort tasks.
1✔
749
            drop(rt);
1✔
750

1✔
751
            assert!(*result_ok.lock().unwrap());
1✔
752

753
            Ok(())
1✔
754
        }
1✔
755

756
        /// this test verifies that the queue is not stopped until the last
757
        /// JobQueue reference is dropped.
758
        pub(super) async fn stop_only_called_once_when_cloned() -> anyhow::Result<()> {
1✔
759
            let jq1 = JobQueue::start();
1✔
760
            let jq2 = jq1.clone();
1✔
761
            let jq3 = jq1.clone();
1✔
762

1✔
763
            let duration = std::time::Duration::from_secs(3600); // 1 hour job
1✔
764

1✔
765
            let job = Box::new(DoubleJob {
1✔
766
                data: 10,
1✔
767
                duration,
1✔
768
                is_async: true,
1✔
769
            });
1✔
770

771
            let rx_handle = jq1.add_job(job, DoubleJobPriority::Low).await?;
1✔
772

773
            assert!(!jq1.tx.is_closed());
1✔
774

775
            drop(jq2);
1✔
776
            assert!(!jq1.tx.is_closed());
1✔
777

778
            tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
1✔
779
            assert!(!rx_handle.cancel_tx.is_closed());
1✔
780

781
            drop(jq3);
1✔
782
            assert!(!jq1.tx.is_closed());
1✔
783

784
            drop(jq1);
1✔
785

786
            // rx_handle.cancel().unwrap();
787

788
            let completion = rx_handle.complete().await?;
1✔
789
            assert!(matches!(completion, JobCompletion::Cancelled));
1✔
790

791
            Ok(())
1✔
792
        }
1✔
793
    }
794
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc