• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

0xmichalis / nftbk / 18384488675

09 Oct 2025 05:48PM UTC coverage: 32.63%. Remained the same
18384488675

push

github

0xmichalis
feat: enable pin creation and rearchitect db schema

42 of 250 new or added lines in 10 files covered. (16.8%)

279 existing lines in 7 files now uncovered.

974 of 2985 relevant lines covered (32.63%)

5.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.31
/src/server/mod.rs
1
use futures_util::FutureExt;
2
use std::collections::{HashMap, HashSet};
3
use std::panic::AssertUnwindSafe;
4
use std::path::PathBuf;
5
use std::str::FromStr;
6
use std::sync::atomic::AtomicBool;
7
use std::sync::Arc;
8
use std::time::Instant;
9
use tokio::fs;
10
use tokio::sync::mpsc;
11
use tokio::sync::Mutex;
12
use tracing::{debug, error, info, warn};
13

14
use crate::backup::ChainConfig;
15
use crate::ipfs::IpfsProviderConfig;
16
use crate::server::api::{BackupRequest, Tokens};
17
use crate::server::archive::{
18
    get_zipped_backup_paths, zip_backup, ARCHIVE_INTERRUPTED_BY_SHUTDOWN,
19
};
20
use crate::server::db::Db;
21
use crate::server::hashing::compute_file_sha256;
22
use crate::{
23
    backup::backup_from_config, BackupConfig, ProcessManagementConfig, StorageConfig, TokenConfig,
24
};
25

26
pub mod api;
27
pub mod archive;
28
pub mod db;
29
pub mod handlers;
30
pub mod hashing;
31
pub mod privy;
32
pub mod pruner;
33
pub mod router;
34
pub mod workers;
35
pub use handlers::handle_backup::handle_backup;
36
pub use handlers::handle_backup_delete::handle_backup_delete;
37
pub use handlers::handle_backup_retry::handle_backup_retry;
38
pub use handlers::handle_backups::handle_backups;
39
pub use handlers::handle_download::handle_download;
40
pub use handlers::handle_download::handle_download_token;
41
pub use handlers::handle_status::handle_status;
42
pub use workers::spawn_backup_workers;
43

44
/// Shared application state, cloned into every request handler and
/// background worker. All heavyweight members are behind `Arc`, so
/// cloning is cheap.
#[derive(Clone)]
pub struct AppState {
    /// Per-chain configuration parsed from the chain config TOML
    /// (env vars already resolved by `AppState::new`).
    pub chain_config: Arc<ChainConfig>,
    /// Root directory under which backup output dirs and archives live
    /// (archives are written as `<base_dir>/nftbk-<task_id>.<fmt>`).
    pub base_dir: Arc<String>,
    /// When true, `check_backup_on_disk` only checks file existence and
    /// skips reading/recomputing the SHA-256 checksum.
    pub unsafe_skip_checksum_check: bool,
    /// Optional bearer token for authenticated endpoints.
    pub auth_token: Option<String>,
    /// Whether the background pruner is active.
    pub pruner_enabled: bool,
    /// Retention window for the pruner, in days.
    pub pruner_retention_days: u64,
    /// Download-token map; values look like (task id, expiry) —
    /// NOTE(review): verify exact tuple semantics against handle_download.
    pub download_tokens: Arc<Mutex<HashMap<String, (String, u64)>>>,
    /// Queue feeding backup jobs (or a shutdown sentinel) to the workers.
    pub backup_job_sender: mpsc::Sender<BackupJobOrShutdown>,
    /// Database handle.
    pub db: Arc<Db>,
    /// Set on shutdown; long-running jobs poll this to stop gracefully.
    pub shutdown_flag: Arc<AtomicBool>,
    /// Configured IPFS pinning providers (empty disables pinning).
    pub ipfs_providers: Vec<IpfsProviderConfig>,
}
58

59
/// Message carried on the worker queue: either a job to run or a
/// shutdown sentinel.
#[derive(Debug, Clone)]
pub enum BackupJobOrShutdown {
    /// A backup job to execute.
    Job(BackupJob),
    /// Tells the receiving worker to stop.
    Shutdown,
}
64

65
/// Where the results of a backup job are persisted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageMode {
    /// Archive to the local filesystem only.
    Filesystem,
    /// Pin content on IPFS only (no local archive).
    Ipfs,
    /// Archive locally and pin on IPFS.
    Both,
}

impl StorageMode {
    // Canonical names, shared by `as_str` and `FromStr` so the two
    // cannot drift apart.
    const FILESYSTEM: &'static str = "filesystem";
    const IPFS: &'static str = "ipfs";
    const BOTH: &'static str = "both";

    /// Returns the canonical lowercase name of this mode, suitable for
    /// persistence and logging. Round-trips through [`FromStr`].
    pub fn as_str(&self) -> &'static str {
        match self {
            StorageMode::Filesystem => Self::FILESYSTEM,
            StorageMode::Ipfs => Self::IPFS,
            StorageMode::Both => Self::BOTH,
        }
    }
}

impl FromStr for StorageMode {
    type Err = String;

    /// Parses exactly the canonical names produced by
    /// [`StorageMode::as_str`]; any other input (including different
    /// casing) is an error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            Self::FILESYSTEM => Ok(StorageMode::Filesystem),
            Self::IPFS => Ok(StorageMode::Ipfs),
            Self::BOTH => Ok(StorageMode::Both),
            _ => Err(format!("Unknown storage mode: {}", s)),
        }
    }
}
94

95
/// A single queued backup request, as consumed by the worker pool.
#[derive(Debug, Clone)]
pub struct BackupJob {
    /// Unique identifier of this backup task.
    pub task_id: String,
    /// Chains/tokens to back up plus the IPFS pinning flag.
    pub request: BackupRequest,
    /// When true, any previous error log for the task is cleared before
    /// the job runs (see `run_backup_job_inner`).
    pub force: bool,
    /// Where results are stored: filesystem, IPFS, or both.
    pub storage_mode: StorageMode,
    /// Archive format; `None` falls back to "zip".
    pub archive_format: Option<String>,
    /// Identity of the caller that requested the backup, if known.
    pub requestor: Option<String>,
}
104

105
impl Default for AppState {
    /// Deliberately unusable: building an `AppState` requires async work
    /// (reading the chain config, opening the DB pool) that `Default`
    /// cannot perform, so this panics to surface accidental use early.
    fn default() -> Self {
        panic!("AppState::default() should not be used; use AppState::new() instead");
    }
}
110

111
impl AppState {
112
    #[allow(clippy::too_many_arguments)]
UNCOV
113
    pub async fn new(
×
114
        chain_config_path: &str,
115
        base_dir: &str,
116
        unsafe_skip_checksum_check: bool,
117
        auth_token: Option<String>,
118
        pruner_enabled: bool,
119
        pruner_retention_days: u64,
120
        backup_job_sender: mpsc::Sender<BackupJobOrShutdown>,
121
        db_url: &str,
122
        max_connections: u32,
123
        shutdown_flag: Arc<AtomicBool>,
124
        ipfs_providers: Vec<IpfsProviderConfig>,
125
    ) -> Self {
UNCOV
126
        let config_content = tokio::fs::read_to_string(chain_config_path)
×
UNCOV
127
            .await
×
128
            .expect("Failed to read chain config");
UNCOV
129
        let chains: std::collections::HashMap<String, String> =
×
UNCOV
130
            toml::from_str(&config_content).expect("Failed to parse chain config");
×
UNCOV
131
        let mut chain_config = ChainConfig(chains);
×
UNCOV
132
        chain_config
×
133
            .resolve_env_vars()
134
            .expect("Failed to resolve environment variables in chain config");
UNCOV
135
        let db = Arc::new(Db::new(db_url, max_connections).await);
×
136

137
        AppState {
138
            chain_config: Arc::new(chain_config),
×
139
            base_dir: Arc::new(base_dir.to_string()),
×
140
            unsafe_skip_checksum_check,
141
            auth_token,
142
            pruner_enabled,
143
            pruner_retention_days,
UNCOV
144
            download_tokens: Arc::new(Mutex::new(HashMap::new())),
×
145
            backup_job_sender,
146
            db,
147
            shutdown_flag,
148
            ipfs_providers,
149
        }
150
    }
151
}
152

UNCOV
153
pub async fn check_backup_on_disk(
×
154
    base_dir: &str,
155
    task_id: &str,
156
    unsafe_skip_checksum_check: bool,
157
    archive_format: &str,
158
) -> Option<PathBuf> {
159
    let (path, checksum_path) =
×
UNCOV
160
        crate::server::archive::get_zipped_backup_paths(base_dir, task_id, archive_format);
×
161

162
    // First check if both files exist
163
    match (
164
        fs::try_exists(&path).await,
×
UNCOV
165
        fs::try_exists(&checksum_path).await,
×
166
    ) {
167
        (Ok(true), Ok(true)) => {
UNCOV
168
            if unsafe_skip_checksum_check {
×
169
                // Only check for existence, skip reading and comparing checksums
170
                return Some(path);
×
171
            }
172
            // Read stored checksum
UNCOV
173
            info!("Checking backup on disk for task {}", task_id);
×
UNCOV
174
            let stored_checksum = match fs::read_to_string(&checksum_path).await {
×
UNCOV
175
                Ok(checksum) => checksum,
×
176
                Err(e) => {
×
UNCOV
177
                    warn!("Failed to read checksum file for {}: {}", path.display(), e);
×
UNCOV
178
                    return None;
×
179
                }
180
            };
181

182
            // Compute current checksum
UNCOV
183
            debug!("Computing backup checksum for task {}", task_id);
×
UNCOV
184
            let current_checksum = match compute_file_sha256(&path).await {
×
185
                Ok(checksum) => checksum,
×
UNCOV
186
                Err(e) => {
×
UNCOV
187
                    warn!("Failed to compute checksum for {}: {}", path.display(), e);
×
UNCOV
188
                    return None;
×
189
                }
190
            };
191

192
            if stored_checksum.trim() != current_checksum {
×
UNCOV
193
                warn!(
×
UNCOV
194
                    "Backup archive {} is corrupted: checksum mismatch",
×
UNCOV
195
                    path.display()
×
196
                );
197
                return None;
×
198
            }
199

200
            Some(path)
×
201
        }
202
        _ => None,
×
203
    }
204
}
205

206
/// Recover incomplete backup jobs from the database and enqueue them for processing.
/// This is called on server startup to handle jobs that were interrupted by server
/// shutdown.
///
/// Returns the number of incomplete jobs found (including any that could not be
/// re-queued and were marked as errored instead).
///
/// # Errors
/// Only the initial DB query can fail this function; per-job failures (bad tokens
/// JSON, full queue) are recorded on the job itself and do not abort recovery.
pub async fn recover_incomplete_jobs(
    state: &AppState,
) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
    debug!("Recovering incomplete protection jobs from database...");

    let incomplete_jobs = state.db.get_incomplete_protection_jobs().await?;
    let job_count = incomplete_jobs.len();

    if job_count == 0 {
        debug!("No incomplete protection jobs found");
        return Ok(0);
    }

    debug!(
        "Found {} incomplete protection jobs, re-queueing for processing",
        job_count
    );

    for job_meta in incomplete_jobs {
        // Parse the tokens JSON back to Vec<Tokens>
        let tokens: Vec<Tokens> = match serde_json::from_value(job_meta.tokens.clone()) {
            Ok(tokens) => tokens,
            Err(e) => {
                warn!(
                    "Failed to parse tokens for job {}: {}, skipping",
                    job_meta.task_id, e
                );
                // Mark this job as error since we can't process it
                let _ = state
                    .db
                    .set_backup_error(
                        &job_meta.task_id,
                        &format!("Failed to parse tokens during recovery: {e}"),
                    )
                    .await;
                continue;
            }
        };

        // Unrecognized/legacy storage-mode strings fall back to Both, the
        // most inclusive mode, rather than dropping the job.
        let storage_mode = job_meta.storage_mode.parse().unwrap_or(StorageMode::Both);
        let pin_on_ipfs = storage_mode == StorageMode::Ipfs || storage_mode == StorageMode::Both;

        let backup_job = BackupJob {
            task_id: job_meta.task_id.clone(),
            request: BackupRequest {
                tokens,
                pin_on_ipfs,
            },
            force: true, // Force recovery to ensure backup actually runs
            storage_mode,
            archive_format: job_meta.archive_format,
            requestor: Some(job_meta.requestor),
        };

        // Try to enqueue the job
        if let Err(e) = state
            .backup_job_sender
            .send(BackupJobOrShutdown::Job(backup_job))
            .await
        {
            warn!(
                "Failed to enqueue recovered job {}: {}",
                job_meta.task_id, e
            );
            // Mark as error if we can't enqueue it
            let _ = state
                .db
                .set_backup_error(
                    &job_meta.task_id,
                    &format!("Failed to enqueue during recovery: {e}"),
                )
                .await;
        } else {
            debug!("Re-queued backup job: {}", job_meta.task_id);
        }
    }

    Ok(job_count)
}
287

288
/// Executes one backup job end to end: runs the backup, persists pin
/// mappings and error logs, then (for filesystem modes) fsyncs and
/// archives the output and marks the job "done".
///
/// Panics in here (including the `unwrap()`s on `spawn_blocking` joins)
/// are caught by `run_backup_job`, which records them as job errors.
async fn run_backup_job_inner(state: AppState, job: BackupJob) {
    let task_id = job.task_id.clone();
    let tokens = job.request.tokens.clone();
    let force = job.force;
    let storage_mode = job.storage_mode.clone();
    info!(
        "Running protection job for task {} (storage_mode: {})",
        task_id,
        storage_mode.as_str()
    );

    // If force is set, clean up the error log if it exists
    if force {
        let _ = state.db.clear_backup_errors(&task_id).await;
    }

    // Prepare backup config: group requested tokens by chain.
    let shutdown_flag = Some(state.shutdown_flag.clone());
    let mut token_map = std::collections::HashMap::new();
    for entry in tokens.clone() {
        token_map.insert(entry.chain, entry.tokens);
    }
    let token_config = TokenConfig { chains: token_map };

    // Determine output path and IPFS settings based on storage mode
    let (output_path, ipfs_providers) = match storage_mode {
        StorageMode::Filesystem => {
            // Filesystem only: permanent directory, no IPFS
            let out_dir = format!("{}/nftbk-{}", state.base_dir, task_id);
            (Some(PathBuf::from(out_dir)), Vec::new())
        }
        StorageMode::Ipfs => {
            // IPFS only: no downloads, just pin existing CIDs
            (None, state.ipfs_providers.clone())
        }
        StorageMode::Both => {
            // Both: permanent directory and IPFS pinning
            let out_dir = format!("{}/nftbk-{}", state.base_dir, task_id);
            (Some(PathBuf::from(out_dir)), state.ipfs_providers.clone())
        }
    };

    // Run backup
    let backup_cfg = BackupConfig {
        chain_config: (*state.chain_config).clone(),
        token_config,
        storage_config: StorageConfig {
            output_path: output_path.clone(),
            prune_redundant: false,
            ipfs_providers,
        },
        process_config: ProcessManagementConfig {
            // Non-fatal per-item errors are collected instead of aborting.
            exit_on_error: false,
            shutdown_flag: shutdown_flag.clone(),
        },
    };
    let span = tracing::info_span!("protection_job", task_id = %task_id);
    let backup_result = backup_from_config(backup_cfg, Some(span)).await;

    // Check backup result. A shutdown interruption returns silently (the
    // job stays incomplete and is re-queued by recover_incomplete_jobs on
    // the next startup); any other error is recorded on the job.
    let (files_written, token_pin_mappings, error_log) = match backup_result {
        Ok((files, token_pin_mappings, errors)) => (files, token_pin_mappings, errors),
        Err(e) => {
            let error_msg = e.to_string();
            if error_msg.contains("interrupted by shutdown signal") {
                info!(
                    "Backup {} was gracefully interrupted by shutdown signal",
                    task_id
                );
                return;
            }
            error!("Backup {task_id} failed: {}", e);
            let _ = state
                .db
                .set_backup_error(&task_id, &format!("Backup failed: {e}"))
                .await;
            return;
        }
    };

    // Persist token-pin request mappings atomically, if any
    if !token_pin_mappings.is_empty() {
        let req = job.requestor.as_deref().unwrap_or("");
        let _ = state
            .db
            .insert_pin_requests_with_tokens(req, &token_pin_mappings)
            .await;
    }

    // Store non-fatal error log in DB if present
    if !error_log.is_empty() {
        let log_str = error_log.join("\n");
        let _ = state
            .db
            .update_protection_job_error_log(&task_id, &log_str)
            .await;
    }

    // Handle archiving based on storage mode
    match storage_mode {
        StorageMode::Filesystem | StorageMode::Both => {
            // We have a filesystem output path: both arms above set Some,
            // so this unwrap cannot fail.
            let out_path = output_path.as_ref().unwrap();

            // Sync all files and directories to disk before archiving.
            // fsync is blocking, so run it off the async runtime.
            info!("Syncing {} to disk before archiving", out_path.display());
            let files_written_clone = files_written.clone();
            tokio::task::spawn_blocking(move || {
                sync_files(&files_written_clone);
            })
            .await
            .unwrap();
            info!(
                "Synced {} to disk before archiving ({} files)",
                out_path.display(),
                files_written.len()
            );

            // Archive the output dir (also blocking work).
            let archive_fmt = job.archive_format.as_deref().unwrap_or("zip");
            let (zip_pathbuf, checksum_path) =
                get_zipped_backup_paths(&state.base_dir, &task_id, archive_fmt);
            info!("Archiving backup to {}", zip_pathbuf.display());
            let start_time = Instant::now();
            let out_path_clone = out_path.clone();
            let zip_pathbuf_clone = zip_pathbuf.clone();
            let archive_format_clone = archive_fmt.to_string();
            let shutdown_flag_clone = shutdown_flag.clone();
            let zip_result = tokio::task::spawn_blocking(move || {
                zip_backup(
                    &out_path_clone,
                    &zip_pathbuf_clone,
                    archive_format_clone,
                    shutdown_flag_clone,
                )
            })
            .await
            .unwrap();

            // Check archive result
            match zip_result {
                Ok(checksum) => {
                    info!(
                        "Archived backup at {} in {:?}s",
                        zip_pathbuf.display(),
                        start_time.elapsed().as_secs()
                    );
                    // Write the checksum sidecar; without it the archive is
                    // treated as invalid by check_backup_on_disk.
                    if let Err(e) = tokio::fs::write(&checksum_path, &checksum).await {
                        let error_msg = format!("Failed to write archive checksum file: {e}");
                        error!("{error_msg}");
                        let _ = state.db.set_backup_error(&task_id, &error_msg).await;
                        return;
                    }
                    let _ = state
                        .db
                        .update_protection_job_status(&task_id, "done")
                        .await;
                }
                Err(e) => {
                    let err_str = e.to_string();
                    if err_str.contains(ARCHIVE_INTERRUPTED_BY_SHUTDOWN) {
                        info!(
                            "Archiving for backup {} was gracefully interrupted by shutdown signal",
                            task_id
                        );
                        // Remove the partial archive; the job will be
                        // recovered and re-run on next startup.
                        let _ = std::fs::remove_file(&zip_pathbuf);
                        return;
                    }
                    let error_msg = format!("Failed to archive backup: {e}");
                    error!("{error_msg}");
                    let _ = state.db.set_backup_error(&task_id, &error_msg).await;
                    return;
                }
            }

            info!("Backup {} ready", task_id);
        }
        StorageMode::Ipfs => {
            // IPFS-only mode: no filesystem operations needed
            let _ = state
                .db
                .update_protection_job_status(&task_id, "done")
                .await;
            info!("IPFS pinning for {} complete", task_id);
        }
    }
}
475

476
pub async fn run_backup_job(state: AppState, job: BackupJob) {
×
477
    let task_id = job.task_id.clone();
×
478
    let state_clone = state.clone();
×
479
    let task_id_clone = task_id.clone();
×
480

481
    let fut = AssertUnwindSafe(run_backup_job_inner(state, job)).catch_unwind();
×
482

483
    let result = fut.await;
×
484
    if let Err(panic) = result {
×
485
        let panic_msg = if let Some(s) = panic.downcast_ref::<&str>() {
×
486
            s.to_string()
×
487
        } else if let Some(s) = panic.downcast_ref::<String>() {
×
488
            s.clone()
×
489
        } else {
490
            "Unknown panic".to_string()
×
491
        };
UNCOV
492
        let error_msg = format!("Backup job for task {task_id_clone} panicked: {panic_msg}");
×
493
        error!("{error_msg}");
×
UNCOV
494
        let _ = state_clone
×
UNCOV
495
            .db
×
UNCOV
496
            .set_backup_error(&task_id_clone, &error_msg)
×
497
            .await;
×
498
    }
499
}
500

501
/// Flushes every written file — and each containing directory once — to
/// stable storage via fsync. Failures (e.g. missing files) are ignored:
/// this is a best-effort durability pass before archiving.
fn sync_files(files_written: &[std::path::PathBuf]) {
    let mut flushed_parents = HashSet::new();
    for path in files_written {
        // Flush the file's contents.
        if path.is_file() {
            if let Ok(handle) = std::fs::File::open(path) {
                let _ = handle.sync_all();
            }
        }
        // Flush the containing directory so the new entry itself persists;
        // each directory is synced at most once.
        match path.parent() {
            Some(parent) if flushed_parents.insert(parent.to_path_buf()) => {
                if let Ok(dir_handle) = std::fs::File::open(parent) {
                    let _ = dir_handle.sync_all();
                }
            }
            _ => {}
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc