• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nicholaswilde / aria2-mcp-rs / 22745817467

06 Mar 2026 02:07AM UTC coverage: 89.687%. Remained the same
22745817467

Pull #10

github

web-flow
Merge 5ccd615a5 into 491001803
Pull Request #10: chore(deps): bump docker/login-action from 3 to 4

5070 of 5653 relevant lines covered (89.69%)

1519542.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.97
/src/server/mod.rs
1
pub mod handler;
2
pub mod mcp;
3
pub mod sse;
4

5
use anyhow::Result;
6
use chrono::{Datelike, Local, Timelike};
7
use std::sync::Arc;
8
use tokio::sync::RwLock;
9
use tokio::time::{self, Duration};
10

11
use crate::aria2::notifications::Aria2Notification;
12
use crate::aria2::recovery::RecoveryManager;
13
use crate::aria2::Aria2Client;
14
use crate::config::{Config, TransportType};
15
use crate::prompts::PromptRegistry;
16
use crate::resources::ResourceRegistry;
17
use crate::tools::ToolRegistry;
18

19
pub struct McpServer {
20
    config: Config,
21
    registry: Arc<RwLock<ToolRegistry>>,
22
    resource_registry: Arc<RwLock<ResourceRegistry>>,
23
    prompt_registry: Arc<RwLock<PromptRegistry>>,
24
    clients: Vec<Arc<Aria2Client>>,
25
    recovery_manager: Arc<RecoveryManager>,
26
}
27

28
impl McpServer {
29
    pub fn new(
3✔
30
        config: Config,
3✔
31
        registry: ToolRegistry,
3✔
32
        resource_registry: ResourceRegistry,
3✔
33
        prompt_registry: PromptRegistry,
3✔
34
        clients: Vec<Aria2Client>,
3✔
35
    ) -> Self {
3✔
36
        let recovery_manager = Arc::new(RecoveryManager::new(config.retry_config.clone()));
3✔
37
        Self {
3✔
38
            config,
3✔
39
            registry: Arc::new(RwLock::new(registry)),
3✔
40
            resource_registry: Arc::new(RwLock::new(resource_registry)),
3✔
41
            prompt_registry: Arc::new(RwLock::new(prompt_registry)),
3✔
42
            clients: clients.into_iter().map(Arc::new).collect(),
3✔
43
            recovery_manager,
3✔
44
        }
3✔
45
    }
3✔
46

47
    #[must_use]
48
    pub fn clients(&self) -> &[Arc<Aria2Client>] {
1✔
49
        &self.clients
1✔
50
    }
1✔
51

52
    pub async fn run(&self) -> Result<()> {
1✔
53
        let (notification_tx, notification_rx) =
1✔
54
            tokio::sync::mpsc::channel::<Aria2Notification>(100);
1✔
55

56
        for client in &self.clients {
1✔
57
            let client_clone = Arc::clone(client);
1✔
58
            let tx_clone = notification_tx.clone();
1✔
59
            tokio::spawn(async move {
1✔
60
                if let Err(e) = client_clone.start_notifications(tx_clone) {
×
61
                    log::error!(
×
62
                        "Notification error for instance {}: {}",
63
                        client_clone.name,
×
64
                        e
65
                    );
66
                }
×
67
            });
×
68

69
            let client_clone = Arc::clone(client);
1✔
70
            tokio::spawn(async move {
1✔
71
                if let Err(e) = start_scheduler(client_clone).await {
×
72
                    log::error!("Scheduler error: {e}");
×
73
                }
×
74
            });
×
75

76
            let client_clone = Arc::clone(client);
1✔
77
            let recovery_manager_clone = Arc::clone(&self.recovery_manager);
1✔
78
            tokio::spawn(async move {
1✔
79
                if let Err(e) = start_recovery_task(client_clone, recovery_manager_clone).await {
×
80
                    log::error!("Recovery task error: {e}");
×
81
                }
×
82
            });
×
83

84
            let client_clone = Arc::clone(client);
1✔
85
            tokio::spawn(async move {
1✔
86
                if let Err(e) = crate::tools::rss::start_rss_monitoring(client_clone).await {
×
87
                    log::error!("RSS monitoring error: {e}");
×
88
                }
×
89
            });
×
90

91
            let client_clone = Arc::clone(client);
1✔
92
            tokio::spawn(async move {
1✔
93
                if let Err(e) = start_purge_task(client_clone).await {
×
94
                    log::error!("Purge task error: {e}");
×
95
                }
×
96
            });
×
97
        }
98

99
        match self.config.transport {
1✔
100
            TransportType::Stdio => {
101
                let state = Arc::new(RwLock::new(mcp::McpState::new(self.config.lazy_mode)));
×
102
                mcp::run_stdio(
×
103
                    state,
×
104
                    Arc::clone(&self.registry),
×
105
                    Arc::clone(&self.resource_registry),
×
106
                    Arc::clone(&self.prompt_registry),
×
107
                    self.clients.clone(),
×
108
                    notification_rx,
×
109
                )
×
110
                .await
×
111
            }
112
            TransportType::Sse => {
113
                if !check_port_available(&self.config.http_host, self.config.http_port).await {
1✔
114
                    return Err(anyhow::anyhow!(
1✔
115
                        "HTTP port {}:{} is already in use",
1✔
116
                        self.config.http_host,
1✔
117
                        self.config.http_port
1✔
118
                    ));
1✔
119
                }
×
120
                sse::run_server(
×
121
                    self.config.http_host.clone(),
×
122
                    self.config.http_port,
×
123
                    self.config.http_auth_token.clone(),
×
124
                    Arc::clone(&self.registry),
×
125
                    Arc::clone(&self.resource_registry),
×
126
                    Arc::clone(&self.prompt_registry),
×
127
                    self.clients.clone(),
×
128
                    notification_rx,
×
129
                )
×
130
                .await
×
131
            }
132
        }
133
    }
1✔
134
}
135

136
pub async fn start_purge_task(client: Arc<Aria2Client>) -> Result<()> {
3✔
137
    let mut interval = time::interval(Duration::from_secs(60));
3✔
138

139
    loop {
140
        interval.tick().await;
7✔
141

142
        let config = client.config();
4✔
143
        let purge_config = {
4✔
144
            let config_guard = config.read().await;
4✔
145
            config_guard.purge_config.clone()
4✔
146
        };
147

148
        if !purge_config.enabled {
4✔
149
            continue;
1✔
150
        }
3✔
151

152
        // Adjust interval if needed
153
        if interval.period().as_secs() != purge_config.interval_secs {
3✔
154
            interval = time::interval(Duration::from_secs(purge_config.interval_secs));
2✔
155
            interval.tick().await;
2✔
156
        }
1✔
157

158
        let stopped = match client.tell_stopped(0, 1000, None).await {
3✔
159
            Ok(s) => s,
2✔
160
            Err(e) => {
1✔
161
                log::error!("Failed to fetch stopped downloads for purge: {e}");
1✔
162
                continue;
1✔
163
            }
164
        };
165

166
        if let Some(items) = stopped.as_array() {
2✔
167
            for item in items {
4✔
168
                if let Some(gid) = item.get("gid").and_then(|v| v.as_str()) {
4✔
169
                    if purge_config.excluded_gids.contains(gid) {
4✔
170
                        continue;
×
171
                    }
4✔
172

173
                    if is_purgeable(item) {
4✔
174
                        log::info!("Purging download {} (instance {})...", gid, client.name);
2✔
175
                        if let Err(e) = client.remove_download_result(gid).await {
2✔
176
                            log::error!("Failed to purge download {gid}: {e}");
×
177
                        }
2✔
178
                    }
2✔
179
                }
×
180
            }
181
        }
×
182
    }
183
}
184

185
#[must_use]
186
pub fn is_purgeable(item: &serde_json::Value) -> bool {
8✔
187
    if let Some(status) = item.get("status").and_then(|v| v.as_str()) {
8✔
188
        // For now, we purge anything that is complete or error.
189
        // We could add more complex logic here later if aria2 provides timestamps.
190
        return status == "complete" || status == "error";
8✔
191
    }
×
192
    false
×
193
}
8✔
194

195
async fn start_recovery_task(
2✔
196
    client: Arc<Aria2Client>,
2✔
197
    recovery_manager: Arc<RecoveryManager>,
2✔
198
) -> Result<()> {
2✔
199
    let mut interval = time::interval(Duration::from_secs(30));
2✔
200

201
    loop {
202
        interval.tick().await;
4✔
203

204
        // Check for stopped downloads with errors
205
        let stopped = match client.tell_stopped(0, 100, None).await {
2✔
206
            Ok(s) => s,
2✔
207
            Err(e) => {
×
208
                log::error!("Failed to fetch stopped downloads for recovery: {e}");
×
209
                continue;
×
210
            }
211
        };
212

213
        if let Some(items) = stopped.as_array() {
2✔
214
            for item in items {
2✔
215
                if let Some(gid) = item.get("gid").and_then(|v| v.as_str()) {
1✔
216
                    if let Some(backoff) = recovery_manager
1✔
217
                        .analyze_and_get_retry_backoff(gid, item)
1✔
218
                        .await
1✔
219
                    {
220
                        log::info!(
×
221
                            "Recovery needed for download {} (instance {}). Retrying in {} seconds.",
222
                            gid,
223
                            client.name,
×
224
                            backoff
225
                        );
226

227
                        let client_retry = Arc::clone(&client);
×
228
                        let recovery_manager_retry = Arc::clone(&recovery_manager);
×
229
                        let gid_retry = gid.to_string();
×
230

231
                        tokio::spawn(async move {
×
232
                            tokio::time::sleep(Duration::from_secs(backoff)).await;
×
233
                            if let Err(e) = recovery_manager_retry
×
234
                                .perform_retry(&client_retry, &gid_retry)
×
235
                                .await
×
236
                            {
237
                                log::error!("Retry failed for download {gid_retry}: {e}");
×
238
                            }
×
239
                        });
×
240
                    }
1✔
241
                }
×
242
            }
243
        }
×
244
    }
245
}
246

247
async fn check_port_available(host: &str, port: u16) -> bool {
3✔
248
    let addr_str = format!("{host}:{port}");
3✔
249
    tokio::net::TcpListener::bind(&addr_str).await.is_ok()
3✔
250
}
3✔
251

252
fn get_active_profile(
7✔
253
    current_day: &str,
7✔
254
    current_time: &str,
7✔
255
    schedules: &[crate::config::BandwidthSchedule],
7✔
256
) -> Option<String> {
7✔
257
    for schedule in schedules {
10✔
258
        if schedule.day == "daily" || schedule.day == current_day {
10✔
259
            let start = schedule.start_time.as_str();
7✔
260
            let end = schedule.end_time.as_str();
7✔
261

262
            if start <= end {
7✔
263
                // Normal range
264
                if current_time >= start && current_time < end {
4✔
265
                    return Some(schedule.profile_name.clone());
4✔
266
                }
×
267
            } else {
268
                // Wraps around midnight
269
                if current_time >= start || current_time < end {
3✔
270
                    return Some(schedule.profile_name.clone());
1✔
271
                }
2✔
272
            }
273
        }
3✔
274
    }
275
    None
2✔
276
}
7✔
277

278
async fn start_scheduler(client: Arc<Aria2Client>) -> Result<()> {
3✔
279
    let mut interval = time::interval(Duration::from_secs(60));
3✔
280
    let mut last_profile: Option<String> = None;
3✔
281

282
    loop {
283
        interval.tick().await;
6✔
284

285
        let now = Local::now();
3✔
286
        let current_day = match now.weekday() {
3✔
287
            chrono::Weekday::Mon => "mon",
×
288
            chrono::Weekday::Tue => "tue",
×
289
            chrono::Weekday::Wed => "wed",
×
290
            chrono::Weekday::Thu => "thu",
×
291
            chrono::Weekday::Fri => "fri",
3✔
292
            chrono::Weekday::Sat => "sat",
×
293
            chrono::Weekday::Sun => "sun",
×
294
        };
295
        let current_time = format!("{:02}:{:02}", now.hour(), now.minute());
3✔
296

297
        let (profiles, schedules) = {
3✔
298
            let config = client.config();
3✔
299
            let config_guard = config.read().await;
3✔
300
            (
3✔
301
                config_guard.bandwidth_profiles.clone(),
3✔
302
                config_guard.bandwidth_schedules.clone(),
3✔
303
            )
3✔
304
        };
305

306
        let active_profile_name = get_active_profile(current_day, &current_time, &schedules);
3✔
307

308
        if let Some(profile_name) = active_profile_name {
3✔
309
            if last_profile.as_ref() != Some(&profile_name) {
3✔
310
                if let Some(profile) = profiles.get(&profile_name) {
3✔
311
                    log::info!("Activating bandwidth profile: {profile_name}");
2✔
312
                    let options = serde_json::json!({
2✔
313
                        "max-overall-download-limit": profile.max_download,
2✔
314
                        "max-overall-upload-limit": profile.max_upload,
2✔
315
                    });
316
                    if let Err(e) = client.change_global_option(options).await {
2✔
317
                        log::error!("Failed to activate profile '{profile_name}': {e}");
1✔
318
                    } else {
1✔
319
                        last_profile = Some(profile_name);
1✔
320
                    }
1✔
321
                }
1✔
322
            }
×
323
        } else if last_profile.is_some() {
×
324
            last_profile = None;
×
325
        }
×
326
    }
327
}
328

329
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{Aria2Instance, BandwidthProfile, BandwidthSchedule, PurgeConfig};
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Time given to a spawned background loop to run its first iteration.
    const FIRST_TICK_GRACE: Duration = Duration::from_millis(100);

    /// Describes an aria2 instance whose RPC endpoint is the given mock server.
    fn test_instance(server: &MockServer) -> Aria2Instance {
        Aria2Instance {
            name: "test".to_string(),
            rpc_url: format!("{}/jsonrpc", server.uri()),
            rpc_secret: None,
        }
    }

    /// Default configuration with a single instance pointing at the mock server.
    fn config_for(server: &MockServer) -> Config {
        Config {
            instances: vec![test_instance(server)],
            ..Default::default()
        }
    }

    /// Builds a client for the first instance of `config`.
    fn client_for(config: &Config) -> Arc<Aria2Client> {
        Arc::new(Aria2Client::new_with_instance(
            config.clone(),
            config.instances[0].clone(),
        ))
    }

    /// Builds a server with default registries and one default client.
    fn server_with_default_client(config: Config) -> McpServer {
        let registry = ToolRegistry::new(&config);
        let client = Aria2Client::new(config.clone());
        McpServer::new(
            config,
            registry,
            ResourceRegistry::default(),
            PromptRegistry::default(),
            vec![client],
        )
    }

    /// Mounts `response` for every POST to `/jsonrpc` on the mock server.
    async fn mount_rpc(server: &MockServer, response: ResponseTemplate) {
        Mock::given(method("POST"))
            .and(path("/jsonrpc"))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// A successful JSON-RPC reply carrying `result`.
    fn rpc_result(result: serde_json::Value) -> ResponseTemplate {
        ResponseTemplate::new(200).set_body_json(serde_json::json!({
            "jsonrpc": "2.0",
            "id": "1",
            "result": result
        }))
    }

    /// Purge settings that enable purging on a one second interval.
    fn fast_purge() -> PurgeConfig {
        PurgeConfig {
            enabled: true,
            interval_secs: 1,
            ..Default::default()
        }
    }

    /// A daily schedule spanning 00:00-23:59 that selects `profile_name`.
    fn all_day_schedule(profile_name: &str) -> BandwidthSchedule {
        BandwidthSchedule {
            day: "daily".to_string(),
            start_time: "00:00".to_string(),
            end_time: "23:59".to_string(),
            profile_name: profile_name.to_string(),
        }
    }

    /// Configuration with a "night" profile that is scheduled all day.
    fn night_profile_config(server: &MockServer) -> Config {
        Config {
            bandwidth_profiles: std::collections::HashMap::from([(
                "night".to_string(),
                BandwidthProfile {
                    max_download: "1M".to_string(),
                    max_upload: "100K".to_string(),
                },
            )]),
            bandwidth_schedules: vec![all_day_schedule("night")],
            ..config_for(server)
        }
    }

    #[test]
    fn test_get_active_profile() {
        let schedules = vec![
            BandwidthSchedule {
                day: "mon".to_string(),
                start_time: "09:00".to_string(),
                end_time: "17:00".to_string(),
                profile_name: "work".to_string(),
            },
            BandwidthSchedule {
                day: "daily".to_string(),
                start_time: "22:00".to_string(),
                // End is earlier than start: the window wraps around midnight.
                end_time: "06:00".to_string(),
                profile_name: "night".to_string(),
            },
        ];

        // Monday 10:00 -> work
        assert_eq!(
            get_active_profile("mon", "10:00", &schedules),
            Some("work".to_string())
        );

        // Tuesday 10:00 -> None
        assert_eq!(get_active_profile("tue", "10:00", &schedules), None);

        // Any day 23:00 -> night (before midnight)
        assert_eq!(
            get_active_profile("wed", "23:00", &schedules),
            Some("night".to_string())
        );

        // Any day 05:00 -> night (after midnight)
        assert_eq!(
            get_active_profile("thu", "05:00", &schedules),
            Some("night".to_string())
        );

        // Any day 07:00 -> None
        assert_eq!(get_active_profile("thu", "07:00", &schedules), None);
    }

    #[test]
    fn test_new_server() {
        let _server = server_with_default_client(Config::default());
    }

    #[tokio::test]
    async fn test_server_run_sse_error() {
        // Find a port and keep it occupied
        let listener = std::net::TcpListener::bind("0.0.0.0:0").unwrap();
        let port = listener.local_addr().unwrap().port();

        let server = server_with_default_client(Config {
            transport: TransportType::Sse,
            http_host: "0.0.0.0".to_string(),
            http_port: port,
            ..Default::default()
        });

        let result = server.run().await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("already in use"));
    }

    #[tokio::test]
    async fn test_check_port_available() {
        // Find a free port
        let listener = std::net::TcpListener::bind("0.0.0.0:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let port = addr.port();
        let host = addr.ip().to_string();

        // Port is occupied by 'listener', so check_port_available should return false
        assert!(!super::check_port_available(&host, port).await);

        drop(listener);

        // Port is now free
        assert!(super::check_port_available(&host, port).await);
    }

    #[test]
    fn test_is_purgeable() {
        let item_complete = serde_json::json!({ "status": "complete" });
        let item_error = serde_json::json!({ "status": "error" });
        let item_active = serde_json::json!({ "status": "active" });
        let item_removed = serde_json::json!({ "status": "removed" });

        assert!(is_purgeable(&item_complete));
        assert!(is_purgeable(&item_error));
        assert!(!is_purgeable(&item_active));
        assert!(!is_purgeable(&item_removed));

        // A missing or non-string status is never purgeable.
        assert!(!is_purgeable(&serde_json::json!({})));
        assert!(!is_purgeable(&serde_json::json!({ "status": 1 })));
    }

    #[tokio::test]
    async fn test_start_purge_task_mock() {
        let mock_server = MockServer::start().await;
        let config = Config {
            purge_config: fast_purge(),
            ..config_for(&mock_server)
        };
        let client = client_for(&config);

        // Mock tellStopped
        mount_rpc(
            &mock_server,
            rpc_result(serde_json::json!([
                { "gid": "purge1", "status": "complete" },
                { "gid": "keep1", "status": "active" }
            ])),
        )
        .await;

        // Mock removeDownloadResult
        // NOTE(review): both mocks match every POST to /jsonrpc; confirm which
        // one wiremock serves for the removeDownloadResult call.
        mount_rpc(&mock_server, rpc_result(serde_json::json!("OK"))).await;

        // Run for a short time
        let task_client = Arc::clone(&client);
        tokio::spawn(async move {
            let _ = start_purge_task(task_client).await;
        });

        tokio::time::sleep(Duration::from_millis(1500)).await;
        // If we reach here, it at least didn't panic and ran one iteration
    }

    #[tokio::test]
    async fn test_start_recovery_task_mock() {
        let mock_server = MockServer::start().await;
        let config = Config {
            retry_config: crate::aria2::recovery::RetryConfig {
                max_retries: 3,
                ..Default::default()
            },
            ..config_for(&mock_server)
        };
        let client = client_for(&config);
        let recovery_manager = Arc::new(RecoveryManager::new(config.retry_config.clone()));

        // Mock tellStopped
        mount_rpc(
            &mock_server,
            rpc_result(serde_json::json!([
                { "gid": "recover1", "status": "error", "errorCode": "1" }
            ])),
        )
        .await;

        // Mock tellStatus for recovery check
        mount_rpc(
            &mock_server,
            rpc_result(serde_json::json!(
                { "gid": "recover1", "status": "error", "errorCode": "1" }
            )),
        )
        .await;

        // Run for a short time
        let task_client = Arc::clone(&client);
        let task_manager = Arc::clone(&recovery_manager);
        tokio::spawn(async move {
            let _ = start_recovery_task(task_client, task_manager).await;
        });

        tokio::time::sleep(FIRST_TICK_GRACE).await;
    }

    #[tokio::test]
    async fn test_start_scheduler_mock() {
        let mock_server = MockServer::start().await;
        let config = night_profile_config(&mock_server);
        let client = client_for(&config);

        // Mock changeGlobalOption
        mount_rpc(&mock_server, rpc_result(serde_json::json!("OK"))).await;

        let task_client = Arc::clone(&client);
        tokio::spawn(async move {
            let _ = start_scheduler(task_client).await;
        });

        tokio::time::sleep(FIRST_TICK_GRACE).await;
    }

    #[tokio::test]
    async fn test_start_recovery_task_empty_mock() {
        let mock_server = MockServer::start().await;
        let config = config_for(&mock_server);
        let client = client_for(&config);
        let recovery_manager = Arc::new(RecoveryManager::new(config.retry_config.clone()));

        // Mock tellStopped with empty result
        mount_rpc(&mock_server, rpc_result(serde_json::json!([]))).await;

        let task_client = Arc::clone(&client);
        let task_manager = Arc::clone(&recovery_manager);
        tokio::spawn(async move {
            let _ = start_recovery_task(task_client, task_manager).await;
        });

        tokio::time::sleep(FIRST_TICK_GRACE).await;
    }

    #[tokio::test]
    async fn test_start_scheduler_missing_profile_mock() {
        let mock_server = MockServer::start().await;

        // The schedule names a profile that is not defined anywhere.
        let config = Config {
            bandwidth_schedules: vec![all_day_schedule("nonexistent")],
            ..config_for(&mock_server)
        };
        let client = client_for(&config);

        let task_client = Arc::clone(&client);
        tokio::spawn(async move {
            let _ = start_scheduler(task_client).await;
        });

        tokio::time::sleep(FIRST_TICK_GRACE).await;
    }

    #[tokio::test]
    async fn test_start_purge_task_error_mock() {
        let mock_server = MockServer::start().await;
        let config = Config {
            purge_config: fast_purge(),
            ..config_for(&mock_server)
        };
        let client = client_for(&config);

        // Mock error for tellStopped
        mount_rpc(&mock_server, ResponseTemplate::new(500)).await;

        let task_client = Arc::clone(&client);
        tokio::spawn(async move {
            let _ = start_purge_task(task_client).await;
        });

        tokio::time::sleep(FIRST_TICK_GRACE).await;
    }

    #[tokio::test]
    async fn test_start_purge_task_disabled_mock() {
        let config = Config {
            purge_config: PurgeConfig {
                enabled: false,
                ..Default::default()
            },
            ..Default::default()
        };

        let client = Arc::new(Aria2Client::new(config));

        let task_client = Arc::clone(&client);
        tokio::spawn(async move {
            let _ = start_purge_task(task_client).await;
        });

        tokio::time::sleep(FIRST_TICK_GRACE).await;
    }

    #[tokio::test]
    async fn test_start_scheduler_error_mock() {
        let mock_server = MockServer::start().await;
        let config = night_profile_config(&mock_server);
        let client = client_for(&config);

        // Mock error for changeGlobalOption
        mount_rpc(&mock_server, ResponseTemplate::new(500)).await;

        let task_client = Arc::clone(&client);
        tokio::spawn(async move {
            let _ = start_scheduler(task_client).await;
        });

        tokio::time::sleep(FIRST_TICK_GRACE).await;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc