• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

0xmichalis / nftbk / 18466999828

13 Oct 2025 01:15PM UTC coverage: 35.191% (-0.009%) from 35.2%
18466999828

push

github

web-flow
fix: associate pin requests to providers correctly (#55)

* fix: associate pin requests to providers correctly

* fix: address copilot comments

13 of 40 new or added lines in 6 files covered. (32.5%)

1 existing line in 1 file now uncovered.

1244 of 3535 relevant lines covered (35.19%)

5.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

12.59
/src/server/mod.rs
1
use futures_util::FutureExt;
2
use std::collections::{HashMap, HashSet};
3
use std::panic::AssertUnwindSafe;
4
use std::path::PathBuf;
5
use std::str::FromStr;
6
use std::sync::atomic::AtomicBool;
7
use std::sync::Arc;
8
use std::time::Instant;
9
use tokio::fs;
10
use tokio::sync::mpsc;
11
use tokio::sync::Mutex;
12
use tracing::{debug, error, info, warn};
13

14
use crate::backup::ChainConfig;
15
use crate::ipfs::{IpfsPinningProvider, IpfsProviderConfig};
16
use crate::server::api::{BackupRequest, Tokens};
17
use crate::server::archive::{
18
    get_zipped_backup_paths, zip_backup, ARCHIVE_INTERRUPTED_BY_SHUTDOWN,
19
};
20
use crate::server::db::Db;
21
use crate::server::hashing::compute_file_sha256;
22
use crate::{
23
    backup::backup_from_config, BackupConfig, ProcessManagementConfig, StorageConfig, TokenConfig,
24
};
25

26
pub mod api;
27
pub mod archive;
28
pub mod db;
29
pub mod handlers;
30
pub mod hashing;
31
pub mod pin_monitor;
32
pub mod privy;
33
pub mod pruner;
34
pub mod router;
35
pub mod workers;
36
pub use handlers::handle_backup::handle_backup;
37
pub use handlers::handle_backup_delete::handle_backup_delete;
38
pub use handlers::handle_backup_delete_archive::handle_backup_delete_archive;
39
pub use handlers::handle_backup_delete_pins::handle_backup_delete_pins;
40
pub use handlers::handle_backup_retry::handle_backup_retry;
41
pub use handlers::handle_backups::handle_backups;
42
pub use handlers::handle_download::handle_download;
43
pub use handlers::handle_download::handle_download_token;
44
pub use handlers::handle_status::handle_status;
45
pub use workers::spawn_backup_workers;
46

47
/// Message sent to backup workers over the task channel: either a unit of
/// work or a request to shut the worker down.
#[derive(Debug, Clone)]
pub enum BackupTaskOrShutdown {
    /// A backup-creation or deletion task to process.
    Task(TaskType),
    /// Signal the receiving worker to stop processing and exit.
    Shutdown,
}
52

53
/// The two kinds of work a backup worker can be asked to perform.
#[derive(Debug, Clone)]
pub enum TaskType {
    /// Create a backup (download tokens, optionally archive and/or pin).
    Creation(BackupTask),
    /// Delete a previously created backup's artifacts.
    Deletion(DeletionTask),
}
58

59
/// A queued request to create a backup.
#[derive(Debug, Clone)]
pub struct BackupTask {
    /// Unique task identifier; also used to derive on-disk paths
    /// (`{base_dir}/nftbk-{task_id}`).
    pub task_id: String,
    /// Tokens to back up plus the IPFS/archive flags from the API request.
    pub request: BackupRequest,
    /// When true, previously recorded errors for this task are cleared
    /// before the backup is re-run.
    pub force: bool,
    /// Which storage backends the task targets (archive, IPFS, or both).
    pub storage_mode: StorageMode,
    /// Archive container format; treated as "zip" when `None`.
    pub archive_format: Option<String>,
    /// Identity of the caller that requested the backup, if known.
    pub requestor: Option<String>,
}
68

69
/// A queued request to delete a backup's artifacts.
#[derive(Debug, Clone)]
pub struct DeletionTask {
    /// Identifier of the backup task whose artifacts should be removed.
    pub task_id: String,
    /// Caller requesting the deletion; when set, must match the task owner.
    pub requestor: Option<String>,
    /// Determines which parts of the backup to delete (e.g., only the archive, only the IPFS pins, or both).
    pub scope: StorageMode,
}
76

77
/// Where a backup's output is stored. Serialized to/from the strings
/// "archive", "ipfs" and "full" (see `as_str` and `FromStr`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageMode {
    /// Filesystem archive only; no IPFS pinning.
    Archive,
    /// IPFS pinning only; no filesystem archive.
    Ipfs,
    /// Both filesystem archive and IPFS pinning.
    Full,
}
83

84
impl StorageMode {
85
    pub fn as_str(&self) -> &'static str {
7✔
86
        match self {
7✔
87
            StorageMode::Archive => "archive",
6✔
88
            StorageMode::Ipfs => "ipfs",
1✔
89
            StorageMode::Full => "full",
×
90
        }
91
    }
92
}
93

94
impl FromStr for StorageMode {
95
    type Err = String;
96

97
    fn from_str(s: &str) -> Result<Self, Self::Err> {
5✔
98
        match s {
5✔
99
            "archive" => Ok(StorageMode::Archive),
8✔
100
            "ipfs" => Ok(StorageMode::Ipfs),
2✔
101
            "full" => Ok(StorageMode::Full),
3✔
102
            _ => Err(format!("Unknown storage mode: {}", s)),
1✔
103
        }
104
    }
105
}
106

107
/// Shared server state, cloned into every handler/worker. All heavy members
/// are behind `Arc`, so cloning is cheap.
#[derive(Clone)]
pub struct AppState {
    /// Per-chain configuration loaded from the chain config TOML file.
    pub chain_config: Arc<ChainConfig>,
    /// Root directory for backup output (`{base_dir}/nftbk-{task_id}`).
    pub base_dir: Arc<String>,
    /// When true, archive existence checks skip SHA-256 verification.
    pub unsafe_skip_checksum_check: bool,
    /// Static API auth token, if authentication is enabled.
    /// NOTE(review): usage not visible in this file — confirm against router/handlers.
    pub auth_token: Option<String>,
    /// Whether the background pruner is enabled.
    pub pruner_enabled: bool,
    /// Retention window for the pruner, in days.
    pub pruner_retention_days: u64,
    /// In-memory download token store.
    /// Presumably token -> (task_id, expiry); verify against the download handlers.
    pub download_tokens: Arc<Mutex<HashMap<String, (String, u64)>>>,
    /// Channel for enqueuing backup/deletion work to the worker pool.
    pub backup_task_sender: mpsc::Sender<BackupTaskOrShutdown>,
    /// Database handle.
    pub db: Arc<Db>,
    /// Set when the server is shutting down; checked by long-running work.
    pub shutdown_flag: Arc<AtomicBool>,
    /// Raw IPFS provider configurations (used to build backup configs).
    pub ipfs_providers: Vec<IpfsProviderConfig>,
    /// Provider clients instantiated once at startup from `ipfs_providers`.
    pub ipfs_provider_instances: Arc<Vec<Arc<dyn IpfsPinningProvider>>>,
}
122

123
// `Default` exists only to satisfy trait bounds elsewhere; constructing an
// AppState without the async setup in `AppState::new` would be invalid, so
// this deliberately panics instead of returning a half-initialized state.
impl Default for AppState {
    fn default() -> Self {
        panic!("AppState::default() should not be used; use AppState::new() instead");
    }
}
128

129
impl AppState {
    /// Build the shared server state: load and parse the chain config TOML,
    /// resolve env-var placeholders in it, connect to the database, and
    /// instantiate one client per configured IPFS provider.
    ///
    /// # Panics
    /// Panics (via `expect`) if the chain config cannot be read, parsed, or
    /// env-resolved — this runs once at startup, where failing fast is the
    /// intended behavior. Individual IPFS provider failures are only logged.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        chain_config_path: &str,
        base_dir: &str,
        unsafe_skip_checksum_check: bool,
        auth_token: Option<String>,
        pruner_enabled: bool,
        pruner_retention_days: u64,
        backup_task_sender: mpsc::Sender<BackupTaskOrShutdown>,
        db_url: &str,
        max_connections: u32,
        shutdown_flag: Arc<AtomicBool>,
        ipfs_providers: Vec<IpfsProviderConfig>,
    ) -> Self {
        let config_content = tokio::fs::read_to_string(chain_config_path)
            .await
            .expect("Failed to read chain config");
        let chains: std::collections::HashMap<String, String> =
            toml::from_str(&config_content).expect("Failed to parse chain config");
        let mut chain_config = ChainConfig(chains);
        chain_config
            .resolve_env_vars()
            .expect("Failed to resolve environment variables in chain config");
        let db = Arc::new(Db::new(db_url, max_connections).await);

        // Create IPFS provider instances at startup
        // (a bad provider config is non-fatal: log and continue with the rest).
        let mut ipfs_provider_instances = Vec::new();
        for config in &ipfs_providers {
            match config.create_provider() {
                Ok(provider) => {
                    info!(
                        "Successfully created IPFS provider {} ({})",
                        provider.provider_type(),
                        provider.provider_url()
                    );
                    ipfs_provider_instances.push(Arc::from(provider));
                }
                Err(e) => {
                    error!(
                        "Failed to create IPFS provider from config {:?}: {}",
                        config, e
                    );
                }
            }
        }

        AppState {
            chain_config: Arc::new(chain_config),
            base_dir: Arc::new(base_dir.to_string()),
            unsafe_skip_checksum_check,
            auth_token,
            pruner_enabled,
            pruner_retention_days,
            download_tokens: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
            backup_task_sender,
            db,
            shutdown_flag,
            ipfs_providers,
            ipfs_provider_instances: Arc::new(ipfs_provider_instances),
        }
    }
}
192

193
/// Check whether a completed backup archive for `task_id` exists under
/// `base_dir` and, unless `unsafe_skip_checksum_check` is set, verify that
/// the archive matches its stored SHA-256 checksum file.
///
/// Returns `Some(archive_path)` when the archive and checksum file both exist
/// and verification passes (or is skipped); `None` when either file is
/// missing, unreadable, or the checksum does not match (logged as warnings).
pub async fn check_backup_on_disk(
    base_dir: &str,
    task_id: &str,
    unsafe_skip_checksum_check: bool,
    archive_format: &str,
) -> Option<PathBuf> {
    let (path, checksum_path) =
        crate::server::archive::get_zipped_backup_paths(base_dir, task_id, archive_format);

    // First check if both files exist
    match (
        fs::try_exists(&path).await,
        fs::try_exists(&checksum_path).await,
    ) {
        (Ok(true), Ok(true)) => {
            if unsafe_skip_checksum_check {
                // Only check for existence, skip reading and comparing checksums
                return Some(path);
            }
            // Read stored checksum
            info!("Checking backup on disk for task {}", task_id);
            let stored_checksum = match fs::read_to_string(&checksum_path).await {
                Ok(checksum) => checksum,
                Err(e) => {
                    warn!("Failed to read checksum file for {}: {}", path.display(), e);
                    return None;
                }
            };

            // Compute current checksum
            debug!("Computing backup checksum for task {}", task_id);
            let current_checksum = match compute_file_sha256(&path).await {
                Ok(checksum) => checksum,
                Err(e) => {
                    warn!("Failed to compute checksum for {}: {}", path.display(), e);
                    return None;
                }
            };

            // trim() tolerates a trailing newline in the stored checksum file.
            if stored_checksum.trim() != current_checksum {
                warn!(
                    "Backup archive {} is corrupted: checksum mismatch",
                    path.display()
                );
                return None;
            }

            Some(path)
        }
        _ => None,
    }
}
245

246
/// Flush the given files, and each of their parent directories, to stable
/// storage. Best-effort: open/sync failures are silently ignored. Every
/// distinct parent directory is synced at most once.
fn sync_files(files_written: &[std::path::PathBuf]) {
    let mut parents_seen = HashSet::new();
    for path in files_written {
        // fsync the file itself (skip anything that is not a regular file).
        if path.is_file() {
            if let Ok(handle) = std::fs::File::open(path) {
                let _ = handle.sync_all();
            }
        }
        // fsync the containing directory so the entry itself is durable,
        // but only the first time we encounter that directory.
        let parent = match path.parent() {
            Some(p) => p,
            None => continue,
        };
        if !parents_seen.insert(parent.to_path_buf()) {
            continue;
        }
        if let Ok(dir_handle) = std::fs::File::open(parent) {
            let _ = dir_handle.sync_all();
        }
    }
}
263

264
// Trait for database operations needed by recovery
265
#[async_trait::async_trait]
pub trait RecoveryDb {
    /// Fetch all backup tasks that were left unfinished (e.g. by a shutdown).
    async fn get_incomplete_backup_tasks(&self) -> Result<Vec<RecoveryTask>, sqlx::Error>;
    /// Record an error message against the given task.
    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error>;
}
270

271
// Recovery task struct that matches the database schema
272
#[derive(Debug, Clone)]
pub struct RecoveryTask {
    // Identifier of the interrupted backup task.
    pub task_id: String,
    // Owner of the task (non-optional here, unlike `BackupTask::requestor`).
    pub requestor: String,
    // Raw tokens JSON as stored in the DB; parsed back to `Vec<Tokens>`
    // during recovery.
    pub tokens: serde_json::Value,
    // Storage mode string ("archive"/"ipfs"/"full"); parsed leniently,
    // falling back to Full.
    pub storage_mode: String,
    // Archive container format recorded for the task, if any.
    pub archive_format: Option<String>,
}
280

281
// Implement the trait for the real Db
282
#[async_trait::async_trait]
impl RecoveryDb for Db {
    // Maps the DB row type returned by the inherent `Db` method into the
    // recovery-specific `RecoveryTask` struct field by field.
    async fn get_incomplete_backup_tasks(&self) -> Result<Vec<RecoveryTask>, sqlx::Error> {
        let tasks = Db::get_incomplete_backup_tasks(self).await?;
        Ok(tasks
            .into_iter()
            .map(|task| RecoveryTask {
                task_id: task.task_id,
                requestor: task.requestor,
                tokens: task.tokens,
                storage_mode: task.storage_mode,
                archive_format: task.archive_format,
            })
            .collect())
    }

    // Thin pass-through to the inherent `Db::set_backup_error`.
    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error> {
        Db::set_backup_error(self, task_id, error).await
    }
}
302

303
// Implement the trait for Arc<Db> to support the AppState usage
304
#[async_trait::async_trait]
305
impl RecoveryDb for std::sync::Arc<Db> {
306
    async fn get_incomplete_backup_tasks(&self) -> Result<Vec<RecoveryTask>, sqlx::Error> {
×
307
        let tasks = self.as_ref().get_incomplete_backup_tasks().await?;
×
308
        Ok(tasks
×
309
            .into_iter()
×
310
            .map(|task| RecoveryTask {
×
311
                task_id: task.task_id,
×
312
                requestor: task.requestor,
×
313
                tokens: task.tokens,
×
314
                storage_mode: task.storage_mode,
×
315
                archive_format: task.archive_format,
×
316
            })
317
            .collect())
×
318
    }
319

320
    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error> {
×
321
        self.as_ref().set_backup_error(task_id, error).await
×
322
    }
323
}
324

325
/// Recover incomplete backup tasks from the database and enqueue them for processing
326
/// This is called on server startup to handle tasks that were interrupted by server shutdown
327
/// Returns the number of incomplete tasks found (including any that could not
/// be re-queued). Unparseable or unenqueueable tasks are marked as errored in
/// the DB (best-effort) and skipped rather than aborting the whole recovery.
pub async fn recover_incomplete_tasks<DB: RecoveryDb + ?Sized>(
    db: &DB,
    backup_task_sender: &mpsc::Sender<BackupTaskOrShutdown>,
) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
    debug!("Recovering incomplete backup tasks from database...");

    let incomplete_tasks = db.get_incomplete_backup_tasks().await?;
    let task_count = incomplete_tasks.len();

    if task_count == 0 {
        debug!("No incomplete backup tasks found");
        return Ok(0);
    }

    debug!(
        "Found {} incomplete backup tasks, re-queueing for processing",
        task_count
    );

    for task_meta in incomplete_tasks {
        // Parse the tokens JSON back to Vec<Tokens>
        let tokens: Vec<Tokens> = match serde_json::from_value(task_meta.tokens.clone()) {
            Ok(tokens) => tokens,
            Err(e) => {
                warn!(
                    "Failed to parse tokens for backup task {}: {}, skipping",
                    task_meta.task_id, e
                );
                // Mark this task as error since we can't process it
                let _ = db
                    .set_backup_error(
                        &task_meta.task_id,
                        &format!("Failed to parse tokens during recovery: {e}"),
                    )
                    .await;
                continue;
            }
        };

        // Lenient parse: an unknown stored mode falls back to Full, which
        // re-enables both IPFS pinning and archiving below.
        let storage_mode = task_meta.storage_mode.parse().unwrap_or(StorageMode::Full);
        let pin_on_ipfs = storage_mode != StorageMode::Archive;
        let create_archive = storage_mode != StorageMode::Ipfs;

        let backup_task = BackupTask {
            task_id: task_meta.task_id.clone(),
            request: BackupRequest {
                tokens,
                pin_on_ipfs,
                create_archive,
            },
            force: true, // Force recovery to ensure backup actually runs
            storage_mode,
            archive_format: task_meta.archive_format,
            requestor: Some(task_meta.requestor),
        };

        // Try to enqueue the task
        if let Err(e) = backup_task_sender
            .send(BackupTaskOrShutdown::Task(TaskType::Creation(backup_task)))
            .await
        {
            warn!(
                "Failed to enqueue recovered task {}: {}",
                task_meta.task_id, e
            );
            // Mark as error if we can't enqueue it
            let _ = db
                .set_backup_error(
                    &task_meta.task_id,
                    &format!("Failed to enqueue during recovery: {e}"),
                )
                .await;
        } else {
            debug!("Re-queued backup task: {}", task_meta.task_id);
        }
    }

    Ok(task_count)
}
406

407
// Trait for database operations needed by backup and deletion tasks
408
#[async_trait::async_trait]
pub trait BackupTaskDb {
    /// Remove any recorded errors for the task (used when `force` re-runs it).
    async fn clear_backup_errors(&self, task_id: &str) -> Result<(), sqlx::Error>;
    /// Record a fatal error message against the task.
    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error>;
    /// Persist the token→pin-request mappings produced by a backup run.
    async fn insert_pin_requests_with_tokens(
        &self,
        task_id: &str,
        requestor: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error>;
    /// Store the non-fatal error log accumulated during a backup run.
    async fn update_backup_task_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error>;
    /// Set the task's status string (e.g. "done").
    async fn update_backup_task_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error>;
    /// Fetch a task's metadata row, if it exists.
    async fn get_backup_task(
        &self,
        task_id: &str,
    ) -> Result<Option<crate::server::db::BackupTask>, sqlx::Error>;
    /// Mark the task as having a deletion in progress.
    async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error>;
    /// Remove the task row entirely.
    async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error>;
    /// Narrow a Full task to Ipfs (e.g. after deleting only the archive).
    async fn downgrade_full_to_ipfs(&self, task_id: &str) -> Result<(), sqlx::Error>;
    /// Narrow a Full task to Archive (e.g. after deleting only the pins).
    async fn downgrade_full_to_archive(&self, task_id: &str) -> Result<(), sqlx::Error>;
}
437
// Backwards-compat trait alias for `BackupTaskDb` (note the lowercase 't'):
// lets older `BackuptaskDb` bounds keep compiling. Blanket-implemented for
// every `BackupTaskDb`, so no type needs to opt in explicitly.
pub trait BackuptaskDb: BackupTaskDb {}
impl<T: BackupTaskDb + ?Sized> BackuptaskDb for T {}
441

442
// Implement BackuptaskDb trait for the real Db
443
#[async_trait::async_trait]
impl BackupTaskDb for Db {
    // Every method below is a thin pass-through to the inherent `Db` method
    // of the same name; the `Db::method(self, ...)` call syntax disambiguates
    // the inherent method from this trait method.
    async fn clear_backup_errors(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::clear_backup_errors(self, task_id).await
    }

    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error> {
        Db::set_backup_error(self, task_id, error).await
    }

    async fn insert_pin_requests_with_tokens(
        &self,
        task_id: &str,
        requestor: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        Db::insert_pin_requests_with_tokens(self, task_id, requestor, token_pin_mappings).await
    }

    async fn update_backup_task_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_backup_task_error_log(self, task_id, error_log).await
    }

    async fn update_backup_task_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_backup_task_status(self, task_id, status).await
    }

    async fn get_backup_task(
        &self,
        task_id: &str,
    ) -> Result<Option<crate::server::db::BackupTask>, sqlx::Error> {
        Db::get_backup_task(self, task_id).await
    }

    async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_deletion(self, task_id).await
    }

    async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::delete_backup_task(self, task_id).await
    }

    async fn downgrade_full_to_ipfs(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::downgrade_full_to_ipfs(self, task_id).await
    }

    async fn downgrade_full_to_archive(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::downgrade_full_to_archive(self, task_id).await
    }
}
501

502
/// Execute a single backup-creation task end-to-end: run the backup, persist
/// token→pin mappings and any non-fatal error log, then (for Archive/Full
/// modes) fsync and archive the output directory, finally marking the task
/// "done". All failures are recorded via `db` rather than returned; graceful
/// shutdown interruptions simply return without marking an error.
async fn run_backup_task_inner<DB: BackupTaskDb + ?Sized>(
    state: AppState,
    task: BackupTask,
    db: &DB,
) {
    let task_id = task.task_id.clone();
    let tokens = task.request.tokens.clone();
    let force = task.force;
    let storage_mode = task.storage_mode.clone();
    info!(
        "Running backup task for task {} (storage_mode: {})",
        task_id,
        storage_mode.as_str()
    );

    // If force is set, clean up the error log if it exists
    if force {
        let _ = db.clear_backup_errors(&task_id).await;
    }

    // Prepare backup config
    let shutdown_flag = Some(state.shutdown_flag.clone());
    let mut token_map = std::collections::HashMap::new();
    for entry in tokens.clone() {
        token_map.insert(entry.chain, entry.tokens);
    }
    let token_config = TokenConfig { chains: token_map };

    // Determine output path and IPFS settings based on storage mode
    let (output_path, ipfs_providers) = match storage_mode {
        StorageMode::Archive => {
            // Filesystem only: permanent directory, no IPFS
            let out_dir = format!("{}/nftbk-{}", state.base_dir, task_id);
            (Some(PathBuf::from(out_dir)), Vec::new())
        }
        StorageMode::Ipfs => {
            // IPFS only: no downloads, just pin existing CIDs
            (None, state.ipfs_providers.clone())
        }
        StorageMode::Full => {
            // Both: permanent directory and IPFS pinning
            let out_dir = format!("{}/nftbk-{}", state.base_dir, task_id);
            (Some(PathBuf::from(out_dir)), state.ipfs_providers.clone())
        }
    };

    // Run backup
    let backup_cfg = BackupConfig {
        chain_config: (*state.chain_config).clone(),
        token_config,
        storage_config: StorageConfig {
            output_path: output_path.clone(),
            prune_redundant: false,
            ipfs_providers,
        },
        process_config: ProcessManagementConfig {
            exit_on_error: false,
            shutdown_flag: shutdown_flag.clone(),
        },
    };
    let span = tracing::info_span!("backup_task", task_id = %task_id);
    let backup_result = backup_from_config(backup_cfg, Some(span)).await;

    // Check backup result. A shutdown interruption is detected by substring
    // match on the error message and is treated as a clean, silent exit.
    let (files_written, token_pin_mappings, error_log) = match backup_result {
        Ok((files, token_pin_mappings, errors)) => (files, token_pin_mappings, errors),
        Err(e) => {
            let error_msg = e.to_string();
            if error_msg.contains("interrupted by shutdown signal") {
                info!(
                    "Backup {} was gracefully interrupted by shutdown signal",
                    task_id
                );
                return;
            }
            error!("Backup {task_id} failed: {}", e);
            let _ = db
                .set_backup_error(&task_id, &format!("Backup failed: {e}"))
                .await;
            return;
        }
    };

    // Persist token-pin request mappings atomically, if any
    if !token_pin_mappings.is_empty() {
        let req = task.requestor.as_deref().unwrap_or("");
        let _ = db
            .insert_pin_requests_with_tokens(&task.task_id, req, &token_pin_mappings)
            .await;
    }

    // Store non-fatal error log in DB if present
    if !error_log.is_empty() {
        let log_str = error_log.join("\n");
        let _ = db.update_backup_task_error_log(&task_id, &log_str).await;
    }

    // Handle archiving based on storage mode
    match storage_mode {
        StorageMode::Archive | StorageMode::Full => {
            // We have a filesystem output path
            let out_path = output_path.as_ref().unwrap();

            // Sync all files and directories to disk before archiving
            // (blocking fsync work runs on the blocking thread pool).
            info!("Syncing {} to disk before archiving", out_path.display());
            let files_written_clone = files_written.clone();
            tokio::task::spawn_blocking(move || {
                sync_files(&files_written_clone);
            })
            .await
            .unwrap();
            info!(
                "Synced {} to disk before archiving ({} files)",
                out_path.display(),
                files_written.len()
            );

            // Archive the output dir
            let archive_fmt = task.archive_format.as_deref().unwrap_or("zip");
            let (zip_pathbuf, checksum_path) =
                get_zipped_backup_paths(&state.base_dir, &task_id, archive_fmt);
            info!("Archiving backup to {}", zip_pathbuf.display());
            let start_time = Instant::now();
            let out_path_clone = out_path.clone();
            let zip_pathbuf_clone = zip_pathbuf.clone();
            let archive_format_clone = archive_fmt.to_string();
            let shutdown_flag_clone = shutdown_flag.clone();
            let zip_result = tokio::task::spawn_blocking(move || {
                zip_backup(
                    &out_path_clone,
                    &zip_pathbuf_clone,
                    archive_format_clone,
                    shutdown_flag_clone,
                )
            })
            .await
            .unwrap();

            // Check archive result
            match zip_result {
                Ok(checksum) => {
                    info!(
                        "Archived backup at {} in {:?}s",
                        zip_pathbuf.display(),
                        start_time.elapsed().as_secs()
                    );
                    if let Err(e) = tokio::fs::write(&checksum_path, &checksum).await {
                        let error_msg = format!("Failed to write archive checksum file: {e}");
                        error!("{error_msg}");
                        let _ = db.set_backup_error(&task_id, &error_msg).await;
                        return;
                    }
                    let _ = db.update_backup_task_status(&task_id, "done").await;
                }
                Err(e) => {
                    let err_str = e.to_string();
                    if err_str.contains(ARCHIVE_INTERRUPTED_BY_SHUTDOWN) {
                        info!(
                            "Archiving for backup {} was gracefully interrupted by shutdown signal",
                            task_id
                        );
                        // Remove the partially written archive before exiting.
                        let _ = std::fs::remove_file(&zip_pathbuf);
                        return;
                    }
                    let error_msg = format!("Failed to archive backup: {e}");
                    error!("{error_msg}");
                    let _ = db.set_backup_error(&task_id, &error_msg).await;
                    return;
                }
            }

            info!("Backup {} ready", task_id);
        }
        StorageMode::Ipfs => {
            // IPFS-only mode: no filesystem operations needed
            let _ = db.update_backup_task_status(&task_id, "done").await;
            info!("IPFS pinning for {} complete", task_id);
        }
    }
}
682

683
/// Delete a task's on-disk artifacts: the archive file and its checksum
/// (when `archive_format` is known) plus the raw backup directory.
///
/// Returns `Ok(true)` if anything was actually removed and `Ok(false)` when
/// nothing existed; `NotFound` is deliberately not an error so deletion is
/// idempotent. Any other I/O failure aborts with a descriptive message.
async fn delete_dir_and_archive_for_task(
    base_dir: &str,
    task_id: &str,
    archive_format: Option<&str>,
) -> Result<bool, String> {
    let mut deleted_anything = false;

    if let Some(archive_format) = archive_format {
        let (archive_path, archive_checksum_path) =
            crate::server::archive::get_zipped_backup_paths(base_dir, task_id, archive_format);
        for path in [&archive_path, &archive_checksum_path] {
            match tokio::fs::remove_file(path).await {
                Ok(_) => {
                    deleted_anything = true;
                }
                Err(e) => {
                    if e.kind() != std::io::ErrorKind::NotFound {
                        return Err(format!("Failed to delete file {}: {}", path.display(), e));
                    }
                }
            }
        }
    }

    // Remove the (unarchived) backup output directory as well.
    let backup_dir = format!("{}/nftbk-{}", base_dir, task_id);
    match tokio::fs::remove_dir_all(&backup_dir).await {
        Ok(_) => {
            deleted_anything = true;
        }
        Err(e) => {
            if e.kind() != std::io::ErrorKind::NotFound {
                return Err(format!("Failed to delete backup dir {backup_dir}: {e}"));
            }
        }
    }

    Ok(deleted_anything)
}
721

722
async fn delete_ipfs_pins(
2✔
723
    provider_instances: &[std::sync::Arc<dyn crate::ipfs::IpfsPinningProvider>],
724
    task_id: &str,
725
    pin_requests: &[crate::server::db::PinRequestRow],
726
) -> Result<bool, String> {
727
    let mut deleted_anything = false;
4✔
728
    for pin_request in pin_requests {
7✔
729
        let provider = provider_instances
6✔
730
            .iter()
731
            .find(|provider| pin_request.provider_url.as_deref() == Some(provider.provider_url()));
7✔
732

733
        let provider = provider.ok_or_else(|| {
9✔
734
            format!(
1✔
735
                "No provider instance found for provider URL {} when unpinning {}",
1✔
736
                pin_request.provider_url.as_deref().unwrap_or(""),
4✔
737
                pin_request.cid
738
            )
739
        })?;
740

741
        provider
742
            .delete_pin(&pin_request.request_id)
743
            .await
744
            .map_err(|e| {
2✔
745
                format!(
×
746
                    "Failed to unpin {} from provider {} for task {}: {}",
×
747
                    pin_request.cid,
NEW
748
                    pin_request.provider_url.as_deref().unwrap_or(""),
×
749
                    task_id,
750
                    e
751
                )
752
            })?;
753

754
        tracing::info!(
2✔
755
            "Successfully unpinned {} from provider {} for task {}",
×
756
            pin_request.cid,
NEW
757
            pin_request.provider_url.as_deref().unwrap_or(""),
×
758
            task_id
759
        );
760
        deleted_anything = true;
761
    }
762

763
    Ok(deleted_anything)
1✔
764
}
765

766
/// Executes one deletion task end-to-end: validates ownership and task state,
/// removes archive files and/or IPFS pins according to `task.scope`, then
/// deletes or downgrades the backup-task row in the database.
///
/// Errors encountered during cleanup (archive removal, pin deletion) are
/// logged but do not abort the task; validation and final-metadata failures
/// are recorded via `set_backup_error` and cause an early return.
async fn run_deletion_task_inner<DB: BackupTaskDb + ?Sized>(
    state: AppState,
    task: DeletionTask,
    db: &DB,
) {
    let task_id = task.task_id.clone();
    info!("Running deletion task for task {}", task_id);

    // Get the backup task metadata
    let meta = match db.get_backup_task(&task_id).await {
        Ok(Some(m)) => m,
        Ok(None) => {
            error!("Backup task {} not found for deletion", task_id);
            let _ = db
                .set_backup_error(&task_id, "Task not found for deletion")
                .await;
            return;
        }
        Err(e) => {
            error!("Failed to read metadata for task {}: {}", task_id, e);
            let _ = db
                .set_backup_error(&task_id, &format!("Failed to read metadata: {}", e))
                .await;
            return;
        }
    };

    // Verify requestor matches (if provided)
    // A deletion with no requestor is accepted unconditionally here.
    if let Some(requestor) = &task.requestor {
        if meta.requestor != *requestor {
            error!(
                "Requestor {} does not match task owner {} for task {}",
                requestor, meta.requestor, task_id
            );
            let _ = db
                .set_backup_error(&task_id, "Requestor does not match task owner")
                .await;
            return;
        }
    }

    // Check if task is in progress
    if meta.status == "in_progress" {
        error!("Cannot delete task {} that is in progress", task_id);
        let _ = db
            .set_backup_error(&task_id, "Can only delete completed tasks")
            .await;
        return;
    }

    // Set status to in_progress for deletion
    if let Err(e) = db.start_deletion(&task_id).await {
        error!("Failed to start deletion for task {}: {}", task_id, e);
        let _ = db
            .set_backup_error(&task_id, &format!("Failed to start deletion: {}", e))
            .await;
        return;
    }

    // Handle archive cleanup if requested
    // Only tasks whose stored mode actually produced archive data are cleaned.
    if task.scope == StorageMode::Full || task.scope == StorageMode::Archive {
        if !(meta.storage_mode == "archive" || meta.storage_mode == "full") {
            info!(
                "Skipping archive cleanup for task {} (no archive data)",
                task_id
            );
        } else {
            match delete_dir_and_archive_for_task(
                &state.base_dir,
                &task_id,
                meta.archive_format.as_deref(),
            )
            .await
            {
                Ok(_) => {
                    info!("Archive cleanup completed for task {}", task_id);
                }
                Err(e) => {
                    error!("Archive deletion failed for task {}: {}", task_id, e);
                    // Log the error but continue with deletion - don't fail the task
                }
            }
        }
    }

    // Handle IPFS pin deletion if requested
    if task.scope == StorageMode::Full || task.scope == StorageMode::Ipfs {
        if !(meta.storage_mode == "ipfs" || meta.storage_mode == "full") {
            info!(
                "Skipping IPFS pin cleanup for task {} (no IPFS pins)",
                task_id
            );
        } else {
            // NOTE(review): pin requests are read via state.db rather than the
            // injected `db` — presumably BackupTaskDb does not expose pin
            // queries; confirm, as this bypasses the test seam.
            let pin_requests = match state.db.get_pin_requests_by_task_id(&task_id).await {
                Ok(v) => v,
                Err(e) => {
                    error!("Failed to get pin requests for task {}: {}", task_id, e);
                    Vec::new()
                }
            };
            match delete_ipfs_pins(&state.ipfs_provider_instances, &task_id, &pin_requests).await {
                Ok(_) => {
                    info!("IPFS pin cleanup completed for task {}", task_id);
                }
                Err(e) => {
                    error!("IPFS pin deletion failed for task {}: {}", task_id, e);
                    // Log the error but continue with deletion - don't fail the task
                }
            }
        }
    }

    // Delete or update the backup task metadata depending on scope and storage mode
    // Full scope removes the row; partial scopes downgrade a "full" row to the
    // remaining mode, or remove the row when the partial scope covers it all.
    match task.scope {
        StorageMode::Full => {
            if let Err(e) = db.delete_backup_task(&task_id).await {
                error!("Database deletion failed for task {}: {}", task_id, e);
                let _ = db
                    .set_backup_error(
                        &task_id,
                        &format!("Failed to delete metadata from database: {}", e),
                    )
                    .await;
                return;
            }
        }
        StorageMode::Archive => {
            if meta.storage_mode == "full" {
                if let Err(e) = db.downgrade_full_to_ipfs(&task_id).await {
                    error!(
                        "Failed to downgrade storage mode for task {}: {}",
                        task_id, e
                    );
                }
            } else if meta.storage_mode == "archive" {
                if let Err(e) = db.delete_backup_task(&task_id).await {
                    error!("Failed to delete backup task for task {}: {}", task_id, e);
                }
            }
        }
        StorageMode::Ipfs => {
            if meta.storage_mode == "full" {
                if let Err(e) = db.downgrade_full_to_archive(&task_id).await {
                    error!(
                        "Failed to downgrade storage mode for task {}: {}",
                        task_id, e
                    );
                }
            } else if meta.storage_mode == "ipfs" {
                if let Err(e) = db.delete_backup_task(&task_id).await {
                    error!("Failed to delete backup task for task {}: {}", task_id, e);
                }
            }
        }
    }

    info!("Successfully deleted backup {}", task_id);
}
924

925
pub async fn run_backup_task(state: AppState, task: BackupTask) {
×
926
    let task_id = task.task_id.clone();
×
927
    let state_clone = state.clone();
×
928
    let task_id_clone = task_id.clone();
×
929

930
    let fut =
×
931
        AssertUnwindSafe(run_backup_task_inner(state.clone(), task, &*state.db)).catch_unwind();
×
932

933
    let result = fut.await;
×
934
    if let Err(panic) = result {
×
935
        let panic_msg = if let Some(s) = panic.downcast_ref::<&str>() {
×
936
            s.to_string()
×
937
        } else if let Some(s) = panic.downcast_ref::<String>() {
×
938
            s.clone()
×
939
        } else {
940
            "Unknown panic".to_string()
×
941
        };
942
        let error_msg = format!("Backup task for task {task_id_clone} panicked: {panic_msg}");
×
943
        error!("{error_msg}");
×
944
        let _ = BackupTaskDb::set_backup_error(&*state_clone.db, &task_id_clone, &error_msg).await;
×
945
    }
946
}
947

948
pub async fn run_deletion_task(state: AppState, task: DeletionTask) {
×
949
    let task_id = task.task_id.clone();
×
950
    let state_clone = state.clone();
×
951
    let task_id_clone = task_id.clone();
×
952

953
    let fut =
×
954
        AssertUnwindSafe(run_deletion_task_inner(state.clone(), task, &*state.db)).catch_unwind();
×
955

956
    let result = fut.await;
×
957
    if let Err(panic) = result {
×
958
        let panic_msg = if let Some(s) = panic.downcast_ref::<&str>() {
×
959
            s.to_string()
×
960
        } else if let Some(s) = panic.downcast_ref::<String>() {
×
961
            s.clone()
×
962
        } else {
963
            "Unknown panic".to_string()
×
964
        };
965
        let error_msg = format!("Deletion task for task {task_id_clone} panicked: {panic_msg}");
×
966
        error!("{error_msg}");
×
967
        let _ = BackupTaskDb::set_backup_error(&*state_clone.db, &task_id_clone, &error_msg).await;
×
968
    }
969
}
970

971
// Unit tests for the deletion helpers: filesystem cleanup
// (delete_dir_and_archive_for_task) and IPFS unpinning (delete_ipfs_pins).
#[cfg(test)]
mod deletion_utils_tests {
    use super::*;

    // Happy path: archive file, checksum file and backup directory all exist
    // and are removed; the function reports that something was deleted.
    #[tokio::test]
    async fn delete_dir_and_archive_for_task_removes_files_and_dir() {
        // Arrange: create a unique temp base dir
        let base = std::env::temp_dir()
            .join(format!("nftbk-test-{}", uuid::Uuid::new_v4()))
            .to_string_lossy()
            .to_string();
        let base_dir = base.clone();
        tokio::fs::create_dir_all(&base_dir).await.unwrap();

        let task_id = "tdel";
        let archive_fmt = "zip";

        // Create archive files and backup directory matching get_zipped_backup_paths
        let (archive_path, checksum_path) =
            crate::server::archive::get_zipped_backup_paths(&base_dir, task_id, archive_fmt);
        if let Some(parent) = archive_path.parent() {
            tokio::fs::create_dir_all(parent).await.unwrap();
        }
        tokio::fs::write(&archive_path, b"dummy").await.unwrap();
        tokio::fs::write(&checksum_path, b"deadbeef").await.unwrap();

        // Backup dir layout mirrors the server's "<base>/nftbk-<task_id>" scheme.
        let backup_dir = format!("{}/nftbk-{}", base_dir, task_id);
        tokio::fs::create_dir_all(&backup_dir).await.unwrap();
        tokio::fs::write(format!("{backup_dir}/file.txt"), b"x")
            .await
            .unwrap();

        // Act
        let deleted = delete_dir_and_archive_for_task(&base_dir, task_id, Some(archive_fmt))
            .await
            .unwrap();

        // Assert
        assert!(deleted);
        assert!(!tokio::fs::try_exists(&archive_path).await.unwrap());
        assert!(!tokio::fs::try_exists(&checksum_path).await.unwrap());
        assert!(!tokio::fs::try_exists(&backup_dir).await.unwrap());

        // Cleanup
        let _ = tokio::fs::remove_dir_all(&base_dir).await;
    }

    // Missing paths are treated as a no-op success, not an error.
    #[tokio::test]
    async fn delete_dir_and_archive_for_task_noop_when_missing() {
        let base_dir = std::env::temp_dir()
            .join(format!("nftbk-test-missing-{}", uuid::Uuid::new_v4()))
            .to_string_lossy()
            .to_string();
        // Do not create anything under base_dir
        let deleted = delete_dir_and_archive_for_task(&base_dir, "nope", Some("zip"))
            .await
            .unwrap();
        assert!(!deleted);
    }

    // Minimal pinning provider that records delete_pin calls; `name` doubles
    // as both provider type and provider URL so rows can match on it.
    struct MockProvider {
        name: &'static str,
        // Shared log of request_ids passed to delete_pin, in call order.
        deleted: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
    }

    #[async_trait::async_trait]
    impl crate::ipfs::IpfsPinningProvider for MockProvider {
        fn provider_type(&self) -> &str {
            self.name
        }
        fn provider_url(&self) -> &str {
            self.name
        }
        // Only delete_pin is exercised by these tests; the rest panic if called.
        async fn create_pin(
            &self,
            _request: &crate::ipfs::PinRequest,
        ) -> anyhow::Result<crate::ipfs::PinResponse> {
            unimplemented!()
        }
        async fn get_pin(&self, _pin_id: &str) -> anyhow::Result<crate::ipfs::PinResponse> {
            unimplemented!()
        }
        async fn list_pins(&self) -> anyhow::Result<Vec<crate::ipfs::PinResponse>> {
            unimplemented!()
        }
        async fn delete_pin(&self, request_id: &str) -> anyhow::Result<()> {
            self.deleted.lock().unwrap().push(request_id.to_string());
            Ok(())
        }
    }

    // Every row whose provider_url matches an instance is unpinned, in order.
    #[tokio::test]
    async fn delete_ipfs_pins_unpins_all_matching_requests() {
        let deleted = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
        let provider = std::sync::Arc::new(MockProvider {
            name: "mock",
            deleted: deleted.clone(),
        });
        let providers: Vec<std::sync::Arc<dyn crate::ipfs::IpfsPinningProvider>> = vec![provider];

        let rows = vec![
            crate::server::db::PinRequestRow {
                id: 1,
                task_id: "t".into(),
                provider_type: "mock".into(),
                provider_url: Some("mock".into()),
                cid: "cid1".into(),
                request_id: "rid1".into(),
                status: "pinned".into(),
                requestor: "u".into(),
            },
            crate::server::db::PinRequestRow {
                id: 2,
                task_id: "t".into(),
                provider_type: "mock".into(),
                provider_url: Some("mock".into()),
                cid: "cid2".into(),
                request_id: "rid2".into(),
                status: "pinned".into(),
                requestor: "u".into(),
            },
        ];

        let result = delete_ipfs_pins(&providers, "t", &rows).await.unwrap();
        assert!(result);
        let calls = deleted.lock().unwrap().clone();
        assert_eq!(calls, vec!["rid1".to_string(), "rid2".to_string()]);
    }

    // A row whose provider URL has no matching instance is a hard error.
    #[tokio::test]
    async fn delete_ipfs_pins_errors_when_provider_missing() {
        let providers: Vec<std::sync::Arc<dyn crate::ipfs::IpfsPinningProvider>> = vec![];
        let rows = vec![crate::server::db::PinRequestRow {
            id: 1,
            task_id: "t".into(),
            provider_type: "unknown".into(),
            provider_url: Some("https://unknown".into()),
            cid: "cid".into(),
            request_id: "rid".into(),
            status: "pinned".into(),
            requestor: "u".into(),
        }];
        let err = delete_ipfs_pins(&providers, "t", &rows).await.unwrap_err();
        assert!(err.contains("No provider instance"));
    }
}
1117

1118
// Unit tests for recover_incomplete_tasks: re-enqueueing of interrupted
// backup tasks at startup, driven through a mock RecoveryDb.
#[cfg(test)]
mod recover_incomplete_tasks_tests {
    use super::*;
    use serde_json::json;
    use std::sync::Arc;
    use tokio::sync::mpsc;

    // Mock implementation of RecoveryDb for testing
    struct MockRecoveryDb {
        // Tasks returned by get_incomplete_backup_tasks.
        tasks: Vec<RecoveryTask>,
        // When true, get_incomplete_backup_tasks returns a configuration error.
        should_fail_get_tasks: bool,
        // When true, set_backup_error returns a configuration error.
        should_fail_set_error: bool,
        // Records (task_id, error) pairs passed to set_backup_error.
        set_error_calls: Arc<std::sync::Mutex<Vec<(String, String)>>>,
    }

    impl MockRecoveryDb {
        // Builds a mock that succeeds on all calls and returns `tasks`.
        fn new(tasks: Vec<RecoveryTask>) -> Self {
            Self {
                tasks,
                should_fail_get_tasks: false,
                should_fail_set_error: false,
                set_error_calls: Arc::new(std::sync::Mutex::new(Vec::new())),
            }
        }

        // Builder-style switch: make get_incomplete_backup_tasks fail.
        fn with_get_tasks_failure(mut self) -> Self {
            self.should_fail_get_tasks = true;
            self
        }

        // Snapshot of recorded set_backup_error calls.
        fn get_set_error_calls(&self) -> Vec<(String, String)> {
            self.set_error_calls.lock().unwrap().clone()
        }
    }

    #[async_trait::async_trait]
    impl RecoveryDb for MockRecoveryDb {
        async fn get_incomplete_backup_tasks(&self) -> Result<Vec<RecoveryTask>, sqlx::Error> {
            if self.should_fail_get_tasks {
                return Err(sqlx::Error::Configuration("Mock error".into()));
            }
            Ok(self.tasks.clone())
        }

        async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error> {
            if self.should_fail_set_error {
                return Err(sqlx::Error::Configuration("Mock error".into()));
            }
            self.set_error_calls
                .lock()
                .unwrap()
                .push((task_id.to_string(), error.to_string()));
            Ok(())
        }
    }

    // No incomplete tasks: recovery succeeds and reports zero.
    #[tokio::test]
    async fn test_recover_incomplete_tasks_no_tasks() {
        let mock_db = MockRecoveryDb::new(vec![]);
        let (tx, _rx) = mpsc::channel(10);

        let result = recover_incomplete_tasks(&mock_db, &tx).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 0);
    }

    // Two recoverable tasks are re-enqueued as forced Creation tasks with
    // their original storage mode, archive format and requestor preserved.
    #[tokio::test]
    async fn test_recover_incomplete_tasks_success() {
        let tasks = vec![
            RecoveryTask {
                task_id: "task1".to_string(),
                requestor: "user1".to_string(),
                tokens: json!([{"chain": "ethereum", "tokens": ["0x123"]}]),
                storage_mode: "archive".to_string(),
                archive_format: Some("zip".to_string()),
            },
            RecoveryTask {
                task_id: "task2".to_string(),
                requestor: "user2".to_string(),
                tokens: json!([{"chain": "tezos", "tokens": ["KT1ABC"]}]),
                storage_mode: "full".to_string(),
                archive_format: None,
            },
        ];
        let mock_db = MockRecoveryDb::new(tasks);
        let (tx, mut rx) = mpsc::channel(10);

        let result = recover_incomplete_tasks(&mock_db, &tx).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 2);

        // Check that tasks were enqueued (order-independent)
        let j1 = rx.recv().await.unwrap();
        let j2 = rx.recv().await.unwrap();
        let mut seen_task1 = false;
        let mut seen_task2 = false;
        for task in [j1, j2] {
            match task {
                BackupTaskOrShutdown::Task(TaskType::Creation(task)) => {
                    if task.task_id == "task1" {
                        seen_task1 = true;
                        assert!(task.force);
                        assert_eq!(task.storage_mode, StorageMode::Archive);
                        assert_eq!(task.archive_format, Some("zip".to_string()));
                        assert_eq!(task.requestor, Some("user1".to_string()));
                    } else if task.task_id == "task2" {
                        seen_task2 = true;
                        assert!(task.force);
                        assert_eq!(task.storage_mode, StorageMode::Full);
                        assert_eq!(task.archive_format, None);
                        assert_eq!(task.requestor, Some("user2".to_string()));
                    } else {
                        panic!("Unexpected task id");
                    }
                }
                _ => panic!("Expected backup task"),
            }
        }
        assert!(seen_task1 && seen_task2);
    }

    // Unparseable token JSON: the task is counted, nothing is enqueued, and
    // the parse failure is recorded on the task via set_backup_error.
    #[tokio::test]
    async fn test_recover_incomplete_tasks_invalid_tokens() {
        let tasks = vec![RecoveryTask {
            task_id: "task1".to_string(),
            requestor: "user1".to_string(),
            tokens: json!("invalid tokens"), // Invalid JSON for tokens
            storage_mode: "archive".to_string(),
            archive_format: None,
        }];
        let mock_db = MockRecoveryDb::new(tasks);
        let (tx, mut rx) = mpsc::channel(10);

        let result = recover_incomplete_tasks(&mock_db, &tx).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 1);

        // Should not enqueue any tasks
        assert!(rx.try_recv().is_err());

        // Should have called set_backup_error
        let error_calls = mock_db.get_set_error_calls();
        assert_eq!(error_calls.len(), 1);
        assert_eq!(error_calls[0].0, "task1");
        assert!(error_calls[0]
            .1
            .contains("Failed to parse tokens during recovery"));
    }

    // Database failure while listing incomplete tasks propagates as Err.
    #[tokio::test]
    async fn test_recover_incomplete_tasks_db_error() {
        let mock_db = MockRecoveryDb::new(vec![]).with_get_tasks_failure();
        let (tx, _rx) = mpsc::channel(10);

        let result = recover_incomplete_tasks(&mock_db, &tx).await;
        assert!(result.is_err());
    }

    // Unrecognized storage-mode strings fall back to StorageMode::Full.
    #[tokio::test]
    async fn test_recover_incomplete_tasks_storage_mode_parsing() {
        let tasks = vec![RecoveryTask {
            task_id: "task1".to_string(),
            requestor: "user1".to_string(),
            tokens: json!([{"chain": "ethereum", "tokens": ["0x123"]}]),
            storage_mode: "invalid_mode".to_string(), // Invalid storage mode
            archive_format: None,
        }];
        let mock_db = MockRecoveryDb::new(tasks);
        let (tx, mut rx) = mpsc::channel(10);

        let result = recover_incomplete_tasks(&mock_db, &tx).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 1);

        // Should default to StorageMode::Full for invalid mode
        let task = rx.recv().await.unwrap();
        match task {
            BackupTaskOrShutdown::Task(TaskType::Creation(task)) => {
                assert_eq!(task.storage_mode, StorageMode::Full);
                assert!(task.request.pin_on_ipfs); // Both includes IPFS
            }
            _ => panic!("Expected backup task"),
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc