0xmichalis / nftbk — build 18678579964

21 Oct 2025 09:00AM UTC — coverage: 37.593% (+0.03%) from 37.563%

Pull Request #79: fix: avoid marking backups as error on graceful shutdown
Merge 283a4d730 into cce49e9ba

0 of 27 new or added lines in 2 files covered. (0.0%)
4 existing lines in 2 files now uncovered.
1721 of 4578 relevant lines covered (37.59%)
7.03 hits per line

Source File (0.0): /src/server/database/mod.rs

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgPoolOptions, PgPool, Row};
use tracing::{info, warn};

use crate::server::database::r#trait::Database;
use crate::server::StorageMode;

pub mod r#trait;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BackupTask {
    pub task_id: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub requestor: String,
    pub nft_count: i32,
    pub tokens: serde_json::Value,
    pub archive_status: Option<String>,
    pub ipfs_status: Option<String>,
    pub archive_error_log: Option<String>,
    pub ipfs_error_log: Option<String>,
    pub archive_fatal_error: Option<String>,
    pub ipfs_fatal_error: Option<String>,
    pub storage_mode: String,
    pub archive_format: Option<String>,
    pub expires_at: Option<DateTime<Utc>>,
    pub archive_deleted_at: Option<DateTime<Utc>>,
    pub pins_deleted_at: Option<DateTime<Utc>>,
}

#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "IPFS pin information for a specific CID")]
pub struct PinInfo {
    /// Content Identifier (CID) of the pinned content
    #[schema(example = "QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG")]
    pub cid: String,
    /// IPFS provider type where the content is pinned
    #[schema(example = "pinata")]
    pub provider_type: String,
    /// IPFS provider URL where the content is pinned
    #[schema(example = "https://api.pinata.cloud")]
    pub provider_url: String,
    /// Pin status (pinned, pinning, failed, queued)
    #[schema(example = "pinned")]
    pub status: String,
    /// When the pin was created (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:00:00Z")]
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "Token information with associated pin requests")]
pub struct TokenWithPins {
    /// Blockchain identifier (e.g., ethereum, tezos)
    #[schema(example = "ethereum")]
    pub chain: String,
    /// NFT contract address
    #[schema(example = "0x1234567890123456789012345678901234567890")]
    pub contract_address: String,
    /// NFT token ID
    #[schema(example = "123")]
    pub token_id: String,
    /// List of IPFS pins for this token
    pub pins: Vec<PinInfo>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PinRow {
    pub id: i64,
    pub task_id: String,
    pub provider_type: String,
    pub provider_url: Option<String>,
    pub cid: String,
    pub request_id: String,
    pub pin_status: String,
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Clone)]
pub struct ExpiredBackup {
    pub task_id: String,
    pub archive_format: String,
}

#[derive(Clone)]
pub struct Db {
    pub pool: PgPool,
}

impl Db {
    pub async fn new(database_url: &str, max_connections: u32) -> Self {
        let pool = PgPoolOptions::new()
            .max_connections(max_connections)
            .connect(database_url)
            .await
            .expect("Failed to connect to Postgres");
        // Health check: run a simple query
        sqlx::query("SELECT 1")
            .execute(&pool)
            .await
            .expect("Postgres connection is not healthy");
        tracing::info!("Postgres connection is healthy");
        Db { pool }
    }
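    // Example (sketch, not part of this file): typical construction during
    // server startup. The env var name is hypothetical; only the signature
    // above comes from this module.
    //
    //     let db = Db::new(&std::env::var("DATABASE_URL").unwrap(), 5).await;
    //
    // Note that `new` panics via `expect` on connection failure rather than
    // returning a Result, so it is only suitable for startup paths.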

    #[allow(clippy::too_many_arguments)]
    pub async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Insert into backup_tasks (tokens JSON removed; tokens are stored in tokens table)
        sqlx::query(
            r#"
            INSERT INTO backup_tasks (
                task_id, created_at, updated_at, requestor, nft_count, storage_mode
            ) VALUES (
                $1, NOW(), NOW(), $2, $3, $4
            )
            ON CONFLICT (task_id) DO UPDATE SET
                updated_at = NOW(),
                nft_count = EXCLUDED.nft_count,
                storage_mode = EXCLUDED.storage_mode
            "#,
        )
        .bind(task_id)
        .bind(requestor)
        .bind(nft_count)
        .bind(storage_mode)
        .execute(&mut *tx)
        .await?;

        // Replace tokens for this task with the provided list (idempotent)
        // Expecting JSON shape: Vec<crate::server::api::Tokens>
        let token_entries: Vec<crate::server::api::Tokens> =
            serde_json::from_value(tokens.clone()).unwrap_or_default();

        for entry in &token_entries {
            for token_str in &entry.tokens {
                if let Some((contract_address, token_id)) = token_str.split_once(':') {
                    sqlx::query(
                        r#"INSERT INTO tokens (task_id, chain, contract_address, token_id)
                           VALUES ($1, $2, $3, $4)
                           ON CONFLICT (task_id, chain, contract_address, token_id) DO NOTHING"#,
                    )
                    .bind(task_id)
                    .bind(&entry.chain)
                    .bind(contract_address)
                    .bind(token_id)
                    .execute(&mut *tx)
                    .await?;
                }
            }
        }

        // Insert into archive_requests if storage mode includes archive
        if storage_mode == "archive" || storage_mode == "full" {
            let archive_fmt = archive_format.unwrap_or("zip");

            if let Some(days) = retention_days {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NOW() + make_interval(days => $3::int), 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                .bind(days as i64)
                .execute(&mut *tx)
                .await?;
            } else {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NULL, 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                .execute(&mut *tx)
                .await?;
            }
        }

        // Insert into pin_requests if storage mode includes IPFS
        if storage_mode == "ipfs" || storage_mode == "full" {
            sqlx::query(
                r#"
                INSERT INTO pin_requests (task_id, status)
                VALUES ($1, 'in_progress')
                ON CONFLICT (task_id) DO UPDATE SET
                    status = EXCLUDED.status
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
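    // Sketch of the `tokens` JSON this method expects (a serialized
    // Vec<crate::server::api::Tokens>); the concrete values are illustrative:
    //
    //     [
    //         { "chain": "ethereum", "tokens": ["0xabc:1", "0xabc:2"] },
    //         { "chain": "tezos", "tokens": ["KT1xyz:7"] }
    //     ]
    //
    // If the value fails to deserialize into that shape, `unwrap_or_default()`
    // yields an empty list and no token rows are written; token strings
    // without a ':' separator are skipped.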

    pub async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        // CASCADE will delete associated archive_requests row if it exists
        sqlx::query!("DELETE FROM backup_tasks WHERE task_id = $1", task_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    pub async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;
        if let Some(a) = archive_error_log {
            sqlx::query("UPDATE archive_requests SET error_log = $2 WHERE task_id = $1")
                .bind(task_id)
                .bind(a)
                .execute(&mut *tx)
                .await?;
        }
        if let Some(i) = ipfs_error_log {
            sqlx::query(
                r#"
                    UPDATE pin_requests
                    SET error_log = $2
                    WHERE task_id = $1
                    "#,
            )
            .bind(task_id)
            .bind(i)
            .execute(&mut *tx)
            .await?;
        }
        tx.commit().await?;
        Ok(())
    }

    pub async fn update_archive_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET error_log = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(error_log)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_pin_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET error_log = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(error_log)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Log a human-friendly backup status based on a machine status value and storage mode
    fn log_status(task_id: &str, status: &str, mode: &StorageMode) {
        match status {
            "done" => info!("Backup {} ready (storage: {})", task_id, mode.as_str()),
            "error" => warn!("Backup {} errored (storage: {})", task_id, mode.as_str()),
            _ => (),
        }
    }
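    // Per the coverage header above, `log_status` and its call sites below are
    // among the lines added by PR #79 and are not yet exercised by tests.
    // Assuming StorageMode::as_str() (defined elsewhere) yields values such as
    // "ipfs" or "archive", a call like
    //
    //     Db::log_status("abc", "done", &StorageMode::Ipfs);
    //
    // would emit an info-level line along the lines of:
    // Backup abc ready (storage: ipfs)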

    pub async fn update_pin_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET status = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(status)
        .execute(&self.pool)
        .await?;

        Self::log_status(task_id, status, &StorageMode::Ipfs);

        Ok(())
    }
327
    pub async fn update_archive_request_status(
×
328
        &self,
329
        task_id: &str,
330
        status: &str,
331
    ) -> Result<(), sqlx::Error> {
332
        sqlx::query(
333
            r#"
334
            UPDATE archive_requests
335
            SET status = $2
336
            WHERE task_id = $1
337
            "#,
338
        )
339
        .bind(task_id)
×
340
        .bind(status)
×
341
        .execute(&self.pool)
×
342
        .await?;
×
343

NEW
344
        Self::log_status(task_id, status, &StorageMode::Archive);
×
345

UNCOV
346
        Ok(())
×
347
    }
348

    pub async fn update_archive_request_statuses(
        &self,
        task_ids: &[String],
        status: &str,
    ) -> Result<(), sqlx::Error> {
        if task_ids.is_empty() {
            return Ok(());
        }

        // Use a transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Update each task_id individually with a prepared statement
        for task_id in task_ids {
            sqlx::query("UPDATE archive_requests SET status = $1 WHERE task_id = $2")
                .bind(status)
                .bind(task_id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(())
    }

    pub async fn retry_backup(
        &self,
        task_id: &str,
        scope: &str,
        retention_days: u64,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Reset statuses per requested scope
        if scope == "archive" || scope == "full" {
            sqlx::query(
                r#"
                UPDATE archive_requests
                SET status = 'in_progress', fatal_error = NULL, error_log = NULL
                WHERE task_id = $1
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
            sqlx::query(
                r#"
                UPDATE archive_requests
                SET expires_at = NOW() + make_interval(days => $2::int)
                WHERE task_id = $1
                "#,
            )
            .bind(task_id)
            .bind(retention_days as i64)
            .execute(&mut *tx)
            .await?;
        }
        if scope == "ipfs" || scope == "full" {
            sqlx::query(
                r#"
                UPDATE pin_requests
                SET status = 'in_progress', fatal_error = NULL, error_log = NULL
                WHERE task_id = $1
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
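    // Example call (sketch): retry only the IPFS half of a backup; the task id
    // and retention value are illustrative.
    //
    //     db.retry_backup("task-123", "ipfs", 30).await?;
    //
    // Note that `retention_days` only takes effect for the archive scope; for
    // scope == "ipfs" it is accepted but unused.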

    pub async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;
        // Clear archive errors if scope includes archive
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET error_log = NULL, fatal_error = NULL
            WHERE task_id = $1 AND ($2 IN ('archive', 'full'))
            "#,
        )
        .bind(task_id)
        .bind(scope)
        .execute(&mut *tx)
        .await?;
        // Clear IPFS errors if scope includes ipfs
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET error_log = NULL, fatal_error = NULL
            WHERE task_id = $1 AND ($2 IN ('ipfs', 'full'))
            "#,
        )
        .bind(task_id)
        .bind(scope)
        .execute(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(())
    }

    pub async fn set_archive_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET status = 'error', fatal_error = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(fatal_error)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn set_pin_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET status = 'error', fatal_error = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(fatal_error)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        let sql = r#"
            WITH task_mode AS (
                SELECT storage_mode FROM backup_tasks WHERE task_id = $1
            ),
            touch AS (
                UPDATE backup_tasks SET updated_at = NOW() WHERE task_id = $1 RETURNING 1
            ),
            ar_inprog AS (
                SELECT 1 FROM archive_requests ar, task_mode tm
                WHERE ar.task_id = $1 AND ar.status = 'in_progress'
                  AND tm.storage_mode IN ('archive','full')
                LIMIT 1
            ),
            pr_inprog AS (
                SELECT 1 FROM pin_requests pr, task_mode tm
                WHERE pr.task_id = $1 AND pr.status = 'in_progress'
                  AND tm.storage_mode IN ('ipfs','full')
                LIMIT 1
            ),
            upd_archive AS (
                UPDATE archive_requests ar
                SET deleted_at = NOW()
                WHERE ar.task_id = $1 AND ar.deleted_at IS NULL
                  AND EXISTS (SELECT 1 FROM task_mode tm WHERE tm.storage_mode IN ('archive','full'))
                  AND NOT EXISTS (SELECT 1 FROM ar_inprog)
                RETURNING 1
            ),
            upd_pins AS (
                UPDATE pin_requests pr
                SET deleted_at = NOW()
                WHERE pr.task_id = $1 AND pr.deleted_at IS NULL
                  AND EXISTS (SELECT 1 FROM task_mode tm WHERE tm.storage_mode IN ('ipfs','full'))
                  AND NOT EXISTS (SELECT 1 FROM pr_inprog)
                RETURNING 1
            )
            SELECT EXISTS(SELECT 1 FROM ar_inprog) AS ar_blocked,
                   EXISTS(SELECT 1 FROM pr_inprog) AS pr_blocked
        "#;

        let row = sqlx::query(sql).bind(task_id).fetch_one(&self.pool).await?;
        let ar_blocked: bool = row.get("ar_blocked");
        let pr_blocked: bool = row.get("pr_blocked");
        if ar_blocked || pr_blocked {
            return Err(sqlx::Error::Protocol(
                "in_progress task cannot be deleted".into(),
            ));
        }
        Ok(())
    }
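    // Added commentary: the CTE above intentionally folds the "anything still
    // in_progress?" checks and the soft-delete updates into one statement, so
    // the guard and the writes happen in a single round trip. The trailing
    // SELECT only reports whether either subresource blocked the deletion,
    // which is then surfaced as an error.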

    /// Mark archive as being deleted (similar to start_deletion but for archive subresource)
    pub async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        let row = sqlx::query(
            r#"
            WITH ar_inprog AS (
                SELECT 1 FROM archive_requests WHERE task_id = $1 AND status = 'in_progress' LIMIT 1
            ), upd AS (
                UPDATE archive_requests
                SET deleted_at = NOW()
                WHERE task_id = $1 AND deleted_at IS NULL AND NOT EXISTS (SELECT 1 FROM ar_inprog)
                RETURNING 1
            )
            SELECT EXISTS(SELECT 1 FROM ar_inprog) AS blocked
            "#,
        )
        .bind(task_id)
        .fetch_one(&self.pool)
        .await?;
        let blocked: bool = row.get("blocked");
        if blocked {
            return Err(sqlx::Error::Protocol(
                "in_progress task cannot be deleted".into(),
            ));
        }
        Ok(())
    }

    /// Mark IPFS pins as being deleted (similar to start_deletion but for IPFS pins subresource)
    pub async fn start_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        let row = sqlx::query(
            r#"
            WITH pr_inprog AS (
                SELECT 1 FROM pin_requests WHERE task_id = $1 AND status = 'in_progress' LIMIT 1
            ), upd AS (
                UPDATE pin_requests
                SET deleted_at = NOW()
                WHERE task_id = $1 AND deleted_at IS NULL AND NOT EXISTS (SELECT 1 FROM pr_inprog)
                RETURNING 1
            )
            SELECT EXISTS(SELECT 1 FROM pr_inprog) AS blocked
            "#,
        )
        .bind(task_id)
        .fetch_one(&self.pool)
        .await?;
        let blocked: bool = row.get("blocked");
        if blocked {
            return Err(sqlx::Error::Protocol(
                "in_progress task cannot be deleted".into(),
            ));
        }
        Ok(())
    }

    pub async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        let row = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.task_id = $1
            "#,
        )
        .bind(task_id)
        .fetch_optional(&self.pool)
        .await?;

        if let Some(row) = row {
            // Fetch tokens for this task from tokens table and aggregate by chain
            let token_rows = sqlx::query(
                r#"
                SELECT chain, contract_address, token_id
                FROM tokens
                WHERE task_id = $1
                ORDER BY chain, contract_address, token_id
                "#,
            )
            .bind(task_id)
            .fetch_all(&self.pool)
            .await?;

            use std::collections::BTreeMap;
            let mut by_chain: BTreeMap<String, Vec<String>> = BTreeMap::new();
            for r in token_rows {
                let chain: String = r.get("chain");
                let contract_address: String = r.get("contract_address");
                let token_id: String = r.get("token_id");
                by_chain
                    .entry(chain)
                    .or_default()
                    .push(format!("{}:{}", contract_address, token_id));
            }
            let tokens_json = serde_json::json!(by_chain
                .into_iter()
                .map(|(chain, toks)| serde_json::json!({
                    "chain": chain,
                    "tokens": toks,
                }))
                .collect::<Vec<_>>());

            Ok(Some(BackupTask {
                task_id: row.get("task_id"),
                created_at: row.get("created_at"),
                updated_at: row.get("updated_at"),
                requestor: row.get("requestor"),
                nft_count: row.get("nft_count"),
                tokens: tokens_json,
                archive_status: row
                    .try_get::<Option<String>, _>("archive_status")
                    .ok()
                    .flatten(),
                ipfs_status: row
                    .try_get::<Option<String>, _>("ipfs_status")
                    .ok()
                    .flatten(),
                archive_error_log: row.get("archive_error_log"),
                ipfs_error_log: row.get("ipfs_error_log"),
                archive_fatal_error: row.get("fatal_error"),
                ipfs_fatal_error: row
                    .try_get::<Option<String>, _>("ipfs_fatal_error")
                    .ok()
                    .flatten(),
                storage_mode: row.get("storage_mode"),
                archive_format: row.get("archive_format"),
                expires_at: row.get("expires_at"),
                archive_deleted_at: row.get("archive_deleted_at"),
                pins_deleted_at: row.get("pins_deleted_at"),
            }))
        } else {
            Ok(None)
        }
    }

    /// Fetch backup task plus a paginated slice of its tokens; returns (meta, total_token_count)
    pub async fn get_backup_task_with_tokens(
        &self,
        task_id: &str,
        limit: i64,
        offset: i64,
    ) -> Result<Option<(BackupTask, u32)>, sqlx::Error> {
        // Base metadata (same as get_backup_task)
        let row = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.task_id = $1
            "#,
        )
        .bind(task_id)
        .fetch_optional(&self.pool)
        .await?;

        let Some(row) = row else { return Ok(None) };

        // Total tokens for pagination
        let total_row = sqlx::query!(
            r#"SELECT COUNT(*) as count FROM tokens WHERE task_id = $1"#,
            task_id
        )
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = total_row.count.unwrap_or(0) as u32;

        // Page of tokens
        let token_rows = sqlx::query(
            r#"
            SELECT chain, contract_address, token_id
            FROM tokens
            WHERE task_id = $1
            ORDER BY chain, contract_address, token_id
            LIMIT $2 OFFSET $3
            "#,
        )
        .bind(task_id)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

        use std::collections::BTreeMap;
        let mut by_chain: BTreeMap<String, Vec<String>> = BTreeMap::new();
        for r in token_rows {
            let chain: String = r.get("chain");
            let contract_address: String = r.get("contract_address");
            let token_id: String = r.get("token_id");
            by_chain
                .entry(chain)
                .or_default()
                .push(format!("{}:{}", contract_address, token_id));
        }
        let tokens_json = serde_json::json!(by_chain
            .into_iter()
            .map(|(chain, toks)| serde_json::json!({ "chain": chain, "tokens": toks }))
            .collect::<Vec<_>>());

        let meta = BackupTask {
            task_id: row.get("task_id"),
            created_at: row.get("created_at"),
            updated_at: row.get("updated_at"),
            requestor: row.get("requestor"),
            nft_count: row.get("nft_count"),
            tokens: tokens_json,
            archive_status: row
                .try_get::<Option<String>, _>("archive_status")
                .ok()
                .flatten(),
            ipfs_status: row
                .try_get::<Option<String>, _>("ipfs_status")
                .ok()
                .flatten(),
            archive_error_log: row.get("archive_error_log"),
            ipfs_error_log: row.get("ipfs_error_log"),
            archive_fatal_error: row.get("fatal_error"),
            ipfs_fatal_error: row
                .try_get::<Option<String>, _>("ipfs_fatal_error")
                .ok()
                .flatten(),
            storage_mode: row.get("storage_mode"),
            archive_format: row.get("archive_format"),
            expires_at: row.get("expires_at"),
            archive_deleted_at: row.get("archive_deleted_at"),
            pins_deleted_at: row.get("pins_deleted_at"),
        };

        Ok(Some((meta, total)))
    }
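    // Example call (sketch): fetch the second page of 50 tokens for a task.
    // `meta.tokens` holds at most this one page, while `total` is the overall
    // token count callers use for pagination. The task id is illustrative.
    //
    //     if let Some((meta, total)) = db
    //         .get_backup_task_with_tokens("task-123", 50, 50)
    //         .await?
    //     {
    //         // render `meta` plus this 50-token page; page with `total`
    //     }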

    pub async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        // Total count
        let total_row = sqlx::query!(
            r#"SELECT COUNT(*) as count FROM backup_tasks b WHERE b.requestor = $1"#,
            requestor
        )
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = total_row.count.unwrap_or(0) as u32;

        let rows = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.requestor = $1
            ORDER BY b.created_at DESC
            LIMIT $2 OFFSET $3
            "#,
        )
        .bind(requestor)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

        let recs = rows
            .into_iter()
            .map(|row| {
                let task_id: String = row.get("task_id");

                BackupTask {
                    task_id,
                    created_at: row.get("created_at"),
                    updated_at: row.get("updated_at"),
                    requestor: row.get("requestor"),
                    nft_count: row.get("nft_count"),
                    // Client should use get_backup_task to get tokens so the tokens
                    // can be properly paginated.
                    tokens: serde_json::Value::Null,
                    archive_status: row
                        .try_get::<Option<String>, _>("archive_status")
                        .ok()
                        .flatten(),
                    ipfs_status: row
                        .try_get::<Option<String>, _>("ipfs_status")
                        .ok()
                        .flatten(),
                    archive_error_log: row.get("archive_error_log"),
                    ipfs_error_log: row.get("ipfs_error_log"),
                    archive_fatal_error: row.get("fatal_error"),
                    ipfs_fatal_error: row
                        .try_get::<Option<String>, _>("ipfs_fatal_error")
                        .ok()
                        .flatten(),
                    storage_mode: row.get("storage_mode"),
                    archive_format: row.get("archive_format"),
                    expires_at: row.get("expires_at"),
                    archive_deleted_at: row.get("archive_deleted_at"),
                    pins_deleted_at: row.get("pins_deleted_at"),
                }
            })
            .collect();

        Ok((recs, total))
    }

    pub async fn list_unprocessed_expired_backups(
        &self,
    ) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT b.task_id, ar.archive_format
            FROM backup_tasks b
            JOIN archive_requests ar ON b.task_id = ar.task_id
            WHERE ar.expires_at IS NOT NULL AND ar.expires_at < NOW() AND ar.status != 'expired'
            "#,
        )
        .fetch_all(&self.pool)
        .await?;
        let recs = rows
            .into_iter()
            .map(|row| ExpiredBackup {
                task_id: row.get("task_id"),
                archive_format: row.get("archive_format"),
            })
            .collect();
        Ok(recs)
    }

    /// Retrieve all backup tasks that are in 'in_progress' status
    /// This is used to recover incomplete tasks on server restart
    pub async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE (
                -- Archive-only mode: check archive status (record must exist and be in_progress)
                (b.storage_mode = 'archive' AND ar.status = 'in_progress')
                OR
                -- IPFS-only mode: check IPFS status (record must exist and be in_progress)
                (b.storage_mode = 'ipfs' AND pr.status = 'in_progress')
                OR
                -- Full mode: check both archive and IPFS status (task is incomplete if either is in_progress)
                (b.storage_mode = 'full' AND (ar.status = 'in_progress' OR pr.status = 'in_progress'))
            )
            ORDER BY b.created_at ASC
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        // If no incomplete tasks, return early
        if rows.is_empty() {
            return Ok(Vec::new());
        }

        // Collect task_ids to fetch tokens in bulk
        let task_ids: Vec<String> = rows.iter().map(|r| r.get::<String, _>("task_id")).collect();

        // Fetch all tokens for these tasks and aggregate by task_id and chain
        use std::collections::BTreeMap;
        let mut tokens_by_task: BTreeMap<String, BTreeMap<String, Vec<String>>> = BTreeMap::new();

        let token_rows = sqlx::query(
            r#"
            SELECT task_id, chain, contract_address, token_id
            FROM tokens
            WHERE task_id = ANY($1)
            ORDER BY chain, contract_address, token_id
            "#,
        )
        .bind(&task_ids)
        .fetch_all(&self.pool)
        .await?;

        for r in token_rows {
            let task_id: String = r.get("task_id");
            let chain: String = r.get("chain");
            let contract_address: String = r.get("contract_address");
            let token_id: String = r.get("token_id");
            tokens_by_task
                .entry(task_id)
                .or_default()
                .entry(chain)
                .or_default()
                .push(format!("{}:{}", contract_address, token_id));
        }

        let recs = rows
            .into_iter()
            .map(|row| {
                let task_id: String = row.get("task_id");
                let tokens_json = if let Some(by_chain) = tokens_by_task.get(&task_id) {
                    serde_json::json!(by_chain
                        .iter()
                        .map(|(chain, toks)| serde_json::json!({
                            "chain": chain,
                            "tokens": toks,
                        }))
                        .collect::<Vec<_>>())
                } else {
                    // No tokens recorded for this task
                    serde_json::json!([])
                };

                BackupTask {
                    task_id,
                    created_at: row.get("created_at"),
                    updated_at: row.get("updated_at"),
                    requestor: row.get("requestor"),
                    nft_count: row.get("nft_count"),
                    tokens: tokens_json,
                    archive_status: row
                        .try_get::<Option<String>, _>("archive_status")
                        .ok()
                        .flatten(),
                    ipfs_status: row
                        .try_get::<Option<String>, _>("ipfs_status")
                        .ok()
                        .flatten(),
                    archive_error_log: row.get("archive_error_log"),
                    ipfs_error_log: row.get("ipfs_error_log"),
                    archive_fatal_error: row.get("fatal_error"),
                    ipfs_fatal_error: None,
                    storage_mode: row.get("storage_mode"),
                    archive_format: row.get("archive_format"),
                    expires_at: row.get("expires_at"),
                    archive_deleted_at: row.get("archive_deleted_at"),
                    pins_deleted_at: row.get("pins_deleted_at"),
                }
            })
            .collect();

        Ok(recs)
    }

    /// Insert pins and their associated tokens in a single atomic transaction
    pub async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        if token_pin_mappings.is_empty() {
            return Ok(());
        }

        // Collect all pin responses and prepare token data
        let mut all_pin_responses = Vec::new();
        let mut all_token_data = Vec::new(); // (index_in_pin_responses, chain, contract_address, token_id)

        for mapping in token_pin_mappings {
            for pin_response in &mapping.pin_responses {
                let index = all_pin_responses.len();
                all_pin_responses.push(pin_response);
                all_token_data.push((
                    index,
                    mapping.chain.clone(),
                    mapping.contract_address.clone(),
                    mapping.token_id.clone(),
                ));
            }
        }

        if all_pin_responses.is_empty() {
            return Ok(());
        }

        // Start a transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Insert pins one by one and collect generated IDs
        let mut pin_ids: Vec<i64> = Vec::new();
        for pin_response in &all_pin_responses {
            // Map status enum to lowercase string to satisfy CHECK constraint
            let status = match pin_response.status {
                crate::ipfs::PinResponseStatus::Queued => "queued",
                crate::ipfs::PinResponseStatus::Pinning => "pinning",
                crate::ipfs::PinResponseStatus::Pinned => "pinned",
                crate::ipfs::PinResponseStatus::Failed => "failed",
            };

            let row = sqlx::query(
                "INSERT INTO pins (task_id, provider_type, provider_url, cid, request_id, pin_status) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id"
            )
            .bind(task_id)
            .bind(&pin_response.provider_type)
            .bind(&pin_response.provider_url)
            .bind(&pin_response.cid)
            .bind(&pin_response.id)
            .bind(status)
            .fetch_one(&mut *tx)
            .await?;

            pin_ids.push(row.get("id"));
        }

        // Resolve token_ids and update pins rows with token_id
        for (index, chain, contract_address, token_id) in &all_token_data {
            // Ensure token row exists and fetch its id
            let inserted = sqlx::query(
                r#"INSERT INTO tokens (task_id, chain, contract_address, token_id)
                   VALUES ($1, $2, $3, $4)
                   ON CONFLICT (task_id, chain, contract_address, token_id) DO NOTHING
                   RETURNING id"#,
            )
            .bind(task_id)
            .bind(chain)
            .bind(contract_address)
            .bind(token_id)
            .fetch_optional(&mut *tx)
            .await?;

            let tok_id: i64 = if let Some(row) = inserted {
                row.get("id")
            } else {
                sqlx::query("SELECT id FROM tokens WHERE task_id = $1 AND chain = $2 AND contract_address = $3 AND token_id = $4")
                    .bind(task_id)
                    .bind(chain)
                    .bind(contract_address)
                    .bind(token_id)
                    .fetch_one(&mut *tx)
                    .await?
                    .get("id")
            };

            sqlx::query("UPDATE pins SET token_id = $2 WHERE id = $1")
                .bind(pin_ids[*index])
                .bind(tok_id)
                .execute(&mut *tx)
                .await?;
        }

        // Commit the transaction
        tx.commit().await?;
        Ok(())
    }
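    // Added commentary: pins are inserted first so their generated ids are
    // known, then each pin is linked to its token in a second pass; because
    // both passes share one transaction, a failure anywhere rolls back every
    // pin and token row written for this call.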

    /// Get all pins for a specific backup task
    pub async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
            FROM pins
            WHERE task_id = $1
            ORDER BY id
            "#,
        )
        .bind(task_id)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| PinRow {
                id: row.get("id"),
                task_id: row.get("task_id"),
                provider_type: row.get("provider_type"),
                provider_url: row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten(),
                cid: row.get("cid"),
                request_id: row.get("request_id"),
                pin_status: row.get("pin_status"),
                created_at: row.get("created_at"),
            })
            .collect())
    }

    /// Paginated pinned tokens grouped by (chain, contract_address, token_id)
    pub async fn get_pinned_tokens_by_requestor(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
        // Total distinct tokens for this requestor
        let total_row = sqlx::query(
            r#"
            SELECT COUNT(*) as count
            FROM (
                SELECT DISTINCT t.chain, t.contract_address, t.token_id
                FROM tokens t
                JOIN pins p ON p.token_id = t.id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
            ) t
            "#,
        )
        .bind(requestor)
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = (total_row.get::<i64, _>("count")).max(0) as u32;

        // Page of distinct tokens ordered by most recent pin time
        let rows = sqlx::query(
            r#"
            SELECT t.chain, t.contract_address, t.token_id
            FROM (
                SELECT t.chain, t.contract_address, t.token_id, MAX(p.created_at) AS last_created
                FROM tokens t
                JOIN pins p ON p.token_id = t.id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
                GROUP BY t.chain, t.contract_address, t.token_id
            ) t
            ORDER BY last_created DESC
            LIMIT $2 OFFSET $3
            "#,
        )
        .bind(requestor)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

        // For each token key, fetch pins (ordered by created_at desc)
        let mut result: Vec<TokenWithPins> = Vec::new();
        for r in rows {
            let token_rows = sqlx::query(
                r#"
                SELECT t.chain, t.contract_address, t.token_id,
                       p.cid, p.provider_type, p.provider_url, p.pin_status, p.created_at
                FROM tokens t
                JOIN pins p ON p.token_id = t.id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
                  AND t.chain = $2
                  AND t.contract_address = $3
                  AND t.token_id = $4
                ORDER BY p.created_at DESC
                "#,
            )
            .bind(requestor)
            .bind(r.get::<String, _>("chain"))
            .bind(r.get::<String, _>("contract_address"))
            .bind(r.get::<String, _>("token_id"))
            .fetch_all(&self.pool)
            .await?;

            let mut pins: Vec<PinInfo> = Vec::new();
            let mut chain = String::new();
            let mut contract_address = String::new();
            let mut token_id = String::new();
            for row in token_rows {
                chain = row.get("chain");
                contract_address = row.get("contract_address");
                token_id = row.get("token_id");
                let cid: String = row.get("cid");
                let provider_type: String = row.get("provider_type");
                let provider_url: String = row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten()
                    .unwrap_or_default();
                let status: String = row.get("pin_status");
                let created_at: DateTime<Utc> = row.get("created_at");
                pins.push(PinInfo {
                    cid,
                    provider_type,
                    provider_url,
                    status,
                    created_at,
                });
            }
            result.push(TokenWithPins {
                chain,
                contract_address,
                token_id,
                pins,
            });
        }

        Ok((result, total))
    }

    /// Get a specific pinned token for a requestor
    pub async fn get_pinned_token_by_requestor(
        &self,
        requestor: &str,
        chain: &str,
        contract_address: &str,
        token_id: &str,
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
        let query = r#"
            SELECT t.chain, t.contract_address, t.token_id,
                   p.cid, p.provider_type, p.provider_url, p.pin_status, p.created_at
            FROM tokens t
            JOIN pins p ON p.token_id = t.id
            JOIN backup_tasks bt ON bt.task_id = p.task_id
            WHERE bt.requestor = $1
              AND t.chain = $2
              AND t.contract_address = $3
              AND t.token_id = $4
            ORDER BY p.created_at DESC
        "#;

        let rows = sqlx::query(query)
            .bind(requestor)
            .bind(chain)
            .bind(contract_address)
            .bind(token_id)
            .fetch_all(&self.pool)
            .await?;

        if rows.is_empty() {
            return Ok(None);
        }

        let mut pins = Vec::new();
        let mut token_chain = String::new();
        let mut token_contract_address = String::new();
        let mut token_token_id = String::new();

        for row in rows {
            token_chain = row.get("chain");
            token_contract_address = row.get("contract_address");
            token_token_id = row.get("token_id");
            let cid: String = row.get("cid");
            let provider_type: String = row.get("provider_type");
            // provider_url may be NULL for legacy rows; default to empty string for API stability
            let provider_url: String = row
                .try_get::<Option<String>, _>("provider_url")
                .ok()
                .flatten()
                .unwrap_or_default();
            let status: String = row.get("pin_status");
            let created_at: DateTime<Utc> = row.get("created_at");

            pins.push(PinInfo {
                cid,
                provider_type,
                provider_url,
                status,
                created_at,
            });
        }

        Ok(Some(TokenWithPins {
            chain: token_chain,
            contract_address: token_contract_address,
            token_id: token_token_id,
            pins,
        }))
    }

    /// Get all pins that are in 'queued' or 'pinning' status
    /// This is used by the pin monitor to check for status updates
    pub async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
            FROM pins
            WHERE pin_status IN ('queued', 'pinning')
            ORDER BY id
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| PinRow {
                id: row.get("id"),
                task_id: row.get("task_id"),
                provider_type: row.get("provider_type"),
                provider_url: row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten(),
                cid: row.get("cid"),
                request_id: row.get("request_id"),
                pin_status: row.get("pin_status"),
                created_at: row.get("created_at"),
            })
            .collect())
    }

    /// Set backup fatal error for relevant subresources in a single SQL statement.
    /// The update is based on the `storage_mode` value from the `backup_tasks` table for the given `task_id`:
    /// - If storage_mode is 'archive' or 'full': updates archive_requests.status and archive_requests.fatal_error
    /// - If storage_mode is 'ipfs' or 'full': updates pin_requests.status and pin_requests.fatal_error
    pub async fn set_backup_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        let sql = r#"
            WITH task_mode AS (
                SELECT storage_mode FROM backup_tasks WHERE task_id = $1
            ),
            upd_archive AS (
                UPDATE archive_requests ar
                SET status = 'error', fatal_error = $2
                WHERE ar.task_id = $1
                  AND EXISTS (
                      SELECT 1 FROM task_mode tm
                      WHERE tm.storage_mode IN ('archive', 'full')
                  )
                RETURNING 1
            ),
            upd_pins AS (
                UPDATE pin_requests pr
                SET status = 'error', fatal_error = $2
                WHERE pr.task_id = $1
                  AND EXISTS (
                      SELECT 1 FROM task_mode tm
                      WHERE tm.storage_mode IN ('ipfs', 'full')
                  )
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
        "#;
        sqlx::query(sql)
            .bind(task_id)
            .bind(fatal_error)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

1395
    /// Update backup subresource statuses for the task based on its storage mode
1396
    /// - archive or full: updates archive_requests.status
1397
    /// - ipfs or full: updates pin_requests.status
1398
    pub async fn update_backup_statuses(
×
1399
        &self,
1400
        task_id: &str,
1401
        scope: &str,
1402
        archive_status: &str,
1403
        ipfs_status: &str,
1404
    ) -> Result<(), sqlx::Error> {
1405
        let sql = r#"
×
1406
            WITH upd_archive AS (
×
1407
                UPDATE archive_requests ar
×
1408
                SET status = $2
×
1409
                WHERE ar.task_id = $1
×
1410
                  AND ($4 IN ('archive', 'full'))
×
1411
                RETURNING 1
×
1412
            ),
1413
            upd_pins AS (
×
1414
                UPDATE pin_requests pr
×
1415
                SET status = $3
×
1416
                WHERE pr.task_id = $1
×
1417
                  AND ($4 IN ('ipfs', 'full'))
×
1418
                RETURNING 1
×
1419
            )
1420
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
×
1421
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
×
1422
        "#;
×
1423
        sqlx::query(sql)
×
1424
            .bind(task_id)
×
1425
            .bind(archive_status)
×
1426
            .bind(ipfs_status)
×
1427
            .bind(scope)
×
1428
            .execute(&self.pool)
×
1429
            .await?;
×
1430
        Ok(())
×
1431
    }
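
    // Illustrative usage (not from the original source): a retry path might
    // flip both subresources back to 'in_progress' in one call, with the
    // scope deciding which tables are actually touched:
    //
    //     db.update_backup_statuses(task_id, "full", "in_progress", "in_progress")
    //         .await?;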

    pub async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
        if updates.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;

        for (id, status) in updates {
            sqlx::query(
                r#"
                UPDATE pins
                SET pin_status = $2
                WHERE id = $1
                "#,
            )
            .bind(id)
            .bind(status)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
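
    // Note (added commentary): the loop issues one UPDATE per pin inside a
    // single transaction. For large batches, a single statement over
    // unnested arrays is a common Postgres alternative, sketched here under
    // the assumption that it matches this schema:
    //
    //     UPDATE pins AS p
    //     SET pin_status = u.status
    //     FROM unnest($1::bigint[], $2::text[]) AS u(id, status)
    //     WHERE p.id = u.id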

    /// Ensure the missing subresource exists and upgrade the backup to full storage mode.
    /// If `add_archive` is true, create/ensure archive_requests row with provided format/retention.
    /// Otherwise, ensure pin_requests row exists. Always flips backup_tasks.storage_mode to 'full'.
    pub async fn upgrade_backup_to_full(
        &self,
        task_id: &str,
        add_archive: bool,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Upgrade storage mode to full
        sqlx::query(
            r#"
            UPDATE backup_tasks
            SET storage_mode = 'full', updated_at = NOW()
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .execute(&mut *tx)
        .await?;

        if add_archive {
            let fmt = archive_format.unwrap_or("zip");
            if let Some(days) = retention_days {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NOW() + make_interval(days => $3::int), 'in_progress')
                    ON CONFLICT (task_id) DO NOTHING
                    "#,
                )
                .bind(task_id)
                .bind(fmt)
                .bind(days as i64)
                .execute(&mut *tx)
                .await?;
            } else {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NULL, 'in_progress')
                    ON CONFLICT (task_id) DO NOTHING
                    "#,
                )
                .bind(task_id)
                .bind(fmt)
                .execute(&mut *tx)
                .await?;
            }
        } else {
            sqlx::query(
                r#"
                INSERT INTO pin_requests (task_id, status)
                VALUES ($1, 'in_progress')
                ON CONFLICT (task_id) DO NOTHING
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
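
    // Illustrative usage (not from the original source): the two upgrade
    // directions, assuming an ipfs-only and an archive-only task:
    //
    //     // ipfs-only task gains an archive with a 30-day retention:
    //     db.upgrade_backup_to_full(task_id, true, Some("zip"), Some(30)).await?;
    //     // archive-only task gains a pin request:
    //     db.upgrade_backup_to_full(task_id, false, None, None).await?;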

    /// Complete archive deletion:
    /// - If current storage_mode is 'archive', delete the whole backup (finalize deletion)
    /// - Else if current storage_mode is 'full', flip to 'ipfs' to reflect archive removed
    pub async fn complete_archive_request_deletion(
        &self,
        task_id: &str,
    ) -> Result<(), sqlx::Error> {
        // Atomically: delete when archive-only; else if full, flip to ipfs
        let sql = r#"
            WITH del AS (
                DELETE FROM backup_tasks
                WHERE task_id = $1 AND storage_mode = 'archive'
                RETURNING 1
            ), upd AS (
                UPDATE backup_tasks
                SET storage_mode = 'ipfs', updated_at = NOW()
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
        "#;
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
        Ok(())
    }
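
    // Note (added commentary): the `NOT EXISTS (SELECT 1 FROM del)` guard
    // makes the two CTE branches mutually exclusive within one atomic
    // statement: a task is either deleted outright (archive-only) or
    // downgraded to 'ipfs' (full), never both. The same pattern is used in
    // `complete_pin_request_deletion` below with the roles swapped.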

    /// Complete IPFS pins deletion:
    /// - If current storage_mode is 'ipfs', delete the whole backup (finalize deletion)
    /// - Else if current storage_mode is 'full', flip to 'archive' to reflect pins removed
    pub async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        // Atomically: delete when ipfs-only; else if full, flip to archive
        let sql = r#"
            WITH del AS (
                DELETE FROM backup_tasks
                WHERE task_id = $1 AND storage_mode = 'ipfs'
                RETURNING 1
            ), upd AS (
                UPDATE backup_tasks
                SET storage_mode = 'archive', updated_at = NOW()
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
        "#;
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
        Ok(())
    }
}

// Implement the unified Database trait for the real Db struct
#[async_trait::async_trait]
impl Database for Db {
    // Backup task operations

    async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        Db::insert_backup_task(
            self,
            task_id,
            requestor,
            nft_count,
            tokens,
            storage_mode,
            archive_format,
            retention_days,
        )
        .await
    }

    async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        Db::get_backup_task(self, task_id).await
    }

    async fn get_backup_task_with_tokens(
        &self,
        task_id: &str,
        limit: i64,
        offset: i64,
    ) -> Result<Option<(BackupTask, u32)>, sqlx::Error> {
        Db::get_backup_task_with_tokens(self, task_id, limit, offset).await
    }

    async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::delete_backup_task(self, task_id).await
    }

    async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        Db::get_incomplete_backup_tasks(self).await
    }

    async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        Db::list_requestor_backup_tasks_paginated(self, requestor, limit, offset).await
    }

    async fn list_unprocessed_expired_backups(&self) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
        Db::list_unprocessed_expired_backups(self).await
    }

    // Backup task status and error operations
    async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        Db::clear_backup_errors(self, task_id, scope).await
    }

    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error> {
        Db::set_backup_error(self, task_id, error).await
    }

    async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        Db::set_error_logs(self, task_id, archive_error_log, ipfs_error_log).await
    }

    async fn update_archive_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_error_log(self, task_id, error_log).await
    }

    async fn update_pin_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_error_log(self, task_id, error_log).await
    }

    async fn set_archive_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_archive_request_error(self, task_id, fatal_error).await
    }

    async fn set_pin_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_pin_request_error(self, task_id, fatal_error).await
    }

    // Status update operations
    async fn update_archive_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_status(self, task_id, status).await
    }

    async fn update_pin_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_status(self, task_id, status).await
    }

    async fn update_backup_statuses(
        &self,
        task_id: &str,
        scope: &str,
        archive_status: &str,
        ipfs_status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_backup_statuses(self, task_id, scope, archive_status, ipfs_status).await
    }

    async fn update_archive_request_statuses(
        &self,
        task_ids: &[String],
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_statuses(self, task_ids, status).await
    }

    async fn upgrade_backup_to_full(
        &self,
        task_id: &str,
        add_archive: bool,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        Db::upgrade_backup_to_full(self, task_id, add_archive, archive_format, retention_days).await
    }

    // Deletion operations
    async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_deletion(self, task_id).await
    }

    async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_archive_request_deletion(self, task_id).await
    }

    async fn start_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_pin_request_deletion(self, task_id).await
    }

    async fn complete_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_archive_request_deletion(self, task_id).await
    }

    async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_pin_request_deletion(self, task_id).await
    }

    // Retry operations
    async fn retry_backup(
        &self,
        task_id: &str,
        scope: &str,
        retention_days: u64,
    ) -> Result<(), sqlx::Error> {
        Db::retry_backup(self, task_id, scope, retention_days).await
    }

    // Pin operations
    async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        Db::insert_pins_with_tokens(self, task_id, token_pin_mappings).await
    }

    async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_pins_by_task_id(self, task_id).await
    }

    async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_active_pins(self).await
    }

    async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
        Db::update_pin_statuses(self, updates).await
    }

    // Pinned tokens operations
    async fn get_pinned_tokens_by_requestor(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
        Db::get_pinned_tokens_by_requestor(self, requestor, limit, offset).await
    }

    async fn get_pinned_token_by_requestor(
        &self,
        requestor: &str,
        chain: &str,
        contract_address: &str,
        token_id: &str,
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
        Db::get_pinned_token_by_requestor(self, requestor, chain, contract_address, token_id).await
    }
}
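
// Illustrative sketch (not from the original source): the point of the
// delegating impl above is that callers can be written against the
// `Database` trait and exercised with a mock implementation in tests.
// A generic consumer might look like this:
//
//     async fn fail_task<D: Database>(db: &D, task_id: &str) -> Result<(), sqlx::Error> {
//         db.set_backup_error(task_id, "worker crashed").await
//     }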