• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

0xmichalis / nftbk / 18504496065

14 Oct 2025 05:14PM UTC coverage: 34.809% (-1.5%) from 36.262%
18504496065

Pull #63

github

web-flow
Merge cfc697a95 into fcba7227a
Pull Request #63: Separate error logs

88 of 540 new or added lines in 16 files covered. (16.3%)

38 existing lines in 7 files now uncovered.

1321 of 3795 relevant lines covered (34.81%)

5.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

11.63
/src/server/mod.rs
1
use std::collections::HashMap;
2
use std::path::PathBuf;
3
use std::str::FromStr;
4
use std::sync::atomic::AtomicBool;
5
use std::sync::Arc;
6
use tokio::fs;
7
use tokio::sync::mpsc;
8
use tokio::sync::Mutex;
9
use tracing::{debug, error, info, warn};
10

11
use crate::backup::ChainConfig;
12
use crate::ipfs::{IpfsPinningProvider, IpfsProviderConfig};
13
use crate::server::api::{BackupRequest, Tokens};
14
use crate::server::db::Db;
15
use crate::server::hashing::compute_file_sha256;
16

17
pub mod api;
18
pub mod archive;
19
pub mod db;
20
pub mod handlers;
21
pub mod hashing;
22
pub mod pin_monitor;
23
pub mod privy;
24
pub mod pruner;
25
pub mod recovery;
26
pub mod router;
27
pub mod workers;
28
pub use handlers::handle_backup::handle_backup;
29
pub use handlers::handle_backup_delete_archive::handle_backup_delete_archive;
30
pub use handlers::handle_backup_delete_pins::handle_backup_delete_pins;
31
pub use handlers::handle_backup_retry::handle_backup_retry;
32
pub use handlers::handle_backups::handle_backups;
33
pub use handlers::handle_download::handle_download;
34
pub use handlers::handle_download::handle_download_token;
35
pub use handlers::handle_status::handle_status;
36
pub use recovery::{recover_incomplete_tasks, RecoveryDb};
37
pub use workers::deletion::{complete_deletion_for_scope, start_deletion_for_scope};
38
pub use workers::spawn_backup_workers;
39

40
#[derive(Debug, Clone)]
41
pub enum BackupTaskOrShutdown {
42
    Task(TaskType),
43
    Shutdown,
44
}
45

46
#[derive(Debug, Clone)]
47
pub enum TaskType {
48
    Creation(BackupTask),
49
    Deletion(DeletionTask),
50
}
51

52
#[derive(Debug, Clone)]
53
pub struct BackupTask {
54
    pub task_id: String,
55
    pub request: BackupRequest,
56
    pub force: bool,
57
    pub scope: StorageMode,
58
    pub archive_format: Option<String>,
59
    pub requestor: Option<String>,
60
}
61

62
#[derive(Debug, Clone)]
63
pub struct DeletionTask {
64
    pub task_id: String,
65
    pub requestor: Option<String>,
66
    /// Determines which parts of the backup to delete (e.g., only the archive, only the IPFS pins, or both).
67
    pub scope: StorageMode,
68
}
69

70
#[derive(Debug, Clone, PartialEq, Eq)]
71
pub enum StorageMode {
72
    Archive,
73
    Ipfs,
74
    Full,
75
}
76

77
impl StorageMode {
78
    pub fn as_str(&self) -> &'static str {
7✔
79
        match self {
7✔
80
            StorageMode::Archive => "archive",
6✔
81
            StorageMode::Ipfs => "ipfs",
1✔
82
            StorageMode::Full => "full",
×
83
        }
84
    }
85
}
86

87
impl FromStr for StorageMode {
88
    type Err = String;
89

90
    fn from_str(s: &str) -> Result<Self, Self::Err> {
5✔
91
        match s {
5✔
92
            "archive" => Ok(StorageMode::Archive),
8✔
93
            "ipfs" => Ok(StorageMode::Ipfs),
2✔
94
            "full" => Ok(StorageMode::Full),
3✔
95
            _ => Err(format!("Unknown storage mode: {}", s)),
1✔
96
        }
97
    }
98
}
99

100
#[derive(Clone)]
101
pub struct AppState {
102
    pub chain_config: Arc<ChainConfig>,
103
    pub base_dir: Arc<String>,
104
    pub unsafe_skip_checksum_check: bool,
105
    pub auth_token: Option<String>,
106
    pub pruner_enabled: bool,
107
    pub pruner_retention_days: u64,
108
    pub download_tokens: Arc<Mutex<HashMap<String, (String, u64)>>>,
109
    pub backup_task_sender: mpsc::Sender<BackupTaskOrShutdown>,
110
    pub db: Arc<Db>,
111
    pub shutdown_flag: Arc<AtomicBool>,
112
    pub ipfs_providers: Vec<IpfsProviderConfig>,
113
    pub ipfs_provider_instances: Arc<Vec<Arc<dyn IpfsPinningProvider>>>,
114
}
115

116
impl Default for AppState {
117
    fn default() -> Self {
×
118
        panic!("AppState::default() should not be used; use AppState::new() instead");
×
119
    }
120
}
121

122
impl AppState {
123
    #[allow(clippy::too_many_arguments)]
124
    pub async fn new(
×
125
        chain_config_path: &str,
126
        base_dir: &str,
127
        unsafe_skip_checksum_check: bool,
128
        auth_token: Option<String>,
129
        pruner_enabled: bool,
130
        pruner_retention_days: u64,
131
        backup_task_sender: mpsc::Sender<BackupTaskOrShutdown>,
132
        db_url: &str,
133
        max_connections: u32,
134
        shutdown_flag: Arc<AtomicBool>,
135
        ipfs_providers: Vec<IpfsProviderConfig>,
136
    ) -> Self {
137
        let config_content = tokio::fs::read_to_string(chain_config_path)
×
138
            .await
×
139
            .expect("Failed to read chain config");
140
        let chains: std::collections::HashMap<String, String> =
×
141
            toml::from_str(&config_content).expect("Failed to parse chain config");
×
142
        let mut chain_config = ChainConfig(chains);
×
143
        chain_config
×
144
            .resolve_env_vars()
145
            .expect("Failed to resolve environment variables in chain config");
146
        let db = Arc::new(Db::new(db_url, max_connections).await);
×
147

148
        // Create IPFS provider instances at startup
149
        let mut ipfs_provider_instances = Vec::new();
×
150
        for config in &ipfs_providers {
×
151
            match config.create_provider() {
×
152
                Ok(provider) => {
×
153
                    info!(
×
154
                        "Successfully created IPFS provider {} ({})",
×
155
                        provider.provider_type(),
×
156
                        provider.provider_url()
×
157
                    );
158
                    ipfs_provider_instances.push(Arc::from(provider));
×
159
                }
160
                Err(e) => {
×
161
                    error!(
×
162
                        "Failed to create IPFS provider from config {:?}: {}",
×
163
                        config, e
164
                    );
165
                }
166
            }
167
        }
168

169
        AppState {
170
            chain_config: Arc::new(chain_config),
×
171
            base_dir: Arc::new(base_dir.to_string()),
×
172
            unsafe_skip_checksum_check,
173
            auth_token,
174
            pruner_enabled,
175
            pruner_retention_days,
176
            download_tokens: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
×
177
            backup_task_sender,
178
            db,
179
            shutdown_flag,
180
            ipfs_providers,
181
            ipfs_provider_instances: Arc::new(ipfs_provider_instances),
×
182
        }
183
    }
184
}
185

186
pub async fn check_backup_on_disk(
×
187
    base_dir: &str,
188
    task_id: &str,
189
    unsafe_skip_checksum_check: bool,
190
    archive_format: &str,
191
) -> Option<PathBuf> {
192
    let (path, checksum_path) =
×
193
        crate::server::archive::get_zipped_backup_paths(base_dir, task_id, archive_format);
×
194

195
    // First check if both files exist
196
    match (
197
        fs::try_exists(&path).await,
×
198
        fs::try_exists(&checksum_path).await,
×
199
    ) {
200
        (Ok(true), Ok(true)) => {
201
            if unsafe_skip_checksum_check {
×
202
                // Only check for existence, skip reading and comparing checksums
203
                return Some(path);
×
204
            }
205
            // Read stored checksum
206
            info!("Checking backup on disk for task {}", task_id);
×
207
            let stored_checksum = match fs::read_to_string(&checksum_path).await {
×
208
                Ok(checksum) => checksum,
209
                Err(e) => {
×
210
                    warn!("Failed to read checksum file for {}: {}", path.display(), e);
×
211
                    return None;
×
212
                }
213
            };
214

215
            // Compute current checksum
216
            debug!("Computing backup checksum for task {}", task_id);
×
217
            let current_checksum = match compute_file_sha256(&path).await {
×
218
                Ok(checksum) => checksum,
219
                Err(e) => {
×
220
                    warn!("Failed to compute checksum for {}: {}", path.display(), e);
×
221
                    return None;
×
222
                }
223
            };
224

225
            if stored_checksum.trim() != current_checksum {
226
                warn!(
×
227
                    "Backup archive {} is corrupted: checksum mismatch",
×
228
                    path.display()
×
229
                );
230
                return None;
×
231
            }
232

233
            Some(path)
234
        }
235
        _ => None,
×
236
    }
237
}
238

239
// Trait for database operations needed by backup and deletion tasks
240
#[async_trait::async_trait]
241
pub trait BackupTaskDb {
242
    async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error>;
243
    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error>;
244
    async fn insert_pins_with_tokens(
245
        &self,
246
        task_id: &str,
247
        token_pin_mappings: &[crate::TokenPinMapping],
248
    ) -> Result<(), sqlx::Error>;
249
    async fn set_error_logs(
250
        &self,
251
        task_id: &str,
252
        archive_error_log: Option<&str>,
253
        ipfs_error_log: Option<&str>,
254
    ) -> Result<(), sqlx::Error>;
255
    async fn update_archive_error_log(
256
        &self,
257
        task_id: &str,
258
        error_log: &str,
259
    ) -> Result<(), sqlx::Error>;
260
    async fn update_ipfs_task_error_log(
261
        &self,
262
        task_id: &str,
263
        error_log: &str,
264
    ) -> Result<(), sqlx::Error>;
265
    async fn set_archive_request_error(
266
        &self,
267
        task_id: &str,
268
        fatal_error: &str,
269
    ) -> Result<(), sqlx::Error>;
270
    async fn update_archive_request_status(
271
        &self,
272
        task_id: &str,
273
        status: &str,
274
    ) -> Result<(), sqlx::Error>;
275
    async fn update_pin_request_status(
276
        &self,
277
        task_id: &str,
278
        status: &str,
279
    ) -> Result<(), sqlx::Error>;
280
    async fn update_backup_statuses(
281
        &self,
282
        task_id: &str,
283
        scope: &str,
284
        archive_status: &str,
285
        ipfs_status: &str,
286
    ) -> Result<(), sqlx::Error>;
287
    async fn get_backup_task(
288
        &self,
289
        task_id: &str,
290
    ) -> Result<Option<crate::server::db::BackupTask>, sqlx::Error>;
291
    async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error>;
292
    async fn start_archive_deletion(&self, task_id: &str) -> Result<(), sqlx::Error>;
293
    async fn start_ipfs_pins_deletion(&self, task_id: &str) -> Result<(), sqlx::Error>;
294
    async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error>;
295
    async fn complete_archive_deletion(&self, task_id: &str) -> Result<(), sqlx::Error>;
296
    async fn complete_ipfs_pins_deletion(&self, task_id: &str) -> Result<(), sqlx::Error>;
297
    async fn update_ipfs_task_status(&self, task_id: &str, status: &str)
298
        -> Result<(), sqlx::Error>;
299
    async fn set_ipfs_task_error(
300
        &self,
301
        task_id: &str,
302
        fatal_error: &str,
303
    ) -> Result<(), sqlx::Error>;
304
}
305

306
// Implement BackupTaskDb trait for the real Db
307
#[async_trait::async_trait]
308
impl BackupTaskDb for Db {
NEW
309
    async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
×
NEW
310
        Db::clear_backup_errors(self, task_id, scope).await
×
311
    }
312

313
    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error> {
×
314
        Db::set_backup_error(self, task_id, error).await
×
315
    }
316

317
    async fn insert_pins_with_tokens(
318
        &self,
319
        task_id: &str,
320
        token_pin_mappings: &[crate::TokenPinMapping],
321
    ) -> Result<(), sqlx::Error> {
NEW
322
        Db::insert_pins_with_tokens(self, task_id, token_pin_mappings).await
×
323
    }
324

325
    async fn set_error_logs(
326
        &self,
327
        task_id: &str,
328
        archive_error_log: Option<&str>,
329
        ipfs_error_log: Option<&str>,
330
    ) -> Result<(), sqlx::Error> {
NEW
331
        Db::set_error_logs(self, task_id, archive_error_log, ipfs_error_log).await
×
332
    }
333

334
    async fn update_archive_error_log(
335
        &self,
336
        task_id: &str,
337
        error_log: &str,
338
    ) -> Result<(), sqlx::Error> {
NEW
339
        Db::update_archive_error_log(self, task_id, error_log).await
×
340
    }
341

342
    async fn update_ipfs_task_error_log(
343
        &self,
344
        task_id: &str,
345
        error_log: &str,
346
    ) -> Result<(), sqlx::Error> {
NEW
347
        Db::update_ipfs_task_error_log(self, task_id, error_log).await
×
348
    }
349

350
    async fn set_archive_request_error(
351
        &self,
352
        task_id: &str,
353
        fatal_error: &str,
354
    ) -> Result<(), sqlx::Error> {
NEW
355
        Db::set_archive_request_error(self, task_id, fatal_error).await
×
356
    }
357

358
    async fn update_archive_request_status(
359
        &self,
360
        task_id: &str,
361
        status: &str,
362
    ) -> Result<(), sqlx::Error> {
NEW
363
        Db::update_archive_request_status(self, task_id, status).await
×
364
    }
365

366
    async fn update_pin_request_status(
367
        &self,
368
        task_id: &str,
369
        status: &str,
370
    ) -> Result<(), sqlx::Error> {
NEW
371
        Db::update_pin_request_status(self, task_id, status).await
×
372
    }
373

374
    async fn update_backup_statuses(
375
        &self,
376
        task_id: &str,
377
        scope: &str,
378
        archive_status: &str,
379
        ipfs_status: &str,
380
    ) -> Result<(), sqlx::Error> {
NEW
381
        Db::update_backup_statuses(self, task_id, scope, archive_status, ipfs_status).await
×
382
    }
383

384
    async fn get_backup_task(
385
        &self,
386
        task_id: &str,
387
    ) -> Result<Option<crate::server::db::BackupTask>, sqlx::Error> {
388
        Db::get_backup_task(self, task_id).await
×
389
    }
390

391
    async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
392
        Db::start_deletion(self, task_id).await
×
393
    }
394

395
    async fn start_archive_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
396
        Db::start_archive_deletion(self, task_id).await
×
397
    }
398

399
    async fn start_ipfs_pins_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
400
        Db::start_ipfs_pins_deletion(self, task_id).await
×
401
    }
402

403
    async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
404
        Db::delete_backup_task(self, task_id).await
×
405
    }
406

407
    async fn complete_archive_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
408
        Db::complete_archive_deletion(self, task_id).await
×
409
    }
410

411
    async fn complete_ipfs_pins_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
412
        Db::complete_ipfs_pins_deletion(self, task_id).await
×
413
    }
414

415
    async fn update_ipfs_task_status(
416
        &self,
417
        task_id: &str,
418
        status: &str,
419
    ) -> Result<(), sqlx::Error> {
NEW
420
        Db::update_ipfs_task_status(self, task_id, status).await
×
421
    }
422

423
    async fn set_ipfs_task_error(
424
        &self,
425
        task_id: &str,
426
        fatal_error: &str,
427
    ) -> Result<(), sqlx::Error> {
NEW
428
        Db::set_ipfs_task_error(self, task_id, fatal_error).await
×
429
    }
430
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc