• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 13911366652

17 Mar 2025 10:41PM UTC coverage: 84.279% (-0.04%) from 84.316%
13911366652

push

github

Sword-Smith
Revert "feat!: Communicate node's bootstrap status"

This reverts commit e62fa5bf3.

Revert to get rid of variant `PeerMessage::BootstrapStatus` which
unfortunately doesn't seem to be backwards compatible.

38 of 40 new or added lines in 2 files covered. (95.0%)

648 existing lines in 8 files now uncovered.

50757 of 60225 relevant lines covered (84.28%)

174317.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.54
/src/job_queue/queue.rs
1
use std::collections::VecDeque;
2
use std::sync::Arc;
3
use std::sync::Mutex;
4

5
use tokio::sync::mpsc;
6
use tokio::sync::oneshot;
7
use tokio::sync::watch;
8
use tokio_util::task::TaskTracker;
9

10
use super::errors::JobHandleError;
11
use super::errors::JobQueueError;
12
use super::traits::Job;
13
use super::traits::JobCancelReceiver;
14
use super::traits::JobCancelSender;
15
use super::traits::JobCompletion;
16
use super::traits::JobResult;
17
use super::traits::JobResultReceiver;
18
use super::traits::JobResultSender;
19

20
/// A job-handle enables cancelling a job and awaiting results
21
#[derive(Debug)]
22
pub struct JobHandle {
23
    result_rx: JobResultReceiver,
24
    cancel_tx: JobCancelSender,
25
}
26
impl JobHandle {
27
    /// wait for job to complete
28
    ///
29
    /// a completed job may either be finished or cancelled.
30
    pub async fn complete(self) -> Result<JobCompletion, JobHandleError> {
1,102✔
31
        Ok(self.result_rx.await?)
1,102✔
32
    }
1,102✔
33

34
    /// wait for job result, or err if cancelled.
35
    pub async fn result(self) -> Result<Box<dyn JobResult>, JobHandleError> {
1,099✔
36
        match self.complete().await? {
1,099✔
37
            JobCompletion::Finished(r) => Ok(r),
1,099✔
38
            JobCompletion::Cancelled => Err(JobHandleError::JobCancelled),
×
39
        }
40
    }
1,099✔
41

42
    /// cancel job and return immediately.
43
    pub fn cancel(&self) -> Result<(), JobHandleError> {
×
44
        Ok(self.cancel_tx.send(())?)
×
45
    }
×
46

47
    /// cancel job and wait for it to complete.
48
    pub async fn cancel_and_await(self) -> Result<JobCompletion, JobHandleError> {
2✔
49
        self.cancel_tx.send(())?;
2✔
50
        self.complete().await
2✔
51
    }
2✔
52

53
    /// channel receiver for job results
54
    pub fn result_rx(self) -> JobResultReceiver {
54✔
55
        self.result_rx
54✔
56
    }
54✔
57

58
    /// channel sender for cancelling job.
59
    pub fn cancel_tx(&self) -> &JobCancelSender {
1,079✔
60
        &self.cancel_tx
1,079✔
61
    }
1,079✔
62
}
63

64
/// messages that can be sent to job-queue inner task.
65
enum JobQueueMsg<P> {
66
    AddJob(AddJobMsg<P>),
67
    Stop,
68
}
69

70
/// represents a msg to add a job to the queue.
71
struct AddJobMsg<P> {
72
    job: Box<dyn Job>,
73
    result_tx: JobResultSender,
74
    cancel_tx: JobCancelSender,
75
    cancel_rx: JobCancelReceiver,
76
    priority: P,
77
}
78

79
/// implements a job queue that sends result of each job to a listener.
80
pub struct JobQueue<P> {
81
    tx: mpsc::UnboundedSender<JobQueueMsg<P>>,
82
    tracker: TaskTracker,
83
    refs: Arc<()>,
84
}
85

86
impl<P> std::fmt::Debug for JobQueue<P> {
87
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
88
        f.debug_struct("JobQueue")
×
89
            .field("tx", &"mpsc::Sender")
×
90
            .finish()
×
91
    }
×
92
}
93

94
impl<P> Clone for JobQueue<P> {
95
    fn clone(&self) -> Self {
263✔
96
        Self {
263✔
97
            tx: self.tx.clone(),
263✔
98
            tracker: self.tracker.clone(),
263✔
99
            refs: self.refs.clone(),
263✔
100
        }
263✔
101
    }
263✔
102
}
103

104
impl<P> Drop for JobQueue<P> {
105
    // since JobQueue has impl Clone there can be other instances.
106
    // we must only send the Stop message when dropping the last instance.
107
    // if upgrade of a Weak Arc fails then we are the last one.
108
    // the refs struct member exists only for this purpose.
109
    //
110
    // test stop_only_called_once_when_cloned exists to check
111
    // it is working.
112
    fn drop(&mut self) {
1,591✔
113
        let refs_weak = Arc::downgrade(&self.refs);
1,591✔
114
        self.refs = Arc::new(());
1,591✔
115

1,591✔
116
        // if upgrade fails, then this is the last reference.
1,591✔
117
        if refs_weak.upgrade().is_none() {
1,591✔
118
            let _ = self.tx.send(JobQueueMsg::Stop);
1,328✔
119
        }
1,328✔
120
    }
1,591✔
121
}
122

123
impl<P: Ord + Send + Sync + 'static> JobQueue<P> {
124
    /// creates job queue and starts it processing.  returns immediately.
125
    pub fn start() -> Self {
1,328✔
126
        struct CurrentJob {
127
            job_num: usize,
128
            cancel_tx: JobCancelSender,
129
        }
130
        struct Shared<P: Ord> {
131
            jobs: VecDeque<AddJobMsg<P>>,
132
            current_job: Option<CurrentJob>,
133
        }
134

135
        let (tx, mut rx) = mpsc::unbounded_channel::<JobQueueMsg<P>>();
1,328✔
136

1,328✔
137
        let shared = Shared {
1,328✔
138
            jobs: VecDeque::new(),
1,328✔
139
            current_job: None,
1,328✔
140
        };
1,328✔
141
        let jobs: Arc<Mutex<Shared<P>>> = Arc::new(Mutex::new(shared));
1,328✔
142

1,328✔
143
        let (tx_deque, mut rx_deque) = tokio::sync::mpsc::unbounded_channel();
1,328✔
144

1,328✔
145
        let tracker = TaskTracker::new();
1,328✔
146

1,328✔
147
        // spawns background task that adds incoming jobs to job-queue
1,328✔
148
        let jobs_rc1 = jobs.clone();
1,328✔
149
        tracker.spawn(async move {
1,328✔
150
            while let Some(msg) = rx.recv().await {
2,180✔
151
                match msg {
2,026✔
152
                    JobQueueMsg::AddJob(m) => {
1,161✔
153
                        let (num_jobs, job_running) = {
1,161✔
154
                            let mut guard = jobs_rc1.lock().unwrap();
1,161✔
155
                            guard.jobs.push_back(m);
1,161✔
156
                            let job_running = match &guard.current_job {
1,161✔
UNCOV
157
                                Some(j) => format!("#{}", j.job_num),
×
158
                                None => "none".to_string(),
1,161✔
159
                            };
160
                            (guard.jobs.len(), job_running)
1,161✔
161
                        };
1,161✔
162
                        tracing::info!(
1,161✔
163
                            "JobQueue: job added.  {} queued job(s).  job running: {}",
×
164
                            num_jobs,
165
                            job_running
166
                        );
167
                        let _ = tx_deque.send(());
1,161✔
168
                    }
169
                    JobQueueMsg::Stop => {
170
                        tracing::info!("JobQueue: received stop message.  stopping.");
865✔
171
                        drop(tx_deque); // close queue
865✔
172

865✔
173
                        // if there is a presently executing job we need to cancel it.
865✔
174
                        let guard = jobs_rc1.lock().unwrap();
865✔
175
                        if let Some(current_job) = &guard.current_job {
865✔
176
                            if !current_job.cancel_tx.is_closed() {
3✔
177
                                current_job.cancel_tx.send(()).unwrap();
3✔
178
                                tracing::info!("JobQueue: notified current job to stop.");
3✔
179
                            }
×
180
                        }
862✔
181
                        break;
865✔
182
                    }
183
                }
184
            }
185
        });
1,328✔
186

1,328✔
187
        // spawns background task that processes job queue and runs jobs.
1,328✔
188
        let jobs_rc2 = jobs.clone();
1,328✔
189
        tracker.spawn(async move {
1,328✔
190
            let mut job_num: usize = 1;
1,019✔
191

192
            while rx_deque.recv().await.is_some() {
2,175✔
193
                let (msg, pending) = {
1,160✔
194
                    let mut guard = jobs_rc2.lock().unwrap();
1,160✔
195
                    guard
1,160✔
196
                        .jobs
1,160✔
197
                        .make_contiguous()
1,160✔
198
                        .sort_by(|a, b| b.priority.cmp(&a.priority));
1,160✔
199
                    let job = guard.jobs.pop_front().unwrap();
1,160✔
200
                    guard.current_job = Some(CurrentJob {
1,160✔
201
                        job_num,
1,160✔
202
                        cancel_tx: job.cancel_tx.clone(),
1,160✔
203
                    });
1,160✔
204
                    (job, guard.jobs.len())
1,160✔
205
                };
1,160✔
206

1,160✔
207
                tracing::info!(
1,160✔
208
                    "  *** JobQueue: begin job #{} - {} queued job(s) ***",
×
209
                    job_num,
210
                    pending
211
                );
212
                let timer = tokio::time::Instant::now();
1,160✔
213
                let job_completion = match msg.job.is_async() {
1,160✔
214
                    true => msg.job.run_async_cancellable(msg.cancel_rx).await,
1,120✔
215
                    false => tokio::task::spawn_blocking(move || msg.job.run(msg.cancel_rx))
40✔
216
                        .await
40✔
217
                        .unwrap(),
38✔
218
                };
219
                tracing::info!(
1,156✔
220
                    "  *** JobQueue: ended job #{} - Completion: {} - {} secs ***",
×
221
                    job_num,
×
222
                    job_completion,
×
223
                    timer.elapsed().as_secs_f32()
×
224
                );
225
                job_num += 1;
1,156✔
226

1,156✔
227
                jobs_rc2.lock().unwrap().current_job = None;
1,156✔
228

1,156✔
229
                let _ = msg.result_tx.send(job_completion);
1,156✔
230
            }
231
        });
1,328✔
232
        tracker.close();
1,328✔
233

1,328✔
234
        Self {
1,328✔
235
            tx,
1,328✔
236
            tracker,
1,328✔
237
            refs: Arc::new(()),
1,328✔
238
        }
1,328✔
239
    }
1,328✔
240

241
    /// alias of Self::start().
242
    /// here for two reasons:
243
    ///  1. backwards compat with existing tests
244
    ///  2. if tests call dummy() instead of start(), then it is easier
245
    ///     to find where start() is called for real.
246
    #[cfg(test)]
247
    pub fn dummy() -> Self {
1,144✔
248
        Self::start()
1,144✔
249
    }
1,144✔
250

251
    /// adds job to job-queue and returns immediately.
252
    ///
253
    /// job results can be obtained via JobHandle::result().await
254
    /// The job can be cancelled by JobHandle::cancel()
255
    pub fn add_job(&self, job: Box<dyn Job>, priority: P) -> Result<JobHandle, JobQueueError> {
1,161✔
256
        let (result_tx, result_rx) = oneshot::channel();
1,161✔
257
        let (cancel_tx, cancel_rx) = watch::channel::<()>(());
1,161✔
258

1,161✔
259
        let msg = JobQueueMsg::AddJob(AddJobMsg {
1,161✔
260
            job,
1,161✔
261
            result_tx,
1,161✔
262
            cancel_tx: cancel_tx.clone(),
1,161✔
263
            cancel_rx: cancel_rx.clone(),
1,161✔
264
            priority,
1,161✔
265
        });
1,161✔
266
        self.tx
1,161✔
267
            .send(msg)
1,161✔
268
            .map_err(|e| JobQueueError::AddJobError(e.to_string()))?;
1,161✔
269

270
        Ok(JobHandle {
1,161✔
271
            result_rx,
1,161✔
272
            cancel_tx,
1,161✔
273
        })
1,161✔
274
    }
1,161✔
275
}
276

277
#[cfg(test)]
278
mod tests {
279
    use std::time::Instant;
280

281
    use tracing_test::traced_test;
282

283
    use super::*;
284

285
    #[tokio::test(flavor = "multi_thread")]
286
    #[traced_test]
×
287
    async fn run_sync_jobs_by_priority() -> anyhow::Result<()> {
1✔
288
        workers::run_jobs_by_priority(false).await
1✔
289
    }
1✔
290

291
    #[tokio::test(flavor = "multi_thread")]
292
    #[traced_test]
×
293
    async fn run_async_jobs_by_priority() -> anyhow::Result<()> {
1✔
294
        workers::run_jobs_by_priority(true).await
1✔
295
    }
1✔
296

297
    #[tokio::test(flavor = "multi_thread")]
298
    #[traced_test]
×
299
    async fn get_sync_job_result() -> anyhow::Result<()> {
1✔
300
        workers::get_job_result(false).await
1✔
301
    }
1✔
302

303
    #[tokio::test(flavor = "multi_thread")]
304
    #[traced_test]
×
305
    async fn get_async_job_result() -> anyhow::Result<()> {
1✔
306
        workers::get_job_result(true).await
1✔
307
    }
1✔
308

309
    #[tokio::test(flavor = "multi_thread")]
310
    #[traced_test]
×
311
    async fn cancel_sync_job() -> anyhow::Result<()> {
1✔
312
        workers::cancel_job(false).await
1✔
313
    }
1✔
314

315
    #[tokio::test(flavor = "multi_thread")]
316
    #[traced_test]
×
317
    async fn cancel_async_job() -> anyhow::Result<()> {
1✔
318
        workers::cancel_job(true).await
1✔
319
    }
1✔
320

321
    #[test]
322
    #[traced_test]
×
323
    fn runtime_shutdown_timeout_force_cancels_sync_job() -> anyhow::Result<()> {
1✔
324
        workers::runtime_shutdown_timeout_force_cancels_job(false)
1✔
325
    }
1✔
326

327
    #[test]
328
    #[traced_test]
×
329
    fn runtime_shutdown_timeout_force_cancels_async_job() -> anyhow::Result<()> {
1✔
330
        workers::runtime_shutdown_timeout_force_cancels_job(true)
1✔
331
    }
1✔
332

333
    #[test]
334
    #[traced_test]
×
335
    fn runtime_shutdown_cancels_sync_job() {
1✔
336
        let _ = workers::runtime_shutdown_cancels_job(false);
1✔
337
    }
1✔
338

339
    #[test]
340
    #[traced_test]
×
341
    fn runtime_shutdown_cancels_async_job() -> anyhow::Result<()> {
1✔
342
        workers::runtime_shutdown_cancels_job(true)
1✔
343
    }
1✔
344

345
    #[test]
346
    #[traced_test]
×
347
    fn spawned_tasks_live_as_long_as_jobqueue() -> anyhow::Result<()> {
1✔
348
        workers::spawned_tasks_live_as_long_as_jobqueue(true)
1✔
349
    }
1✔
350

351
    #[tokio::test]
352
    #[traced_test]
×
353
    async fn stop_only_called_once_when_cloned() -> anyhow::Result<()> {
1✔
354
        workers::stop_only_called_once_when_cloned().await
1✔
355
    }
1✔
356

357
    mod workers {
358
        use std::any::Any;
359

360
        use super::*;
361

362
        #[derive(PartialEq, Eq, PartialOrd, Ord)]
363
        pub enum DoubleJobPriority {
364
            Low = 1,
365
            Medium = 2,
366
            High = 3,
367
        }
368

369
        #[derive(PartialEq, Debug, Clone)]
370
        struct DoubleJobResult(u64, u64, Instant);
371
        impl JobResult for DoubleJobResult {
372
            fn as_any(&self) -> &dyn Any {
472✔
373
                self
472✔
374
            }
472✔
375
            fn into_any(self: Box<Self>) -> Box<dyn Any> {
74✔
376
                self
74✔
377
            }
74✔
378
        }
379

380
        // represents a prover job.  implements Job.
381
        struct DoubleJob {
382
            data: u64,
383
            duration: std::time::Duration,
384
            is_async: bool,
385
        }
386

387
        #[async_trait::async_trait]
388
        impl Job for DoubleJob {
389
            fn is_async(&self) -> bool {
81✔
390
                self.is_async
81✔
391
            }
81✔
392

393
            fn run(&self, cancel_rx: JobCancelReceiver) -> JobCompletion {
40✔
394
                let start = Instant::now();
40✔
395
                let sleep_time =
40✔
396
                    std::cmp::min(std::time::Duration::from_micros(100), self.duration);
40✔
397

398
                let r = loop {
40✔
399
                    if start.elapsed() < self.duration {
5,274✔
400
                        match cancel_rx.has_changed() {
5,237✔
401
                            Ok(changed) if changed => break JobCompletion::Cancelled,
1✔
402
                            Err(_) => break JobCompletion::Cancelled,
2✔
403
                            _ => {}
5,234✔
404
                        }
5,234✔
405

5,234✔
406
                        std::thread::sleep(sleep_time);
5,234✔
407
                    } else {
408
                        break JobCompletion::Finished(Box::new(DoubleJobResult(
37✔
409
                            self.data,
37✔
410
                            self.data * 2,
37✔
411
                            Instant::now(),
37✔
412
                        )));
37✔
413
                    }
414
                };
415

416
                tracing::info!("results: {:?}", r);
40✔
417
                r
40✔
418
            }
40✔
419

420
            async fn run_async(&self) -> Box<dyn JobResult> {
41✔
421
                tokio::time::sleep(self.duration).await;
41✔
422
                let r = DoubleJobResult(self.data, self.data * 2, Instant::now());
37✔
423

37✔
424
                tracing::info!("results: {:?}", r);
37✔
425
                Box::new(r)
37✔
426
            }
78✔
427
        }
428

429
        // this test demonstrates/verifies that:
430
        //  1. jobs are run in priority order, highest priority first.
431
        //  2. when multiple jobs have the same priority, they run in FIFO order.
432
        pub(super) async fn run_jobs_by_priority(is_async: bool) -> anyhow::Result<()> {
2✔
433
            let start_of_test = Instant::now();
2✔
434

2✔
435
            // create a job queue
2✔
436
            let job_queue = JobQueue::start();
2✔
437

2✔
438
            let mut handles = vec![];
2✔
439
            let duration = std::time::Duration::from_millis(20);
2✔
440

441
            // create 27 jobs, 9 at each priority level.
442
            for i in (1..10).rev() {
18✔
443
                let job1 = Box::new(DoubleJob {
18✔
444
                    data: i,
18✔
445
                    duration,
18✔
446
                    is_async,
18✔
447
                });
18✔
448
                let job2 = Box::new(DoubleJob {
18✔
449
                    data: i * 100,
18✔
450
                    duration,
18✔
451
                    is_async,
18✔
452
                });
18✔
453
                let job3 = Box::new(DoubleJob {
18✔
454
                    data: i * 1000,
18✔
455
                    duration,
18✔
456
                    is_async,
18✔
457
                });
18✔
458

18✔
459
                // process job and print results.
18✔
460
                handles.push(job_queue.add_job(job1, DoubleJobPriority::Low)?.result_rx());
18✔
461
                handles.push(
18✔
462
                    job_queue
18✔
463
                        .add_job(job2, DoubleJobPriority::Medium)?
18✔
464
                        .result_rx(),
18✔
465
                );
18✔
466
                handles.push(
18✔
467
                    job_queue
18✔
468
                        .add_job(job3, DoubleJobPriority::High)?
18✔
469
                        .result_rx(),
18✔
470
                );
471
            }
472

473
            // wait for all jobs to complete.
474
            let mut results = futures::future::join_all(handles).await;
2✔
475

476
            // the results are in the same order as handles passed to join_all.
477
            // we sort them by the timestamp in job result, ascending.
478
            results.sort_by(
2✔
479
                |a_completion, b_completion| match (a_completion, b_completion) {
236✔
480
                    (Ok(JobCompletion::Finished(a_dyn)), Ok(JobCompletion::Finished(b_dyn))) => {
236✔
481
                        let a = a_dyn.as_any().downcast_ref::<DoubleJobResult>().unwrap().2;
236✔
482

236✔
483
                        let b = b_dyn.as_any().downcast_ref::<DoubleJobResult>().unwrap().2;
236✔
484

236✔
485
                        a.cmp(&b)
236✔
486
                    }
487
                    _ => panic!("at least one job did not finish"),
×
488
                },
236✔
489
            );
2✔
490

2✔
491
            // iterate job results and verify that:
2✔
492
            //   timestamp of each is greater than prev.
2✔
493
            //   input value of each is less than prev: higher priorities run first, and
2✔
494
            //     within a priority level the jobs were queued in descending input order.
2✔
495
            let mut prev = Box::new(DoubleJobResult(9999, 0, start_of_test));
2✔
496
            for (i, c) in results.into_iter().enumerate() {
54✔
497
                let Ok(JobCompletion::Finished(dyn_result)) = c else {
54✔
498
                    panic!("A job did not finish");
×
499
                };
500

501
                let job_result = dyn_result.into_any().downcast::<DoubleJobResult>().unwrap();
54✔
502

54✔
503
                assert!(job_result.2 > prev.2);
54✔
504

505
                // we don't do the assertion for the 2nd job because the job-queue starts
506
                // processing immediately and so a race condition is setup where it is possible
507
                // for either the Low priority or High job to start processing first.
508
                if i != 1 {
54✔
509
                    assert!(job_result.0 < prev.0);
52✔
510
                }
2✔
511

512
                prev = job_result;
54✔
513
            }
514

515
            Ok(())
2✔
516
        }
2✔
517

518
        // this test demonstrates/verifies that a job can return a result back to
519
        // the job initiator.
520
        pub(super) async fn get_job_result(is_async: bool) -> anyhow::Result<()> {
2✔
521
            // create a job queue
2✔
522
            let job_queue = JobQueue::start();
2✔
523
            let duration = std::time::Duration::from_millis(20);
2✔
524

525
            // create 10 jobs
526
            for i in 0..10 {
22✔
527
                let job = Box::new(DoubleJob {
20✔
528
                    data: i,
20✔
529
                    duration,
20✔
530
                    is_async,
20✔
531
                });
20✔
532

533
                let result = job_queue
20✔
534
                    .add_job(job, DoubleJobPriority::Low)?
20✔
535
                    .result()
20✔
536
                    .await?;
20✔
537

538
                let job_result = result.into_any().downcast::<DoubleJobResult>().unwrap();
20✔
539

20✔
540
                assert_eq!(i, job_result.0);
20✔
541
                assert_eq!(i * 2, job_result.1);
20✔
542
            }
543

544
            Ok(())
2✔
545
        }
2✔
546

547
        // tests/demonstrates that a long running job can be cancelled early.
548
        pub(super) async fn cancel_job(is_async: bool) -> anyhow::Result<()> {
2✔
549
            // create a job queue
2✔
550
            let job_queue = JobQueue::start();
2✔
551
            // start a 1 hour job.
2✔
552
            let duration = std::time::Duration::from_secs(3600); // 1 hour job.
2✔
553

2✔
554
            let job = Box::new(DoubleJob {
2✔
555
                data: 10,
2✔
556
                duration,
2✔
557
                is_async,
2✔
558
            });
2✔
559
            let job_handle = job_queue.add_job(job, DoubleJobPriority::Low)?;
2✔
560

561
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2✔
562

563
            let completion = job_handle.cancel_and_await().await.unwrap();
2✔
564
            assert!(matches!(completion, JobCompletion::Cancelled));
2✔
565

566
            Ok(())
2✔
567
        }
2✔
568

569
        // note: creates own tokio runtime.  caller must not use [tokio::test]
570
        //
571
        // this test starts a job that runs for 1 hour and then attempts to
572
        // shutdown tokio runtime via shutdown_timeout() with a 1 sec timeout.
573
        //
574
        // any async tasks should be aborted quickly.
575
        // any sync tasks will continue to run to completion.
576
        //
577
        // shutdown_timeout() will wait for tasks to abort for 1 sec and then
578
        // returns.  Any un-aborted tasks/threads become ignored/detached.
579
        // The OS can cleanup such threads when the process exits.
580
        //
581
        // the test checks that the shutdown completes in under 2 secs.
582
        //
583
        // the test demonstrates that shutdown_timeout() can be used to shutdown
584
        // tokio runtime even if sync (spawn_blocking) tasks/threads are still running
585
        // in the blocking threadpool.
586
        //
587
        // when called with is_async=true, it demonstrates that shutdown_timeout() also
588
        // aborts async jobs, as one would expect.
589
        pub(super) fn runtime_shutdown_timeout_force_cancels_job(
2✔
590
            is_async: bool,
2✔
591
        ) -> anyhow::Result<()> {
2✔
592
            let rt = tokio::runtime::Runtime::new()?;
2✔
593
            let result = rt.block_on(async {
2✔
594
                // create a job queue
2✔
595
                let job_queue = JobQueue::start();
2✔
596
                // start a 1 hour job.
2✔
597
                let duration = std::time::Duration::from_secs(3600); // 1 hour job.
2✔
598

2✔
599
                let job = Box::new(DoubleJob {
2✔
600
                    data: 10,
2✔
601
                    duration,
2✔
602
                    is_async,
2✔
603
                });
2✔
604
                let _rx = job_queue.add_job(job, DoubleJobPriority::Low)?;
2✔
605

606
                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2✔
607
                println!("finished scope");
2✔
608

2✔
609
                Ok(())
2✔
610
            });
2✔
611

2✔
612
            let start = std::time::Instant::now();
2✔
613

2✔
614
            println!("waiting 1 second for job before shutdown runtime");
2✔
615
            rt.shutdown_timeout(tokio::time::Duration::from_secs(1));
2✔
616

2✔
617
            assert!(start.elapsed() < std::time::Duration::from_secs(2));
2✔
618

619
            result
2✔
620
        }
2✔
621

622
        // note: creates own tokio runtime.  caller must not use [tokio::test]
623
        //
624
        // this test starts a job that runs for 5 secs and then attempts to
625
        // shutdown tokio runtime normally by dropping it.
626
        //
627
        // any async tasks should be aborted quickly.
628
        // any sync tasks will continue to run to completion.
629
        //
630
        // the tokio runtime does not complete the drop() until all tasks
631
        // have completed/aborted.
632
        //
633
        // the test checks that the job finishes in less than the 5 secs
634
        // required for full completion.  In other words, that it aborts.
635
        //
636
        // the test is expected to succeed for async jobs but fail for sync jobs.
637
        pub(super) fn runtime_shutdown_cancels_job(is_async: bool) -> anyhow::Result<()> {
2✔
638
            let rt = tokio::runtime::Runtime::new()?;
2✔
639
            let start = tokio::time::Instant::now();
2✔
640

2✔
641
            let result = rt.block_on(async {
2✔
642
                // create a job queue
2✔
643
                let job_queue = JobQueue::start();
2✔
644

2✔
645
                // this job takes at least 5 secs to complete.
2✔
646
                let duration = std::time::Duration::from_secs(5);
2✔
647

2✔
648
                let job = Box::new(DoubleJob {
2✔
649
                    data: 10,
2✔
650
                    duration,
2✔
651
                    is_async,
2✔
652
                });
2✔
653

654
                let rx_handle = job_queue.add_job(job, DoubleJobPriority::Low)?;
2✔
655
                drop(rx_handle);
2✔
656

2✔
657
                // sleep 50 ms to let job get started.
2✔
658
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2✔
659

660
                Ok(())
2✔
661
            });
2✔
662

2✔
663
            // drop the tokio runtime. It will attempt to abort tasks.
2✔
664
            //   - async tasks can normally be aborted
2✔
665
            //   - spawn_blocking (sync) tasks cannot normally be aborted.
2✔
666
            drop(rt);
2✔
667

2✔
668
            // if test is successful, elapsed time should be less than the 5 secs
2✔
669
            // it takes for the job to complete.  (should be only slightly more than the 50 ms sleep above)
2✔
670

2✔
671
            // however it is expected/normal that sync tasks will not be aborted
2✔
672
            // and will run for full 5 secs.  thus this assert will fail for them.
2✔
673

2✔
674
            assert!(start.elapsed() < std::time::Duration::from_secs(5));
2✔
675

676
            result
2✔
677
        }
2✔
678

679
        // this test attempts to verify that the tasks spawned by the JobQueue
680
        // continue running until the JobQueue is dropped after the tokio
681
        // runtime is dropped.
682
        //
683
        // If the tasks are cancelled before JobQueue is dropped then a subsequent
684
        // api call that sends a msg will result in a "channel closed" error, which
685
        // is what the test checks for.
686
        //
687
        // note that the test has to do some tricky stuff to setup conditions
688
        // where the "channel closed" error can occur. It's a subtle issue.
689
        //
690
        // see description at:
691
        // https://github.com/tokio-rs/tokio/discussions/6961
692
        pub(super) fn spawned_tasks_live_as_long_as_jobqueue(is_async: bool) -> anyhow::Result<()> {
1✔
693
            let rt = tokio::runtime::Runtime::new()?;
1✔
694

695
            let result_ok: Arc<Mutex<bool>> = Arc::new(Mutex::new(true));
1✔
696

1✔
697
            let result_ok_clone = result_ok.clone();
1✔
698
            rt.block_on(async {
1✔
699
                // create a job queue
1✔
700
                let job_queue = JobQueue::start();
1✔
701

1✔
702
                // spawns background task that adds job
1✔
703
                let job_queue_cloned = job_queue.clone();
1✔
704
                let jh = tokio::spawn(async move {
1✔
705
                    // sleep 200 ms to let runtime finish.
1✔
706
                    // ie ensure drop(rt) will be reached and wait for us.
1✔
707
                    // note that we use std sleep.  if tokio sleep is used
1✔
708
                    // the test will always succeed due to the await point.
1✔
709
                    std::thread::sleep(std::time::Duration::from_millis(200));
1✔
710

1✔
711
                    let job = Box::new(DoubleJob {
1✔
712
                        data: 10,
1✔
713
                        duration: std::time::Duration::from_secs(1),
1✔
714
                        is_async,
1✔
715
                    });
1✔
716

1✔
717
                    let result = job_queue_cloned.add_job(job, DoubleJobPriority::Low);
1✔
718

1✔
719
                    // an assert on result.is_ok() would panic, but that panic would be
1✔
720
                    // printed and swallowed by tokio runtime, so the test would succeed
1✔
721
                    // despite the panic. instead we pass the result in a mutex so it
1✔
722
                    // can be asserted where it will be caught by the test runner.
1✔
723
                    *result_ok_clone.lock().unwrap() = result.is_ok();
1✔
724
                });
1✔
725

1✔
726
                // sleep 50 ms to let job get started.
1✔
727
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1✔
728

729
                // note; awaiting the joinhandle makes the test succeed.
730

731
                jh.abort();
1✔
732
                let _ = jh.await;
1✔
733
            });
1✔
734

1✔
735
            // drop the tokio runtime. It will abort tasks.
1✔
736
            drop(rt);
1✔
737

1✔
738
            assert!(*result_ok.lock().unwrap());
1✔
739

740
            Ok(())
1✔
741
        }
1✔
742

743
        /// this test verifies that the queue is not stopped until the last
744
        /// JobQueue reference is dropped.
745
        pub(super) async fn stop_only_called_once_when_cloned() -> anyhow::Result<()> {
1✔
746
            let jq1 = JobQueue::start();
1✔
747
            let jq2 = jq1.clone();
1✔
748
            let jq3 = jq1.clone();
1✔
749

1✔
750
            let duration = std::time::Duration::from_secs(3600); // 1 hour job
1✔
751

1✔
752
            let job = Box::new(DoubleJob {
1✔
753
                data: 10,
1✔
754
                duration,
1✔
755
                is_async: true,
1✔
756
            });
1✔
757

758
            let rx_handle = jq1.add_job(job, DoubleJobPriority::Low)?;
1✔
759

760
            assert!(!jq1.tx.is_closed());
1✔
761

762
            drop(jq2);
1✔
763
            assert!(!jq1.tx.is_closed());
1✔
764

765
            tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
1✔
766
            assert!(!rx_handle.cancel_tx.is_closed());
1✔
767

768
            drop(jq3);
1✔
769
            assert!(!jq1.tx.is_closed());
1✔
770

771
            drop(jq1);
1✔
772

773
            // rx_handle.cancel().unwrap();
774

775
            let completion = rx_handle.complete().await?;
1✔
776
            assert!(matches!(completion, JobCompletion::Cancelled));
1✔
777

778
            Ok(())
1✔
779
        }
1✔
780
    }
781
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc