• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

0xmichalis / nftbk / 18659833205

20 Oct 2025 05:26PM UTC coverage: 37.576% (+0.01%) from 37.563%
18659833205

Pull #79

github

web-flow
Merge 645b89dee into cce49e9ba
Pull Request #79: fix: avoid marking backups as error on graceful shutdown

0 of 30 new or added lines in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

1721 of 4580 relevant lines covered (37.58%)

7.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.76
/src/server/workers/creation.rs
1
use futures_util::FutureExt;
2
use std::collections::HashMap;
3
use std::panic::AssertUnwindSafe;
4
use std::path::PathBuf;
5
use std::time::Instant;
6
use tracing::{error, info};
7

8
use crate::backup::backup_from_config;
9
use crate::server::archive::{
10
    get_zipped_backup_paths, sync_files, zip_backup, ARCHIVE_INTERRUPTED_BY_SHUTDOWN,
11
};
12
use crate::server::database::r#trait::Database;
13
use crate::server::{AppState, BackupTask, StorageMode};
14
use crate::{BackupConfig, IpfsOutcome, ProcessManagementConfig, StorageConfig, TokenConfig};
15

16
/// Persist non-fatal error logs for archive and/or IPFS based on the requested scope
17
async fn persist_non_fatal_error_logs<DB: Database + ?Sized>(
4✔
18
    db: &DB,
19
    task_id: &str,
20
    scope: &crate::server::StorageMode,
21
    archive_errors: &[String],
22
    ipfs_errors: &[String],
23
) {
24
    // Determine scope based on which non-fatal errors are present
25
    let archive_log = if archive_errors.is_empty() {
12✔
26
        None
2✔
27
    } else {
28
        Some(archive_errors.join("\n"))
2✔
29
    };
30
    let ipfs_log = if ipfs_errors.is_empty() {
12✔
31
        None
2✔
32
    } else {
33
        Some(ipfs_errors.join("\n"))
2✔
34
    };
35
    if archive_log.is_none() && ipfs_log.is_none() {
12✔
36
        return;
1✔
37
    }
38
    match scope {
×
39
        crate::server::StorageMode::Full => {
×
40
            let _ = db
1✔
41
                .set_error_logs(task_id, archive_log.as_deref(), ipfs_log.as_deref())
6✔
42
                .await;
1✔
43
        }
44
        crate::server::StorageMode::Archive => {
×
45
            if let Some(a) = archive_log.as_deref() {
2✔
46
                let _ = db.update_archive_request_error_log(task_id, a).await;
×
47
            }
48
        }
49
        crate::server::StorageMode::Ipfs => {
×
50
            if let Some(i) = ipfs_log.as_deref() {
2✔
51
                let _ = db.update_pin_request_error_log(task_id, i).await;
×
52
            }
53
        }
54
    }
55
}
56

57
/// Outcome of the archive phase of a backup task, used to decide the final
/// archive-request status written to the database.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ArchiveResult {
    /// Archive and its checksum file were written successfully.
    Success,
    /// Archiving failed; the error has already been persisted to the database.
    Error,
    /// Archiving was cancelled by a graceful shutdown; the request status must
    /// be left untouched so the task can be retried.
    ShutdownInterrupted,
}
63

UNCOV
64
/// Sync the backup's files to disk, archive the output directory, and write the
/// archive checksum file.
///
/// On any failure the error is persisted via `set_archive_request_error` before
/// returning `ArchiveResult::Error`. A graceful shutdown during archiving
/// removes the partial archive and returns `ShutdownInterrupted` WITHOUT
/// touching the database, so the request is not marked as failed.
async fn process_archive_outcome<DB: Database + ?Sized>(
    state: &AppState,
    task: &BackupTask,
    task_id: &str,
    out_path: &std::path::Path,
    archive_outcome: &crate::ArchiveOutcome,
    shutdown_flag: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
    db: &DB,
) -> ArchiveResult {
    // Sync all files and directories to disk before archiving.
    // sync_files is blocking I/O (presumably fsync per file — confirm in
    // server::archive), so it runs on the blocking thread pool.
    info!("Syncing {} to disk before archiving", out_path.display());
    let files_written = archive_outcome.files.clone();
    let files_written_clone = files_written.clone();
    if tokio::task::spawn_blocking(move || {
        sync_files(&files_written_clone);
    })
    .await
    // is_err() here means the blocking task panicked or was cancelled
    // (JoinError), not that an individual sync call failed.
    .is_err()
    {
        let error_msg = "Sync before archiving failed".to_string();
        error!("{error_msg}");
        let _ = db.set_archive_request_error(task_id, &error_msg).await;
        return ArchiveResult::Error;
    }
    info!(
        "Synced {} to disk before archiving ({} files)",
        out_path.display(),
        files_written.len()
    );

    // Archive the output dir. Format defaults to "zip" when unspecified.
    let archive_fmt = task.archive_format.as_deref().unwrap_or("zip");
    let (zip_pathbuf, checksum_path) =
        get_zipped_backup_paths(&state.base_dir, task_id, archive_fmt);
    info!("Archiving backup to {}", zip_pathbuf.display());
    let start_time = Instant::now();
    // Owned clones so the 'static blocking closure can capture them by move.
    let out_path_clone = out_path.to_path_buf();
    let zip_pathbuf_clone = zip_pathbuf.clone();
    let archive_format_clone = archive_fmt.to_string();
    let shutdown_flag_clone = shutdown_flag.clone();
    let zip_result = match tokio::task::spawn_blocking(move || {
        // zip_backup observes the shutdown flag so it can abort mid-archive.
        zip_backup(
            &out_path_clone,
            &zip_pathbuf_clone,
            archive_format_clone,
            shutdown_flag_clone,
        )
    })
    .await
    {
        Ok(r) => r,
        Err(_) => {
            // JoinError: the archiving closure itself panicked.
            let error_msg = "Archiving task panicked".to_string();
            error!("{error_msg}");
            let _ = db.set_archive_request_error(task_id, &error_msg).await;
            return ArchiveResult::Error;
        }
    };

    // Check archive result
    match zip_result {
        Ok(checksum) => {
            info!(
                "Archived backup at {} in {:?}s",
                zip_pathbuf.display(),
                start_time.elapsed().as_secs()
            );
            // Persist the checksum next to the archive; a missing checksum file
            // is treated as a failed archive.
            if let Err(e) = tokio::fs::write(&checksum_path, &checksum).await {
                let error_msg = format!("Failed to write archive checksum file: {e}");
                error!("{error_msg}");
                let _ = db.set_archive_request_error(task_id, &error_msg).await;
                return ArchiveResult::Error;
            }
        }
        Err(e) => {
            // Shutdown interruption is signalled via a sentinel substring in
            // the error message; it is not an error condition.
            let err_str = e.to_string();
            if err_str.contains(ARCHIVE_INTERRUPTED_BY_SHUTDOWN) {
                info!(
                    "Archiving for backup {task_id} was gracefully interrupted by shutdown signal"
                );
                // Best-effort removal of the partial archive; the checksum file
                // was never written at this point.
                let _ = std::fs::remove_file(&zip_pathbuf);
                return ArchiveResult::ShutdownInterrupted;
            }
            let error_msg = format!("Failed to archive backup: {e}");
            error!("{error_msg}");
            let _ = db.set_archive_request_error(task_id, &error_msg).await;
            return ArchiveResult::Error;
        }
    }

    ArchiveResult::Success
}
156

157
async fn process_ipfs_outcome<DB: Database + ?Sized>(
×
158
    db: &DB,
159
    task: &BackupTask,
160
    ipfs_outcome: &IpfsOutcome,
161
) -> bool {
162
    if ipfs_outcome.pin_requests.is_empty() {
×
163
        // No pin requests in an IPFS backup means the library failed to contact any of the current IPFS providers
164
        return false;
×
165
    }
166
    let _ = db
×
167
        .insert_pins_with_tokens(&task.task_id, &ipfs_outcome.pin_requests)
×
168
        .await;
×
169
    true
×
170
}
171

172
fn prepare_backup_config(
×
173
    state: &AppState,
174
    task_id: &str,
175
    scope: &StorageMode,
176
    tokens: &[crate::server::api::Tokens],
177
) -> BackupConfig {
178
    let shutdown_flag = Some(state.shutdown_flag.clone());
×
179
    let mut token_map = HashMap::new();
×
180
    for entry in tokens {
×
181
        token_map.insert(entry.chain.clone(), entry.tokens.clone());
×
182
    }
183
    let token_config = TokenConfig { chains: token_map };
×
184

185
    // Determine output path and IPFS settings based on storage mode
186
    let (output_path, ipfs_pinning_configs) = match scope {
×
187
        StorageMode::Archive => {
188
            // Filesystem only: permanent directory, no IPFS
189
            let out_dir = format!("{}/nftbk-{}", state.base_dir, task_id);
×
190
            (Some(PathBuf::from(out_dir)), Vec::new())
×
191
        }
192
        StorageMode::Ipfs => {
193
            // IPFS only: no downloads, just pin existing CIDs
194
            (None, state.ipfs_pinning_configs.clone())
×
195
        }
196
        StorageMode::Full => {
197
            // Both: permanent directory and IPFS pinning
198
            let out_dir = format!("{}/nftbk-{}", state.base_dir, task_id);
×
199
            (
200
                Some(PathBuf::from(out_dir)),
×
201
                state.ipfs_pinning_configs.clone(),
×
202
            )
203
        }
204
    };
205

206
    BackupConfig {
207
        chain_config: (*state.chain_config).clone(),
×
208
        token_config,
209
        storage_config: StorageConfig {
×
210
            output_path: output_path.clone(),
211
            prune_redundant: false,
212
            ipfs_pinning_configs,
213
        },
214
        process_config: ProcessManagementConfig {
×
215
            exit_on_error: false,
216
            shutdown_flag: shutdown_flag.clone(),
217
        },
218
        task_id: Some(task_id.to_string()),
×
219
    }
220
}
221

222
/// Run one backup task end-to-end: execute the backup library, persist
/// non-fatal errors, then finalize archive and/or pin request statuses
/// according to the task's storage scope.
///
/// On graceful shutdown (either during the backup itself or during archiving)
/// the function returns without writing a terminal status, leaving the task
/// eligible to be resumed/retried.
async fn run_backup_task_inner<DB: Database + ?Sized>(state: AppState, task: BackupTask, db: &DB) {
    let task_id = task.task_id.clone();
    let scope = task.scope.clone();
    info!(
        "Running backup task for task {} (scope: {})",
        task_id,
        scope.as_str()
    );

    // Prepare backup config
    let backup_cfg = prepare_backup_config(&state, &task_id, &scope, &task.request.tokens);
    let output_path = backup_cfg.storage_config.output_path.clone();
    let span = tracing::info_span!("backup_task", task_id = %task_id);

    // Run backup
    let backup_result = backup_from_config(backup_cfg, Some(span)).await;

    // Check backup result
    let (archive_outcome, ipfs_outcome) = match backup_result {
        Ok((archive_outcome, ipfs_outcome)) => (archive_outcome, ipfs_outcome),
        Err(e) => {
            // Shutdown interruption is detected by substring match on the
            // library error message; in that case no error status is recorded.
            let error_msg = e.to_string();
            if error_msg.contains("interrupted by shutdown signal") {
                info!(
                    "Backup {} was gracefully interrupted by shutdown signal",
                    task_id
                );
                return;
            }
            error!("Backup {task_id} failed: {}", e);
            let _ = db
                .set_backup_error(&task_id, &format!("Backup failed: {e}"))
                .await;
            return;
        }
    };

    // Store non-fatal library error logs
    persist_non_fatal_error_logs(
        db,
        &task_id,
        &scope,
        &archive_outcome.errors,
        &ipfs_outcome.errors,
    )
    .await;

    // Process archive and IPFS outcomes based on scope of the backup task.
    // Tracks whether archiving was cut short by shutdown, so the final
    // "ready" log line is suppressed for interrupted tasks.
    let mut was_interrupted = false;
    match scope {
        StorageMode::Ipfs => {
            let success = process_ipfs_outcome(db, &task, &ipfs_outcome).await;
            let status = if success { "done" } else { "error" };
            let _ = db.update_pin_request_status(&task_id, status).await;
        }
        StorageMode::Archive => {
            // output_path is always Some for Archive scope (see
            // prepare_backup_config), so unwrap cannot panic here.
            let out_path = output_path.as_ref().unwrap();
            let result = process_archive_outcome(
                &state,
                &task,
                &task_id,
                out_path,
                &archive_outcome,
                Some(state.shutdown_flag.clone()),
                db,
            )
            .await;
            match result {
                ArchiveResult::Success => {
                    let _ = db.update_archive_request_status(&task_id, "done").await;
                }
                ArchiveResult::Error => {
                    let _ = db.update_archive_request_status(&task_id, "error").await;
                }
                ArchiveResult::ShutdownInterrupted => {
                    // Don't update status - leave it as is (likely "in_progress")
                    was_interrupted = true;
                }
            }
        }
        StorageMode::Full => {
            // output_path is always Some for Full scope; cloned so the archive
            // future can own it.
            let out_path = output_path.as_ref().unwrap().clone();
            let state_ref = &state;
            let task_ref = &task;
            let task_id_ref = task_id.clone();

            // Process archive and IPFS outcomes in parallel. This should speed up status
            // updates for ipfs tasks since archives usually take longer to complete.
            let archive_fut = async move {
                let result = process_archive_outcome(
                    state_ref,
                    task_ref,
                    &task_id_ref,
                    &out_path,
                    &archive_outcome,
                    Some(state_ref.shutdown_flag.clone()),
                    db,
                )
                .await;
                match result {
                    ArchiveResult::Success => {
                        let _ = db.update_archive_request_status(&task_id_ref, "done").await;
                    }
                    ArchiveResult::Error => {
                        let _ = db
                            .update_archive_request_status(&task_id_ref, "error")
                            .await;
                    }
                    ArchiveResult::ShutdownInterrupted => {
                        // Don't update status - leave it as is (likely "in_progress")
                        info!("Archive status left unchanged due to shutdown interruption");
                    }
                }
                result
            };

            let ipfs_fut = async {
                let success = process_ipfs_outcome(db, &task, &ipfs_outcome).await;
                let status = if success { "done" } else { "error" };
                let _ = db.update_pin_request_status(&task_id, status).await;
                success
            };

            // join! (not select!) so both sides always run to completion.
            let (archive_result, _ipfs_success) = tokio::join!(archive_fut, ipfs_fut);
            if matches!(archive_result, ArchiveResult::ShutdownInterrupted) {
                was_interrupted = true;
            }
        }
    }

    // NOTE(review): this logs "ready" even when a status was set to "error";
    // it only distinguishes the shutdown-interruption case.
    if !was_interrupted {
        info!("Backup {} ready", task_id);
    }
}
356

357
pub async fn run_backup_task(state: AppState, task: BackupTask) {
×
358
    let task_id = task.task_id.clone();
×
359
    let state_clone = state.clone();
×
360

361
    let fut =
×
362
        AssertUnwindSafe(run_backup_task_inner(state.clone(), task, &*state.db)).catch_unwind();
×
363

364
    let result = fut.await;
×
365
    if let Err(panic) = result {
×
366
        let panic_msg = if let Some(s) = panic.downcast_ref::<&str>() {
×
367
            s.to_string()
×
368
        } else if let Some(s) = panic.downcast_ref::<String>() {
×
369
            s.clone()
×
370
        } else {
371
            "Unknown panic".to_string()
×
372
        };
373
        let error_msg = format!("Backup task for task {task_id} panicked: {panic_msg}");
×
374
        error!("{error_msg}");
×
375
        let _ = Database::set_backup_error(&*state_clone.db, &task_id, &error_msg).await;
×
376
    }
377
}
378

379
#[cfg(test)]
mod persist_error_logs_tests {
    use super::persist_non_fatal_error_logs;
    use crate::server::database::r#trait::MockDatabase;
    use crate::server::StorageMode;

    /// Drive `persist_non_fatal_error_logs` against a fresh mock database;
    /// each test asserts only that the call completes without panicking.
    async fn run(scope: StorageMode, archive_errors: &[String], ipfs_errors: &[String]) {
        let db = MockDatabase::default();
        persist_non_fatal_error_logs(&db, "t1", &scope, archive_errors, ipfs_errors).await;
    }

    #[tokio::test]
    async fn no_errors_makes_no_call() {
        run(StorageMode::Full, &[], &[]).await;
    }

    #[tokio::test]
    async fn archive_only_calls_once() {
        run(StorageMode::Archive, &["a1".into(), "a2".into()], &[]).await;
    }

    #[tokio::test]
    async fn ipfs_only_calls_once() {
        run(StorageMode::Ipfs, &[], &["i1".into()]).await;
    }

    #[tokio::test]
    async fn both_calls_once_with_both_logs() {
        run(StorageMode::Full, &["a".into()], &["i1".into(), "i2".into()]).await;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc