• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

0xmichalis / nftbk / 18596877411

17 Oct 2025 03:10PM UTC coverage: 35.671% (-0.2%) from 35.871%
18596877411

Pull #75

github

web-flow
Merge 92ceaa2f7 into 12bcc4654
Pull Request #75: fix: avoid subresource deletion when in progress

5 of 58 new or added lines in 3 files covered. (8.62%)

3 existing lines in 1 file now uncovered.

1526 of 4278 relevant lines covered (35.67%)

6.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/server/database/mod.rs
1
use chrono::{DateTime, Utc};
2
use serde::{Deserialize, Serialize};
3
use sqlx::{postgres::PgPoolOptions, PgPool, Row};
4

5
use crate::server::database::r#trait::Database;
6

7
pub mod r#trait;
8

9
/// A backup task row merged with its optional archive and IPFS pin
/// subresources, as produced by the LEFT JOIN queries in this module
/// (`get_backup_task` and friends).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BackupTask {
    // Unique task identifier; used as the join key across all tables.
    pub task_id: String,
    // Set to NOW() when the row is first inserted.
    pub created_at: DateTime<Utc>,
    // Refreshed to NOW() on every upsert of the task row.
    pub updated_at: DateTime<Utc>,
    // Identity that requested the backup.
    pub requestor: String,
    // Number of NFTs covered by this task.
    pub nft_count: i32,
    // Tokens grouped by chain as JSON: [{"chain": ..., "tokens": ["contract:token_id", ...]}].
    // Listings that skip token loading set this to Value::Null.
    pub tokens: serde_json::Value,
    // Status of the archive subresource; None when no archive_requests row exists.
    pub archive_status: Option<String>,
    // Status of the IPFS pin subresource; None when no pin_requests row exists.
    pub ipfs_status: Option<String>,
    // Non-fatal error log accumulated while archiving.
    pub archive_error_log: Option<String>,
    // Non-fatal error log accumulated while pinning.
    pub ipfs_error_log: Option<String>,
    // Fatal error that stopped archiving, if any.
    pub archive_fatal_error: Option<String>,
    // Fatal error that stopped pinning, if any.
    pub ipfs_fatal_error: Option<String>,
    // Observed values elsewhere in this module: "archive", "ipfs", "full".
    pub storage_mode: String,
    // Requested archive format (defaults to "zip" on insert); None without an archive.
    pub archive_format: Option<String>,
    // When the archive expires; None means no retention limit.
    pub expires_at: Option<DateTime<Utc>>,
    // Set when the archive has been soft-deleted (deleted_at in archive_requests).
    pub archive_deleted_at: Option<DateTime<Utc>>,
    // Set when the IPFS pins have been soft-deleted (deleted_at in pin_requests).
    pub pins_deleted_at: Option<DateTime<Utc>>,
}
29

30
/// IPFS pin information for a single CID as exposed through the HTTP API;
/// the `utoipa` schema attributes drive the generated OpenAPI documentation.
#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "IPFS pin information for a specific CID")]
pub struct PinInfo {
    /// Content Identifier (CID) of the pinned content
    #[schema(example = "QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG")]
    pub cid: String,
    /// IPFS provider type where the content is pinned
    #[schema(example = "pinata")]
    pub provider_type: String,
    /// IPFS provider URL where the content is pinned
    #[schema(example = "https://api.pinata.cloud")]
    pub provider_url: String,
    /// Pin status (pinned, pinning, failed, queued)
    #[schema(example = "pinned")]
    pub status: String,
    /// When the pin was created (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:00:00Z")]
    pub created_at: DateTime<Utc>,
}
49

50
/// A single NFT identified by (chain, contract, token id) together with the
/// IPFS pins recorded for it; API-facing type with `utoipa` schema metadata.
#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "Token information with associated pin requests")]
pub struct TokenWithPins {
    /// Blockchain identifier (e.g., ethereum, tezos)
    #[schema(example = "ethereum")]
    pub chain: String,
    /// NFT contract address
    #[schema(example = "0x1234567890123456789012345678901234567890")]
    pub contract_address: String,
    /// NFT token ID
    #[schema(example = "123")]
    pub token_id: String,
    /// List of IPFS pins for this token
    pub pins: Vec<PinInfo>,
}
65

66
/// Raw pin bookkeeping row (one pin per provider per CID for a task).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PinRow {
    // Surrogate primary key of the pin row.
    pub id: i64,
    // Backup task this pin belongs to.
    pub task_id: String,
    // Provider type, e.g. "pinata" (see PinInfo schema example).
    pub provider_type: String,
    // Provider endpoint URL, when known.
    pub provider_url: Option<String>,
    // CID of the pinned content.
    pub cid: String,
    // NOTE(review): presumably the pinning-service request identifier —
    // confirm against the pinning client code.
    pub request_id: String,
    // Pin status; PinInfo documents pinned/pinning/failed/queued values.
    pub pin_status: String,
    // When the pin row was created.
    pub created_at: DateTime<Utc>,
}
77

78
/// Minimal projection returned by `list_unprocessed_expired_backups`: a task
/// whose archive passed `expires_at` but is not yet marked 'expired'.
#[derive(Debug, Clone)]
pub struct ExpiredBackup {
    // Task whose archive expired.
    pub task_id: String,
    // Format of the expired archive artifact (e.g. "zip").
    pub archive_format: String,
}
83

84
/// Postgres-backed database handle; wraps a connection pool and is cheap to
/// clone (PgPool is internally reference-counted).
#[derive(Clone)]
pub struct Db {
    // Underlying sqlx Postgres connection pool.
    pub pool: PgPool,
}
88

89
impl Db {
90
    pub async fn new(database_url: &str, max_connections: u32) -> Self {
×
91
        let pool = PgPoolOptions::new()
×
92
            .max_connections(max_connections)
×
93
            .connect(database_url)
×
94
            .await
×
95
            .expect("Failed to connect to Postgres");
96
        // Health check: run a simple query
97
        sqlx::query("SELECT 1")
×
98
            .execute(&pool)
×
99
            .await
×
100
            .expect("Postgres connection is not healthy");
101
        tracing::info!("Postgres connection is healthy");
×
102
        Db { pool }
103
    }
104

105
    /// Create or refresh a backup task and its per-mode subresource rows in
    /// one transaction.
    ///
    /// Upserts the `backup_tasks` row, inserts the task's tokens, and — based
    /// on `storage_mode` — creates/updates `archive_requests` ("archive" or
    /// "full") and `pin_requests` ("ipfs" or "full") with status
    /// 'in_progress'. `retention_days` controls the archive `expires_at`;
    /// `None` means no expiry. Nothing is visible to readers until commit.
    #[allow(clippy::too_many_arguments)]
    pub async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Insert into backup_tasks (tokens JSON removed; tokens are stored in tokens table)
        sqlx::query(
            r#"
            INSERT INTO backup_tasks (
                task_id, created_at, updated_at, requestor, nft_count, storage_mode
            ) VALUES (
                $1, NOW(), NOW(), $2, $3, $4
            )
            ON CONFLICT (task_id) DO UPDATE SET
                updated_at = NOW(),
                nft_count = EXCLUDED.nft_count,
                storage_mode = EXCLUDED.storage_mode
            "#,
        )
        .bind(task_id)
        .bind(requestor)
        .bind(nft_count)
        .bind(storage_mode)
        .execute(&mut *tx)
        .await?;

        // Insert the provided tokens, idempotently via ON CONFLICT DO NOTHING.
        // NOTE(review): despite earlier wording, this does NOT replace the
        // token set — rows absent from the new list are left in place.
        // Expecting JSON shape: Vec<crate::server::api::Tokens>
        // NOTE(review): malformed JSON silently degrades to an empty list
        // here (unwrap_or_default) — confirm this best-effort is intended.
        let token_entries: Vec<crate::server::api::Tokens> =
            serde_json::from_value(tokens.clone()).unwrap_or_default();

        for entry in &token_entries {
            for token_str in &entry.tokens {
                // Token strings are "contract_address:token_id"; entries
                // without a ':' are skipped silently.
                if let Some((contract_address, token_id)) = token_str.split_once(':') {
                    sqlx::query(
                        r#"INSERT INTO tokens (task_id, chain, contract_address, token_id)
                           VALUES ($1, $2, $3, $4)
                           ON CONFLICT (task_id, chain, contract_address, token_id) DO NOTHING"#,
                    )
                    .bind(task_id)
                    .bind(&entry.chain)
                    .bind(contract_address)
                    .bind(token_id)
                    .execute(&mut *tx)
                    .await?;
                }
            }
        }

        // Insert into archive_requests if storage mode includes archive
        if storage_mode == "archive" || storage_mode == "full" {
            // Default archive format when the caller did not specify one.
            let archive_fmt = archive_format.unwrap_or("zip");

            if let Some(days) = retention_days {
                // Finite retention: expiry computed server-side from NOW().
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NOW() + make_interval(days => $3::int), 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                .bind(days as i64)
                .execute(&mut *tx)
                .await?;
            } else {
                // No retention limit: expires_at stays NULL.
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NULL, 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                .execute(&mut *tx)
                .await?;
            }
        }

        // Insert into pin_requests if storage mode includes IPFS
        if storage_mode == "ipfs" || storage_mode == "full" {
            sqlx::query(
                r#"
                INSERT INTO pin_requests (task_id, status)
                VALUES ($1, 'in_progress')
                ON CONFLICT (task_id) DO UPDATE SET
                    status = EXCLUDED.status
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
216

217
    pub async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
218
        // CASCADE will delete associated archive_requests row if it exists
219
        sqlx::query!("DELETE FROM backup_tasks WHERE task_id = $1", task_id)
×
220
            .execute(&self.pool)
×
221
            .await?;
×
222
        Ok(())
×
223
    }
224

225
    pub async fn set_error_logs(
×
226
        &self,
227
        task_id: &str,
228
        archive_error_log: Option<&str>,
229
        ipfs_error_log: Option<&str>,
230
    ) -> Result<(), sqlx::Error> {
231
        let mut tx = self.pool.begin().await?;
×
232
        if let Some(a) = archive_error_log {
×
233
            sqlx::query("UPDATE archive_requests SET error_log = $2 WHERE task_id = $1")
×
234
                .bind(task_id)
×
235
                .bind(a)
×
236
                .execute(&mut *tx)
×
237
                .await?;
×
238
        }
239
        if let Some(i) = ipfs_error_log {
×
240
            sqlx::query(
241
                r#"
242
                    UPDATE pin_requests
243
                    SET error_log = $2
244
                    WHERE task_id = $1
245
                    "#,
246
            )
247
            .bind(task_id)
×
248
            .bind(i)
×
249
            .execute(&mut *tx)
×
250
            .await?;
×
251
        }
252
        tx.commit().await?;
×
253
        Ok(())
×
254
    }
255

256
    pub async fn update_archive_request_error_log(
×
257
        &self,
258
        task_id: &str,
259
        error_log: &str,
260
    ) -> Result<(), sqlx::Error> {
261
        sqlx::query(
262
            r#"
263
            UPDATE archive_requests
264
            SET error_log = $2
265
            WHERE task_id = $1
266
            "#,
267
        )
268
        .bind(task_id)
×
269
        .bind(error_log)
×
270
        .execute(&self.pool)
×
271
        .await?;
×
272
        Ok(())
×
273
    }
274

275
    pub async fn update_pin_request_error_log(
×
276
        &self,
277
        task_id: &str,
278
        error_log: &str,
279
    ) -> Result<(), sqlx::Error> {
280
        sqlx::query(
281
            r#"
282
            UPDATE pin_requests
283
            SET error_log = $2
284
            WHERE task_id = $1
285
            "#,
286
        )
287
        .bind(task_id)
×
288
        .bind(error_log)
×
289
        .execute(&self.pool)
×
290
        .await?;
×
291
        Ok(())
×
292
    }
293

294
    pub async fn update_pin_request_status(
×
295
        &self,
296
        task_id: &str,
297
        status: &str,
298
    ) -> Result<(), sqlx::Error> {
299
        sqlx::query(
300
            r#"
301
            UPDATE pin_requests
302
            SET status = $2
303
            WHERE task_id = $1
304
            "#,
305
        )
306
        .bind(task_id)
×
307
        .bind(status)
×
308
        .execute(&self.pool)
×
309
        .await?;
×
310
        Ok(())
×
311
    }
312

313
    pub async fn update_archive_request_status(
×
314
        &self,
315
        task_id: &str,
316
        status: &str,
317
    ) -> Result<(), sqlx::Error> {
318
        sqlx::query(
319
            r#"
320
            UPDATE archive_requests
321
            SET status = $2
322
            WHERE task_id = $1
323
            "#,
324
        )
325
        .bind(task_id)
×
326
        .bind(status)
×
327
        .execute(&self.pool)
×
328
        .await?;
×
329
        Ok(())
×
330
    }
331

332
    pub async fn update_archive_request_statuses(
×
333
        &self,
334
        task_ids: &[String],
335
        status: &str,
336
    ) -> Result<(), sqlx::Error> {
337
        if task_ids.is_empty() {
×
338
            return Ok(());
×
339
        }
340

341
        // Use a transaction for atomicity
342
        let mut tx = self.pool.begin().await?;
×
343

344
        // Update each task_id individually with a prepared statement
345
        for task_id in task_ids {
×
346
            sqlx::query("UPDATE archive_requests SET status = $1 WHERE task_id = $2")
×
347
                .bind(status)
×
348
                .bind(task_id)
×
349
                .execute(&mut *tx)
×
350
                .await?;
×
351
        }
352

353
        tx.commit().await?;
×
354
        Ok(())
×
355
    }
356

357
    pub async fn retry_backup(
×
358
        &self,
359
        task_id: &str,
360
        scope: &str,
361
        retention_days: u64,
362
    ) -> Result<(), sqlx::Error> {
363
        let mut tx = self.pool.begin().await?;
×
364

365
        // Reset statuses per requested scope
366
        if scope == "archive" || scope == "full" {
×
367
            sqlx::query(
368
                r#"
369
                UPDATE archive_requests
370
                SET status = 'in_progress', fatal_error = NULL, error_log = NULL
371
                WHERE task_id = $1
372
                "#,
373
            )
374
            .bind(task_id)
×
375
            .execute(&mut *tx)
×
376
            .await?;
×
377
            sqlx::query(
378
                r#"
379
                UPDATE archive_requests
380
                SET expires_at = NOW() + make_interval(days => $2::int)
381
                WHERE task_id = $1
382
                "#,
383
            )
384
            .bind(task_id)
×
385
            .bind(retention_days as i64)
×
386
            .execute(&mut *tx)
×
387
            .await?;
×
388
        }
389
        if scope == "ipfs" || scope == "full" {
×
390
            sqlx::query(
391
                r#"
392
                UPDATE pin_requests
393
                SET status = 'in_progress', fatal_error = NULL, error_log = NULL
394
                WHERE task_id = $1
395
                "#,
396
            )
397
            .bind(task_id)
×
398
            .execute(&mut *tx)
×
399
            .await?;
×
400
        }
401

402
        tx.commit().await?;
×
403
        Ok(())
×
404
    }
405

406
    pub async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
×
407
        let mut tx = self.pool.begin().await?;
×
408
        // Clear archive errors if scope includes archive
409
        sqlx::query(
410
            r#"
411
            UPDATE archive_requests
412
            SET error_log = NULL, fatal_error = NULL
413
            WHERE task_id = $1 AND ($2 IN ('archive', 'full'))
414
            "#,
415
        )
416
        .bind(task_id)
×
417
        .bind(scope)
×
418
        .execute(&mut *tx)
×
419
        .await?;
×
420
        // Clear IPFS errors if scope includes ipfs
421
        sqlx::query(
422
            r#"
423
            UPDATE pin_requests
424
            SET error_log = NULL, fatal_error = NULL
425
            WHERE task_id = $1 AND ($2 IN ('ipfs', 'full'))
426
            "#,
427
        )
428
        .bind(task_id)
×
429
        .bind(scope)
×
430
        .execute(&mut *tx)
×
431
        .await?;
×
432
        tx.commit().await?;
×
433
        Ok(())
×
434
    }
435

436
    pub async fn set_archive_request_error(
×
437
        &self,
438
        task_id: &str,
439
        fatal_error: &str,
440
    ) -> Result<(), sqlx::Error> {
441
        sqlx::query(
442
            r#"
443
            UPDATE archive_requests
444
            SET status = 'error', fatal_error = $2
445
            WHERE task_id = $1
446
            "#,
447
        )
448
        .bind(task_id)
×
449
        .bind(fatal_error)
×
450
        .execute(&self.pool)
×
451
        .await?;
×
452
        Ok(())
×
453
    }
454

455
    pub async fn set_pin_request_error(
×
456
        &self,
457
        task_id: &str,
458
        fatal_error: &str,
459
    ) -> Result<(), sqlx::Error> {
460
        sqlx::query(
461
            r#"
462
            UPDATE pin_requests
463
            SET status = 'error', fatal_error = $2
464
            WHERE task_id = $1
465
            "#,
466
        )
467
        .bind(task_id)
×
468
        .bind(fatal_error)
×
469
        .execute(&self.pool)
×
470
        .await?;
×
471
        Ok(())
×
472
    }
473

474
    /// Soft-delete a task's subresources, refusing when any relevant
    /// subresource is still in progress.
    ///
    /// A single CTE statement (all parts share one snapshot, so check and
    /// update are atomic): `task_mode` reads the task's storage_mode;
    /// `touch` bumps `updated_at`; `ar_inprog`/`pr_inprog` detect an
    /// in-progress archive/pin request *when that mode applies*;
    /// `upd_archive`/`upd_pins` stamp `deleted_at = NOW()` on not-yet-deleted
    /// rows only if the matching mode applies and nothing is in progress.
    /// Returns `sqlx::Error::Protocol` when either subresource is blocked —
    /// in that case its soft-delete is skipped by the NOT EXISTS guard.
    pub async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        let sql = r#"
            WITH task_mode AS (
                SELECT storage_mode FROM backup_tasks WHERE task_id = $1
            ),
            touch AS (
                UPDATE backup_tasks SET updated_at = NOW() WHERE task_id = $1 RETURNING 1
            ),
            ar_inprog AS (
                SELECT 1 FROM archive_requests ar, task_mode tm
                WHERE ar.task_id = $1 AND ar.status = 'in_progress'
                  AND tm.storage_mode IN ('archive','full')
                LIMIT 1
            ),
            pr_inprog AS (
                SELECT 1 FROM pin_requests pr, task_mode tm
                WHERE pr.task_id = $1 AND pr.status = 'in_progress'
                  AND tm.storage_mode IN ('ipfs','full')
                LIMIT 1
            ),
            upd_archive AS (
                UPDATE archive_requests ar
                SET deleted_at = NOW()
                WHERE ar.task_id = $1 AND ar.deleted_at IS NULL
                  AND EXISTS (SELECT 1 FROM task_mode tm WHERE tm.storage_mode IN ('archive','full'))
                  AND NOT EXISTS (SELECT 1 FROM ar_inprog)
                RETURNING 1
            ),
            upd_pins AS (
                UPDATE pin_requests pr
                SET deleted_at = NOW()
                WHERE pr.task_id = $1 AND pr.deleted_at IS NULL
                  AND EXISTS (SELECT 1 FROM task_mode tm WHERE tm.storage_mode IN ('ipfs','full'))
                  AND NOT EXISTS (SELECT 1 FROM pr_inprog)
                RETURNING 1
            )
            SELECT EXISTS(SELECT 1 FROM ar_inprog) AS ar_blocked,
                   EXISTS(SELECT 1 FROM pr_inprog) AS pr_blocked
        "#;

        let row = sqlx::query(sql).bind(task_id).fetch_one(&self.pool).await?;
        // Blocked flags come back from the same statement that attempted the
        // soft-deletes, so callers see a consistent picture.
        let ar_blocked: bool = row.get("ar_blocked");
        let pr_blocked: bool = row.get("pr_blocked");
        if ar_blocked || pr_blocked {
            // NOTE(review): Protocol is repurposed here as a domain error for
            // "deletion refused" — confirm callers match on this variant.
            return Err(sqlx::Error::Protocol(
                "in_progress task cannot be deleted".into(),
            ));
        }
        Ok(())
    }
524

525
    /// Mark archive as being deleted (similar to start_deletion but for archive subresource)
    ///
    /// One CTE statement checks for an 'in_progress' archive request and, only
    /// when none exists, stamps `deleted_at = NOW()` on the not-yet-deleted
    /// row — check and update are atomic. Returns `sqlx::Error::Protocol`
    /// when blocked; in that case nothing is modified.
    pub async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        let row = sqlx::query(
            r#"
            WITH ar_inprog AS (
                SELECT 1 FROM archive_requests WHERE task_id = $1 AND status = 'in_progress' LIMIT 1
            ), upd AS (
                UPDATE archive_requests
                SET deleted_at = NOW()
                WHERE task_id = $1 AND deleted_at IS NULL AND NOT EXISTS (SELECT 1 FROM ar_inprog)
                RETURNING 1
            )
            SELECT EXISTS(SELECT 1 FROM ar_inprog) AS blocked
            "#,
        )
        .bind(task_id)
        .fetch_one(&self.pool)
        .await?;
        // True when an in_progress archive request prevented the soft-delete.
        let blocked: bool = row.get("blocked");
        if blocked {
            return Err(sqlx::Error::Protocol(
                "in_progress task cannot be deleted".into(),
            ));
        }
        Ok(())
    }
551

552
    /// Mark IPFS pins as being deleted (similar to start_deletion but for IPFS pins subresource)
    ///
    /// One CTE statement checks for an 'in_progress' pin request and, only
    /// when none exists, stamps `deleted_at = NOW()` on the not-yet-deleted
    /// row — check and update are atomic. Returns `sqlx::Error::Protocol`
    /// when blocked; in that case nothing is modified.
    pub async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
        let row = sqlx::query(
            r#"
            WITH pr_inprog AS (
                SELECT 1 FROM pin_requests WHERE task_id = $1 AND status = 'in_progress' LIMIT 1
            ), upd AS (
                UPDATE pin_requests
                SET deleted_at = NOW()
                WHERE task_id = $1 AND deleted_at IS NULL AND NOT EXISTS (SELECT 1 FROM pr_inprog)
                RETURNING 1
            )
            SELECT EXISTS(SELECT 1 FROM pr_inprog) AS blocked
            "#,
        )
        .bind(task_id)
        .fetch_one(&self.pool)
        .await?;
        // True when an in_progress pin request prevented the soft-delete.
        let blocked: bool = row.get("blocked");
        if blocked {
            return Err(sqlx::Error::Protocol(
                "in_progress task cannot be deleted".into(),
            ));
        }
        Ok(())
    }
578

579
    /// Load a single backup task with its archive/pin subresource columns and
    /// its full (unpaginated) token list aggregated by chain.
    ///
    /// Returns `Ok(None)` when no task with `task_id` exists.
    pub async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        let row = sqlx::query(
            r#"
            SELECT 
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.task_id = $1
            "#,
        )
        .bind(task_id)
        .fetch_optional(&self.pool)
        .await?;

        if let Some(row) = row {
            // Fetch tokens for this task from tokens table and aggregate by chain
            let token_rows = sqlx::query(
                r#"
                SELECT chain, contract_address, token_id
                FROM tokens
                WHERE task_id = $1
                ORDER BY chain, contract_address, token_id
                "#,
            )
            .bind(task_id)
            .fetch_all(&self.pool)
            .await?;

            use std::collections::BTreeMap;
            // BTreeMap keeps chains sorted, making the JSON deterministic.
            let mut by_chain: BTreeMap<String, Vec<String>> = BTreeMap::new();
            for r in token_rows {
                let chain: String = r.get("chain");
                let contract_address: String = r.get("contract_address");
                let token_id: String = r.get("token_id");
                by_chain
                    .entry(chain)
                    .or_default()
                    .push(format!("{}:{}", contract_address, token_id));
            }
            // Output shape: [{"chain": ..., "tokens": ["contract:token_id", ...]}, ...]
            let tokens_json = serde_json::json!(by_chain
                .into_iter()
                .map(|(chain, toks)| serde_json::json!({
                    "chain": chain,
                    "tokens": toks,
                }))
                .collect::<Vec<_>>());

            Ok(Some(BackupTask {
                task_id: row.get("task_id"),
                created_at: row.get("created_at"),
                updated_at: row.get("updated_at"),
                requestor: row.get("requestor"),
                nft_count: row.get("nft_count"),
                tokens: tokens_json,
                // try_get(...).ok().flatten() tolerates NULL columns produced
                // by the LEFT JOINs when a subresource row is absent.
                archive_status: row
                    .try_get::<Option<String>, _>("archive_status")
                    .ok()
                    .flatten(),
                ipfs_status: row
                    .try_get::<Option<String>, _>("ipfs_status")
                    .ok()
                    .flatten(),
                archive_error_log: row.get("archive_error_log"),
                ipfs_error_log: row.get("ipfs_error_log"),
                archive_fatal_error: row.get("fatal_error"),
                ipfs_fatal_error: row
                    .try_get::<Option<String>, _>("ipfs_fatal_error")
                    .ok()
                    .flatten(),
                storage_mode: row.get("storage_mode"),
                archive_format: row.get("archive_format"),
                expires_at: row.get("expires_at"),
                archive_deleted_at: row.get("archive_deleted_at"),
                pins_deleted_at: row.get("pins_deleted_at"),
            }))
        } else {
            Ok(None)
        }
    }
666

667
    /// Fetch backup task plus a paginated slice of its tokens; returns (meta, total_token_count)
    ///
    /// Same metadata query as `get_backup_task`, but only `limit` tokens
    /// starting at `offset` are loaded (ordered by chain, contract, token id)
    /// and the total token count is returned alongside for pagination.
    /// Returns `Ok(None)` when the task does not exist.
    pub async fn get_backup_task_with_tokens(
        &self,
        task_id: &str,
        limit: i64,
        offset: i64,
    ) -> Result<Option<(BackupTask, u32)>, sqlx::Error> {
        // Base metadata (same as get_backup_task)
        let row = sqlx::query(
            r#"
            SELECT 
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.task_id = $1
            "#,
        )
        .bind(task_id)
        .fetch_optional(&self.pool)
        .await?;

        let Some(row) = row else { return Ok(None) };

        // Total tokens for pagination
        let total_row = sqlx::query!(
            r#"SELECT COUNT(*) as count FROM tokens WHERE task_id = $1"#,
            task_id
        )
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = total_row.count.unwrap_or(0) as u32;

        // Page of tokens
        let token_rows = sqlx::query(
            r#"
            SELECT chain, contract_address, token_id
            FROM tokens
            WHERE task_id = $1
            ORDER BY chain, contract_address, token_id
            LIMIT $2 OFFSET $3
            "#,
        )
        .bind(task_id)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

        use std::collections::BTreeMap;
        // Aggregate the page into [{"chain": ..., "tokens": [...]}] JSON,
        // sorted by chain for deterministic output.
        let mut by_chain: BTreeMap<String, Vec<String>> = BTreeMap::new();
        for r in token_rows {
            let chain: String = r.get("chain");
            let contract_address: String = r.get("contract_address");
            let token_id: String = r.get("token_id");
            by_chain
                .entry(chain)
                .or_default()
                .push(format!("{}:{}", contract_address, token_id));
        }
        let tokens_json = serde_json::json!(by_chain
            .into_iter()
            .map(|(chain, toks)| serde_json::json!({ "chain": chain, "tokens": toks }))
            .collect::<Vec<_>>());

        let meta = BackupTask {
            task_id: row.get("task_id"),
            created_at: row.get("created_at"),
            updated_at: row.get("updated_at"),
            requestor: row.get("requestor"),
            nft_count: row.get("nft_count"),
            tokens: tokens_json,
            // try_get(...).ok().flatten() tolerates NULLs from the LEFT JOINs
            // when a subresource row is absent.
            archive_status: row
                .try_get::<Option<String>, _>("archive_status")
                .ok()
                .flatten(),
            ipfs_status: row
                .try_get::<Option<String>, _>("ipfs_status")
                .ok()
                .flatten(),
            archive_error_log: row.get("archive_error_log"),
            ipfs_error_log: row.get("ipfs_error_log"),
            archive_fatal_error: row.get("fatal_error"),
            ipfs_fatal_error: row
                .try_get::<Option<String>, _>("ipfs_fatal_error")
                .ok()
                .flatten(),
            storage_mode: row.get("storage_mode"),
            archive_format: row.get("archive_format"),
            expires_at: row.get("expires_at"),
            archive_deleted_at: row.get("archive_deleted_at"),
            pins_deleted_at: row.get("pins_deleted_at"),
        };

        Ok(Some((meta, total)))
    }
770

771
    /// List a requestor's backup tasks, newest first, with pagination.
    ///
    /// Returns `(page_of_tasks, total_count)`. Tokens are deliberately NOT
    /// loaded here (`tokens` is `Value::Null`); callers should use
    /// `get_backup_task` / `get_backup_task_with_tokens` for token data.
    pub async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        // Total count
        let total_row = sqlx::query!(
            r#"SELECT COUNT(*) as count FROM backup_tasks b WHERE b.requestor = $1"#,
            requestor
        )
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = total_row.count.unwrap_or(0) as u32;

        let rows = sqlx::query(
            r#"
            SELECT 
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.requestor = $1
            ORDER BY b.created_at DESC
            LIMIT $2 OFFSET $3
            "#,
        )
        .bind(requestor)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

        let recs = rows
            .into_iter()
            .map(|row| {
                let task_id: String = row.get("task_id");

                BackupTask {
                    task_id,
                    created_at: row.get("created_at"),
                    updated_at: row.get("updated_at"),
                    requestor: row.get("requestor"),
                    nft_count: row.get("nft_count"),
                    // Client should use get_backup_task to get tokens so the tokens
                    // can be properly paginated.
                    tokens: serde_json::Value::Null,
                    // try_get(...).ok().flatten() tolerates NULLs from the
                    // LEFT JOINs when a subresource row is absent.
                    archive_status: row
                        .try_get::<Option<String>, _>("archive_status")
                        .ok()
                        .flatten(),
                    ipfs_status: row
                        .try_get::<Option<String>, _>("ipfs_status")
                        .ok()
                        .flatten(),
                    archive_error_log: row.get("archive_error_log"),
                    ipfs_error_log: row.get("ipfs_error_log"),
                    archive_fatal_error: row.get("fatal_error"),
                    ipfs_fatal_error: row
                        .try_get::<Option<String>, _>("ipfs_fatal_error")
                        .ok()
                        .flatten(),
                    storage_mode: row.get("storage_mode"),
                    archive_format: row.get("archive_format"),
                    expires_at: row.get("expires_at"),
                    archive_deleted_at: row.get("archive_deleted_at"),
                    pins_deleted_at: row.get("pins_deleted_at"),
                }
            })
            .collect();

        Ok((recs, total))
    }
851

852
    pub async fn list_unprocessed_expired_backups(
×
853
        &self,
854
    ) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
855
        let rows = sqlx::query(
856
            r#"
857
            SELECT b.task_id, ar.archive_format 
858
            FROM backup_tasks b
859
            JOIN archive_requests ar ON b.task_id = ar.task_id
860
            WHERE ar.expires_at IS NOT NULL AND ar.expires_at < NOW() AND ar.status != 'expired'
861
            "#,
862
        )
863
        .fetch_all(&self.pool)
×
864
        .await?;
×
865
        let recs = rows
×
866
            .into_iter()
867
            .map(|row| ExpiredBackup {
×
868
                task_id: row.get("task_id"),
×
869
                archive_format: row.get("archive_format"),
×
870
            })
871
            .collect();
872
        Ok(recs)
×
873
    }
874

875
    /// Retrieve all backup tasks that are in 'in_progress' status
876
    /// This is used to recover incomplete tasks on server restart
877
    pub async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
×
878
        let rows = sqlx::query(
879
            r#"
880
            SELECT 
881
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
882
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
883
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
884
                ar.error_log as archive_error_log,
885
                pr.status as ipfs_status,
886
                pr.error_log as ipfs_error_log,
887
                pr.deleted_at as pins_deleted_at
888
            FROM backup_tasks b
889
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
890
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
891
            WHERE (
892
                -- Archive-only mode: check archive status (record must exist and be in_progress)
893
                (b.storage_mode = 'archive' AND ar.status = 'in_progress')
894
                OR
895
                -- IPFS-only mode: check IPFS status (record must exist and be in_progress)
896
                (b.storage_mode = 'ipfs' AND pr.status = 'in_progress')
897
                OR
898
                -- Full mode: check both archive and IPFS status (task is incomplete if either is in_progress)
899
                (b.storage_mode = 'full' AND (ar.status = 'in_progress' OR pr.status = 'in_progress'))
900
            )
901
            ORDER BY b.created_at ASC
902
            "#,
903
        )
904
        .fetch_all(&self.pool)
×
905
        .await?;
×
906

907
        // If no incomplete tasks, return early
908
        if rows.is_empty() {
×
909
            return Ok(Vec::new());
×
910
        }
911

912
        // Collect task_ids to fetch tokens in bulk
913
        let task_ids: Vec<String> = rows.iter().map(|r| r.get::<String, _>("task_id")).collect();
×
914

915
        // Fetch all tokens for these tasks and aggregate by task_id and chain
916
        use std::collections::BTreeMap;
917
        let mut tokens_by_task: BTreeMap<String, BTreeMap<String, Vec<String>>> = BTreeMap::new();
×
918

919
        let token_rows = sqlx::query(
920
            r#"
921
            SELECT task_id, chain, contract_address, token_id
922
            FROM tokens
923
            WHERE task_id = ANY($1)
924
            ORDER BY chain, contract_address, token_id
925
            "#,
926
        )
927
        .bind(&task_ids)
×
928
        .fetch_all(&self.pool)
×
929
        .await?;
×
930

931
        for r in token_rows {
×
932
            let task_id: String = r.get("task_id");
×
933
            let chain: String = r.get("chain");
×
934
            let contract_address: String = r.get("contract_address");
×
935
            let token_id: String = r.get("token_id");
×
936
            tokens_by_task
×
937
                .entry(task_id)
×
938
                .or_default()
939
                .entry(chain)
×
940
                .or_default()
941
                .push(format!("{}:{}", contract_address, token_id));
×
942
        }
943

944
        let recs = rows
×
945
            .into_iter()
946
            .map(|row| {
×
947
                let task_id: String = row.get("task_id");
×
948
                let tokens_json = if let Some(by_chain) = tokens_by_task.get(&task_id) {
×
949
                    serde_json::json!(by_chain
×
950
                        .iter()
×
951
                        .map(|(chain, toks)| serde_json::json!({
×
952
                            "chain": chain,
×
953
                            "tokens": toks,
×
954
                        }))
955
                        .collect::<Vec<_>>())
×
956
                } else {
957
                    // No tokens recorded for this task
958
                    serde_json::json!([])
×
959
                };
960

961
                BackupTask {
×
962
                    task_id,
×
963
                    created_at: row.get("created_at"),
×
964
                    updated_at: row.get("updated_at"),
×
965
                    requestor: row.get("requestor"),
×
966
                    nft_count: row.get("nft_count"),
×
967
                    tokens: tokens_json,
×
968
                    archive_status: row
×
969
                        .try_get::<Option<String>, _>("archive_status")
×
970
                        .ok()
×
971
                        .flatten(),
×
972
                    ipfs_status: row
×
973
                        .try_get::<Option<String>, _>("ipfs_status")
×
974
                        .ok()
×
975
                        .flatten(),
×
976
                    archive_error_log: row.get("archive_error_log"),
×
977
                    ipfs_error_log: row.get("ipfs_error_log"),
×
978
                    archive_fatal_error: row.get("fatal_error"),
×
979
                    ipfs_fatal_error: None,
×
980
                    storage_mode: row.get("storage_mode"),
×
981
                    archive_format: row.get("archive_format"),
×
982
                    expires_at: row.get("expires_at"),
×
983
                    archive_deleted_at: row.get("archive_deleted_at"),
×
984
                    pins_deleted_at: row.get("pins_deleted_at"),
×
985
                }
986
            })
987
            .collect();
988

989
        Ok(recs)
×
990
    }
991

992
    /// Insert pins and their associated tokens in a single atomic transaction
    ///
    /// For each (token, pin response) pair:
    /// 1. a `pins` row is inserted (status lowercased to satisfy the CHECK constraint),
    /// 2. the corresponding `tokens` row is created if missing,
    /// 3. the pin row is back-filled with the token's id.
    /// Everything runs inside one transaction; any failure rolls back all inserts.
    pub async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        if token_pin_mappings.is_empty() {
            return Ok(());
        }

        // Collect all pin responses and prepare token data
        let mut all_pin_responses = Vec::new();
        let mut all_token_data = Vec::new(); // (index_in_pin_responses, chain, contract_address, token_id)

        for mapping in token_pin_mappings {
            for pin_response in &mapping.pin_responses {
                // `index` links this token tuple back to the pin row inserted below,
                // so the token_id can be attached after both rows exist.
                let index = all_pin_responses.len();
                all_pin_responses.push(pin_response);
                all_token_data.push((
                    index,
                    mapping.chain.clone(),
                    mapping.contract_address.clone(),
                    mapping.token_id.clone(),
                ));
            }
        }

        // Mappings may exist with zero pin responses; nothing to persist then.
        if all_pin_responses.is_empty() {
            return Ok(());
        }

        // Start a transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Insert pins one by one and collect generated IDs
        let mut pin_ids: Vec<i64> = Vec::new();
        for pin_response in &all_pin_responses {
            // Map status enum to lowercase string to satisfy CHECK constraint
            let status = match pin_response.status {
                crate::ipfs::PinResponseStatus::Queued => "queued",
                crate::ipfs::PinResponseStatus::Pinning => "pinning",
                crate::ipfs::PinResponseStatus::Pinned => "pinned",
                crate::ipfs::PinResponseStatus::Failed => "failed",
            };

            let row = sqlx::query(
                "INSERT INTO pins (task_id, provider_type, provider_url, cid, request_id, pin_status) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id"
            )
            .bind(task_id)
            .bind(&pin_response.provider_type)
            .bind(&pin_response.provider_url)
            .bind(&pin_response.cid)
            .bind(&pin_response.id)
            .bind(status)
            .fetch_one(&mut *tx)
            .await?;

            pin_ids.push(row.get("id"));
        }

        // Resolve token_ids and update pins rows with token_id
        for (index, chain, contract_address, token_id) in &all_token_data {
            // Ensure token row exists and fetch its id.
            // ON CONFLICT DO NOTHING returns no row when the token already exists,
            // hence the SELECT fallback below.
            let inserted = sqlx::query(
                r#"INSERT INTO tokens (task_id, chain, contract_address, token_id)
                   VALUES ($1, $2, $3, $4)
                   ON CONFLICT (task_id, chain, contract_address, token_id) DO NOTHING
                   RETURNING id"#,
            )
            .bind(task_id)
            .bind(chain)
            .bind(contract_address)
            .bind(token_id)
            .fetch_optional(&mut *tx)
            .await?;

            let tok_id: i64 = if let Some(row) = inserted {
                row.get("id")
            } else {
                // Token already existed; look up its id by the same unique key.
                sqlx::query("SELECT id FROM tokens WHERE task_id = $1 AND chain = $2 AND contract_address = $3 AND token_id = $4")
                    .bind(task_id)
                    .bind(chain)
                    .bind(contract_address)
                    .bind(token_id)
                    .fetch_one(&mut *tx)
                    .await?
                    .get("id")
            };

            // Attach the resolved token id to the pin inserted for this tuple.
            sqlx::query("UPDATE pins SET token_id = $2 WHERE id = $1")
                .bind(pin_ids[*index])
                .bind(tok_id)
                .execute(&mut *tx)
                .await?;
        }

        // Commit the transaction
        tx.commit().await?;
        Ok(())
    }
1092

1093
    /// Get all pins for a specific backup task
1094
    pub async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
×
1095
        let rows = sqlx::query(
1096
            r#"
1097
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
1098
            FROM pins
1099
            WHERE task_id = $1
1100
            ORDER BY id
1101
            "#,
1102
        )
1103
        .bind(task_id)
×
1104
        .fetch_all(&self.pool)
×
1105
        .await?;
×
1106

1107
        Ok(rows
×
1108
            .into_iter()
×
1109
            .map(|row| PinRow {
×
1110
                id: row.get("id"),
×
1111
                task_id: row.get("task_id"),
×
1112
                provider_type: row.get("provider_type"),
×
1113
                provider_url: row
×
1114
                    .try_get::<Option<String>, _>("provider_url")
×
1115
                    .ok()
×
1116
                    .flatten(),
×
1117
                cid: row.get("cid"),
×
1118
                request_id: row.get("request_id"),
×
1119
                pin_status: row.get("pin_status"),
×
1120
                created_at: row.get("created_at"),
×
1121
            })
1122
            .collect())
×
1123
    }
1124

1125
    /// Paginated pinned tokens grouped by (chain, contract_address, token_id)
1126
    pub async fn get_pinned_tokens_by_requestor(
×
1127
        &self,
1128
        requestor: &str,
1129
        limit: i64,
1130
        offset: i64,
1131
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
1132
        // Total distinct tokens for this requestor
1133
        let total_row = sqlx::query(
1134
            r#"
1135
            SELECT COUNT(*) as count
1136
            FROM (
1137
                SELECT DISTINCT t.chain, t.contract_address, t.token_id
1138
                FROM tokens t
1139
                JOIN pins p ON p.token_id = t.id
1140
                JOIN backup_tasks bt ON bt.task_id = p.task_id
1141
                WHERE bt.requestor = $1
1142
            ) t
1143
            "#,
1144
        )
1145
        .bind(requestor)
×
1146
        .fetch_one(&self.pool)
×
1147
        .await?;
×
1148
        let total: u32 = (total_row.get::<i64, _>("count")).max(0) as u32;
×
1149

1150
        // Page of distinct tokens ordered by most recent pin time
1151
        let rows = sqlx::query(
1152
            r#"
1153
            SELECT t.chain, t.contract_address, t.token_id
1154
            FROM (
1155
                SELECT t.chain, t.contract_address, t.token_id, MAX(p.created_at) AS last_created
1156
                FROM tokens t
1157
                JOIN pins p ON p.token_id = t.id
1158
                JOIN backup_tasks bt ON bt.task_id = p.task_id
1159
                WHERE bt.requestor = $1
1160
                GROUP BY t.chain, t.contract_address, t.token_id
1161
            ) t
1162
            ORDER BY last_created DESC
1163
            LIMIT $2 OFFSET $3
1164
            "#,
1165
        )
1166
        .bind(requestor)
×
1167
        .bind(limit)
×
1168
        .bind(offset)
×
1169
        .fetch_all(&self.pool)
×
1170
        .await?;
×
1171

1172
        // For each token key, fetch pins (ordered by created_at desc)
1173
        let mut result: Vec<TokenWithPins> = Vec::new();
×
1174
        for r in rows {
×
1175
            let token_rows = sqlx::query(
1176
                r#"
1177
                SELECT t.chain, t.contract_address, t.token_id,
1178
                       p.cid, p.provider_type, p.provider_url, p.pin_status, p.created_at
1179
                FROM tokens t
1180
                JOIN pins p ON p.token_id = t.id
1181
                JOIN backup_tasks bt ON bt.task_id = p.task_id
1182
                WHERE bt.requestor = $1
1183
                  AND t.chain = $2
1184
                  AND t.contract_address = $3
1185
                  AND t.token_id = $4
1186
                ORDER BY p.created_at DESC
1187
                "#,
1188
            )
1189
            .bind(requestor)
×
1190
            .bind(r.get::<String, _>("chain"))
×
1191
            .bind(r.get::<String, _>("contract_address"))
×
1192
            .bind(r.get::<String, _>("token_id"))
×
1193
            .fetch_all(&self.pool)
×
1194
            .await?;
×
1195

1196
            let mut pins: Vec<PinInfo> = Vec::new();
×
1197
            let mut chain = String::new();
×
1198
            let mut contract_address = String::new();
×
1199
            let mut token_id = String::new();
×
1200
            for row in token_rows {
×
1201
                chain = row.get("chain");
×
1202
                contract_address = row.get("contract_address");
×
1203
                token_id = row.get("token_id");
×
1204
                let cid: String = row.get("cid");
×
1205
                let provider_type: String = row.get("provider_type");
×
1206
                let provider_url: String = row
×
1207
                    .try_get::<Option<String>, _>("provider_url")
1208
                    .ok()
1209
                    .flatten()
1210
                    .unwrap_or_default();
1211
                let status: String = row.get("pin_status");
×
1212
                let created_at: DateTime<Utc> = row.get("created_at");
×
1213
                pins.push(PinInfo {
×
1214
                    cid,
×
1215
                    provider_type,
×
1216
                    provider_url,
×
1217
                    status,
×
1218
                    created_at,
×
1219
                });
1220
            }
1221
            result.push(TokenWithPins {
×
1222
                chain,
×
1223
                contract_address,
×
1224
                token_id,
×
1225
                pins,
×
1226
            });
1227
        }
1228

1229
        Ok((result, total))
×
1230
    }
1231

1232
    /// Get a specific pinned token for a requestor
1233
    pub async fn get_pinned_token_by_requestor(
×
1234
        &self,
1235
        requestor: &str,
1236
        chain: &str,
1237
        contract_address: &str,
1238
        token_id: &str,
1239
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
1240
        let query = r#"
×
1241
            SELECT t.chain, t.contract_address, t.token_id,
×
1242
                   p.cid, p.provider_type, p.provider_url, p.pin_status, p.created_at
×
1243
            FROM tokens t
×
1244
            JOIN pins p ON p.token_id = t.id
×
1245
            JOIN backup_tasks bt ON bt.task_id = p.task_id
×
1246
            WHERE bt.requestor = $1
×
1247
              AND t.chain = $2
×
1248
              AND t.contract_address = $3
×
1249
              AND t.token_id = $4
×
1250
            ORDER BY p.created_at DESC
×
1251
        "#;
×
1252

1253
        let rows = sqlx::query(query)
×
1254
            .bind(requestor)
×
1255
            .bind(chain)
×
1256
            .bind(contract_address)
×
1257
            .bind(token_id)
×
1258
            .fetch_all(&self.pool)
×
1259
            .await?;
×
1260

1261
        if rows.is_empty() {
×
1262
            return Ok(None);
×
1263
        }
1264

1265
        let mut pins = Vec::new();
×
1266
        let mut token_chain = String::new();
×
1267
        let mut token_contract_address = String::new();
×
1268
        let mut token_token_id = String::new();
×
1269

1270
        for row in rows {
×
1271
            token_chain = row.get("chain");
×
1272
            token_contract_address = row.get("contract_address");
×
1273
            token_token_id = row.get("token_id");
×
1274
            let cid: String = row.get("cid");
×
1275
            let provider_type: String = row.get("provider_type");
×
1276
            // provider_url may be NULL for legacy rows; default to empty string for API stability
1277
            let provider_url: String = row
×
1278
                .try_get::<Option<String>, _>("provider_url")
1279
                .ok()
1280
                .flatten()
1281
                .unwrap_or_default();
1282
            let status: String = row.get("pin_status");
×
1283
            let created_at: DateTime<Utc> = row.get("created_at");
×
1284

1285
            pins.push(PinInfo {
×
1286
                cid,
×
1287
                provider_type,
×
1288
                provider_url,
×
1289
                status,
×
1290
                created_at,
×
1291
            });
1292
        }
1293

1294
        Ok(Some(TokenWithPins {
×
1295
            chain: token_chain,
×
1296
            contract_address: token_contract_address,
×
1297
            token_id: token_token_id,
×
1298
            pins,
×
1299
        }))
1300
    }
1301

1302
    /// Get all pins that are in 'queued' or 'pinning' status
1303
    /// This is used by the pin monitor to check for status updates
1304
    pub async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
×
1305
        let rows = sqlx::query(
1306
            r#"
1307
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
1308
            FROM pins
1309
            WHERE pin_status IN ('queued', 'pinning')
1310
            ORDER BY id
1311
            "#,
1312
        )
1313
        .fetch_all(&self.pool)
×
1314
        .await?;
×
1315

1316
        Ok(rows
×
1317
            .into_iter()
×
1318
            .map(|row| PinRow {
×
1319
                id: row.get("id"),
×
1320
                task_id: row.get("task_id"),
×
1321
                provider_type: row.get("provider_type"),
×
1322
                provider_url: row
×
1323
                    .try_get::<Option<String>, _>("provider_url")
×
1324
                    .ok()
×
1325
                    .flatten(),
×
1326
                cid: row.get("cid"),
×
1327
                request_id: row.get("request_id"),
×
1328
                pin_status: row.get("pin_status"),
×
1329
                created_at: row.get("created_at"),
×
1330
            })
1331
            .collect())
×
1332
    }
1333

1334
    /// Set backup fatal error for relevant subresources in a single SQL statement.
1335
    /// The update is based on the `storage_mode` value from the `backup_tasks` table for the given `task_id`:
1336
    /// - If storage_mode is 'archive' or 'full': updates archive_requests.status and archive_requests.fatal_error
1337
    /// - If storage_mode is 'ipfs' or 'full': updates pin_requests.status and pin_requests.fatal_error
1338
    pub async fn set_backup_error(
×
1339
        &self,
1340
        task_id: &str,
1341
        fatal_error: &str,
1342
    ) -> Result<(), sqlx::Error> {
1343
        let sql = r#"
×
1344
            WITH task_mode AS (
×
1345
                SELECT storage_mode FROM backup_tasks WHERE task_id = $1
×
1346
            ),
1347
            upd_archive AS (
×
1348
                UPDATE archive_requests ar
×
1349
                SET status = 'error', fatal_error = $2
×
1350
                WHERE ar.task_id = $1
×
1351
                  AND EXISTS (
×
1352
                      SELECT 1 FROM task_mode tm
×
1353
                      WHERE tm.storage_mode IN ('archive', 'full')
×
1354
                  )
1355
                RETURNING 1
×
1356
            ),
1357
            upd_pins AS (
×
1358
                UPDATE pin_requests pr
×
1359
                SET status = 'error', fatal_error = $2
×
1360
                WHERE pr.task_id = $1
×
1361
                  AND EXISTS (
×
1362
                      SELECT 1 FROM task_mode tm
×
1363
                      WHERE tm.storage_mode IN ('ipfs', 'full')
×
1364
                  )
1365
                RETURNING 1
×
1366
            )
1367
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
×
1368
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
×
1369
        "#;
×
1370
        sqlx::query(sql)
×
1371
            .bind(task_id)
×
1372
            .bind(fatal_error)
×
1373
            .execute(&self.pool)
×
1374
            .await?;
×
1375
        Ok(())
×
1376
    }
1377

1378
    /// Update backup subresource statuses for the task based on its storage mode
1379
    /// - archive or full: updates archive_requests.status
1380
    /// - ipfs or full: updates pin_requests.status
1381
    pub async fn update_backup_statuses(
×
1382
        &self,
1383
        task_id: &str,
1384
        scope: &str,
1385
        archive_status: &str,
1386
        ipfs_status: &str,
1387
    ) -> Result<(), sqlx::Error> {
1388
        let sql = r#"
×
1389
            WITH upd_archive AS (
×
1390
                UPDATE archive_requests ar
×
1391
                SET status = $2
×
1392
                WHERE ar.task_id = $1
×
1393
                  AND ($4 IN ('archive', 'full'))
×
1394
                RETURNING 1
×
1395
            ),
1396
            upd_pins AS (
×
1397
                UPDATE pin_requests pr
×
1398
                SET status = $3
×
1399
                WHERE pr.task_id = $1
×
1400
                  AND ($4 IN ('ipfs', 'full'))
×
1401
                RETURNING 1
×
1402
            )
1403
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
×
1404
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
×
1405
        "#;
×
1406
        sqlx::query(sql)
×
1407
            .bind(task_id)
×
1408
            .bind(archive_status)
×
1409
            .bind(ipfs_status)
×
1410
            .bind(scope)
×
1411
            .execute(&self.pool)
×
1412
            .await?;
×
1413
        Ok(())
×
1414
    }
1415

1416
    pub async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
×
1417
        if updates.is_empty() {
×
1418
            return Ok(());
×
1419
        }
1420

1421
        let mut tx = self.pool.begin().await?;
×
1422

1423
        for (id, status) in updates {
×
1424
            sqlx::query(
1425
                r#"
1426
                UPDATE pins
1427
                SET pin_status = $2
1428
                WHERE id = $1
1429
                "#,
1430
            )
1431
            .bind(id)
×
1432
            .bind(status)
×
1433
            .execute(&mut *tx)
×
1434
            .await?;
×
1435
        }
1436

1437
        tx.commit().await?;
×
1438
        Ok(())
×
1439
    }
1440

1441
    /// Ensure the missing subresource exists and upgrade the backup to full storage mode.
1442
    /// If `add_archive` is true, create/ensure archive_requests row with provided format/retention.
1443
    /// Otherwise, ensure pin_requests row exists. Always flips backup_tasks.storage_mode to 'full'.
1444
    pub async fn upgrade_backup_to_full(
×
1445
        &self,
1446
        task_id: &str,
1447
        add_archive: bool,
1448
        archive_format: Option<&str>,
1449
        retention_days: Option<u64>,
1450
    ) -> Result<(), sqlx::Error> {
1451
        let mut tx = self.pool.begin().await?;
×
1452

1453
        // Upgrade storage mode to full
1454
        sqlx::query(
1455
            r#"
1456
            UPDATE backup_tasks
1457
            SET storage_mode = 'full', updated_at = NOW()
1458
            WHERE task_id = $1
1459
            "#,
1460
        )
1461
        .bind(task_id)
1462
        .execute(&mut *tx)
1463
        .await?;
×
1464

1465
        if add_archive {
×
1466
            let fmt = archive_format.unwrap_or("zip");
×
1467
            if let Some(days) = retention_days {
×
1468
                sqlx::query(
1469
                    r#"
1470
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
1471
                    VALUES ($1, $2, NOW() + make_interval(days => $3::int), 'in_progress')
1472
                    ON CONFLICT (task_id) DO NOTHING
1473
                    "#,
1474
                )
1475
                .bind(task_id)
1476
                .bind(fmt)
1477
                .bind(days as i64)
1478
                .execute(&mut *tx)
1479
                .await?;
×
1480
            } else {
1481
                sqlx::query(
1482
                    r#"
1483
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
1484
                    VALUES ($1, $2, NULL, 'in_progress')
1485
                    ON CONFLICT (task_id) DO NOTHING
1486
                    "#,
1487
                )
1488
                .bind(task_id)
×
1489
                .bind(fmt)
×
1490
                .execute(&mut *tx)
×
1491
                .await?;
×
1492
            }
1493
        } else {
1494
            sqlx::query(
1495
                r#"
1496
                INSERT INTO pin_requests (task_id, status)
1497
                VALUES ($1, 'in_progress')
1498
                ON CONFLICT (task_id) DO NOTHING
1499
                "#,
1500
            )
1501
            .bind(task_id)
1502
            .execute(&mut *tx)
1503
            .await?;
×
1504
        }
1505

1506
        tx.commit().await?;
×
1507
        Ok(())
×
1508
    }
1509

1510
    /// Complete archive deletion:
1511
    /// - If current storage_mode is 'archive', delete the whole backup (finalize deletion)
1512
    /// - Else if current storage_mode is 'full', flip to 'ipfs' to reflect archive removed
1513
    pub async fn complete_archive_request_deletion(
×
1514
        &self,
1515
        task_id: &str,
1516
    ) -> Result<(), sqlx::Error> {
1517
        // Atomically: delete when archive-only; else if full, flip to ipfs
1518
        let sql = r#"
×
1519
            WITH del AS (
×
1520
                DELETE FROM backup_tasks
×
1521
                WHERE task_id = $1 AND storage_mode = 'archive'
×
1522
                RETURNING 1
×
1523
            ), upd AS (
×
1524
                UPDATE backup_tasks
×
1525
                SET storage_mode = 'ipfs', updated_at = NOW()
×
1526
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
×
1527
                RETURNING 1
×
1528
            )
1529
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
×
1530
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
×
1531
        "#;
×
1532
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
×
1533
        Ok(())
×
1534
    }
1535

1536
    /// Complete IPFS pins deletion:
1537
    /// - If current storage_mode is 'ipfs', delete the whole backup (finalize deletion)
1538
    /// - Else if current storage_mode is 'full', flip to 'archive' to reflect pins removed
1539
    pub async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
1540
        // Atomically: delete when ipfs-only; else if full, flip to archive
1541
        let sql = r#"
×
1542
            WITH del AS (
×
1543
                DELETE FROM backup_tasks
×
1544
                WHERE task_id = $1 AND storage_mode = 'ipfs'
×
1545
                RETURNING 1
×
1546
            ), upd AS (
×
1547
                UPDATE backup_tasks
×
1548
                SET storage_mode = 'archive', updated_at = NOW()
×
1549
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
×
1550
                RETURNING 1
×
1551
            )
1552
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
×
1553
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
×
1554
        "#;
×
1555
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
×
1556
        Ok(())
×
1557
    }
1558
}
1559

1560
// Implement the unified Database trait for the real Db struct.
//
// Every method in this impl is a thin delegation to the identically named
// inherent method on `Db`, invoked through the fully-qualified `Db::...`
// path so resolution targets the inherent impl rather than this trait impl.
// No logic lives here; see the inherent `impl Db` for the actual SQL.
#[async_trait::async_trait]
impl Database for Db {
    // Backup task operations

    async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        Db::insert_backup_task(
            self,
            task_id,
            requestor,
            nft_count,
            tokens,
            storage_mode,
            archive_format,
            retention_days,
        )
        .await
    }

    async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        Db::get_backup_task(self, task_id).await
    }

    async fn get_backup_task_with_tokens(
        &self,
        task_id: &str,
        limit: i64,
        offset: i64,
    ) -> Result<Option<(BackupTask, u32)>, sqlx::Error> {
        Db::get_backup_task_with_tokens(self, task_id, limit, offset).await
    }

    async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::delete_backup_task(self, task_id).await
    }

    async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        Db::get_incomplete_backup_tasks(self).await
    }

    async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        Db::list_requestor_backup_tasks_paginated(self, requestor, limit, offset).await
    }

    async fn list_unprocessed_expired_backups(&self) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
        Db::list_unprocessed_expired_backups(self).await
    }

    // Backup task status and error operations
    async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        Db::clear_backup_errors(self, task_id, scope).await
    }

    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error> {
        Db::set_backup_error(self, task_id, error).await
    }

    async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        Db::set_error_logs(self, task_id, archive_error_log, ipfs_error_log).await
    }

    async fn update_archive_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_error_log(self, task_id, error_log).await
    }

    async fn update_pin_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_error_log(self, task_id, error_log).await
    }

    async fn set_archive_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_archive_request_error(self, task_id, fatal_error).await
    }

    async fn set_pin_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_pin_request_error(self, task_id, fatal_error).await
    }

    // Status update operations
    async fn update_archive_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_status(self, task_id, status).await
    }

    async fn update_pin_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_status(self, task_id, status).await
    }

    async fn update_backup_statuses(
        &self,
        task_id: &str,
        scope: &str,
        archive_status: &str,
        ipfs_status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_backup_statuses(self, task_id, scope, archive_status, ipfs_status).await
    }

    async fn update_archive_request_statuses(
        &self,
        task_ids: &[String],
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_statuses(self, task_ids, status).await
    }

    async fn upgrade_backup_to_full(
        &self,
        task_id: &str,
        add_archive: bool,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        Db::upgrade_backup_to_full(self, task_id, add_archive, archive_format, retention_days).await
    }

    // Deletion operations
    async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_deletion(self, task_id).await
    }

    async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_archive_request_deletion(self, task_id).await
    }

    async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_pin_request_deletions(self, task_id).await
    }

    async fn complete_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_archive_request_deletion(self, task_id).await
    }

    async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_pin_request_deletion(self, task_id).await
    }

    // Retry operations
    async fn retry_backup(
        &self,
        task_id: &str,
        scope: &str,
        retention_days: u64,
    ) -> Result<(), sqlx::Error> {
        Db::retry_backup(self, task_id, scope, retention_days).await
    }

    // Pin operations
    async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        Db::insert_pins_with_tokens(self, task_id, token_pin_mappings).await
    }

    async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_pins_by_task_id(self, task_id).await
    }

    async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_active_pins(self).await
    }

    async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
        Db::update_pin_statuses(self, updates).await
    }

    // Pinned tokens operations
    async fn get_pinned_tokens_by_requestor(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
        Db::get_pinned_tokens_by_requestor(self, requestor, limit, offset).await
    }

    async fn get_pinned_token_by_requestor(
        &self,
        requestor: &str,
        chain: &str,
        contract_address: &str,
        token_id: &str,
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
        Db::get_pinned_token_by_requestor(self, requestor, chain, contract_address, token_id).await
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc