0xmichalis / nftbk / build 18590579446

17 Oct 2025 10:55AM UTC coverage: 36.423% (-0.4%) from 36.786%
18590579446 · push · github · web-flow

chore: migrate tokens json blob to tokens table (#73)

* chore: migrate tokens json blob to tokens table
* chore: address copilot review
* Update src/server/handlers/handle_status.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

38 of 174 new or added lines in 5 files covered. (21.84%)
3 existing lines in 2 files now uncovered.
1501 of 4121 relevant lines covered (36.42%)
6.08 hits per line

Source File: /src/server/database/mod.rs (file coverage: 0.0%)
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgPoolOptions, PgPool, Row};

use crate::server::database::r#trait::Database;

pub mod r#trait;

/// Combined view of backup_tasks + archive_requests
#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "Backup task information including metadata and status")]
pub struct BackupTask {
    /// Unique identifier for the backup task
    #[schema(example = "abc123def456")]
    pub task_id: String,
    /// When the backup task was created (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:00:00Z")]
    pub created_at: DateTime<Utc>,
    /// When the backup task was last updated (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:05:00Z")]
    pub updated_at: DateTime<Utc>,
    /// User who requested the backup
    #[schema(example = "user123")]
    pub requestor: String,
    /// Number of NFTs in this backup task
    #[schema(example = 42)]
    pub nft_count: i32,
    /// Token details (only included if include_tokens=true)
    pub tokens: serde_json::Value,
    /// Archive subresource status (in_progress, done, error, expired)
    pub archive_status: Option<String>,
    /// IPFS subresource status (in_progress, done, error)
    pub ipfs_status: Option<String>,
    /// Detailed archive error log if archive completed with some failures
    #[schema(example = "Failed to write archive checksum file")]
    pub archive_error_log: Option<String>,
    /// Detailed IPFS error log aggregated from pin requests
    #[schema(
        example = "Provider pinata failed: 401 Unauthorized\nProvider web3.storage failed: 429 Too Many Requests"
    )]
    pub ipfs_error_log: Option<String>,
    /// Archive subresource fatal error if backup failed completely at archive stage
    pub archive_fatal_error: Option<String>,
    /// IPFS subresource fatal error if backup failed completely at IPFS stage
    pub ipfs_fatal_error: Option<String>,
    /// Storage mode used for the backup (archive, ipfs, full)
    #[schema(example = "archive")]
    pub storage_mode: String,
    /// Archive format used for the backup (zip, tar.gz)
    #[schema(example = "zip")]
    pub archive_format: Option<String>,
    /// When the backup expires (if applicable, typically 7 days from creation)
    #[schema(example = "2024-01-08T12:00:00Z")]
    pub expires_at: Option<DateTime<Utc>>,
    /// When archive deletion was started (if applicable)
    #[schema(example = "2024-01-02T10:00:00Z")]
    pub archive_deleted_at: Option<DateTime<Utc>>,
    /// When IPFS pins deletion was started (if applicable)
    #[schema(example = "2024-01-02T10:00:00Z")]
    pub pins_deleted_at: Option<DateTime<Utc>>,
}

#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "IPFS pin information for a specific CID")]
pub struct PinInfo {
    /// Content Identifier (CID) of the pinned content
    #[schema(example = "QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG")]
    pub cid: String,
    /// IPFS provider type where the content is pinned
    #[schema(example = "pinata")]
    pub provider_type: String,
    /// IPFS provider URL where the content is pinned
    #[schema(example = "https://api.pinata.cloud")]
    pub provider_url: String,
    /// Pin status (pinned, pinning, failed, queued)
    #[schema(example = "pinned")]
    pub status: String,
    /// When the pin was created (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:00:00Z")]
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "Token information with associated pin requests")]
pub struct TokenWithPins {
    /// Blockchain identifier (e.g., ethereum, tezos)
    #[schema(example = "ethereum")]
    pub chain: String,
    /// NFT contract address
    #[schema(example = "0x1234567890123456789012345678901234567890")]
    pub contract_address: String,
    /// NFT token ID
    #[schema(example = "123")]
    pub token_id: String,
    /// List of IPFS pins for this token
    pub pins: Vec<PinInfo>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PinRow {
    pub id: i64,
    pub task_id: String,
    pub provider_type: String,
    pub provider_url: Option<String>,
    pub cid: String,
    pub request_id: String,
    pub pin_status: String,
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Clone)]
pub struct ExpiredBackup {
    pub task_id: String,
    pub archive_format: String,
}

#[derive(Clone)]
pub struct Db {
    pub pool: PgPool,
}

impl Db {
    pub async fn new(database_url: &str, max_connections: u32) -> Self {
        let pool = PgPoolOptions::new()
            .max_connections(max_connections)
            .connect(database_url)
            .await
            .expect("Failed to connect to Postgres");
        // Health check: run a simple query
        sqlx::query("SELECT 1")
            .execute(&pool)
            .await
            .expect("Postgres connection is not healthy");
        tracing::info!("Postgres connection is healthy");
        Db { pool }
    }
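
    // --- Editor's note: usage sketch, not part of the source file ---
    // A minimal sketch of wiring `Db::new` up at process startup, assuming a
    // Tokio runtime; the connection string and pool size are illustrative.
    // `new` panics on connection or health-check failure, so it is meant to
    // run once during startup:
    //
    //     let db = Db::new("postgres://nftbk:secret@localhost:5432/nftbk", 5).await;
    //     let task = db.get_backup_task("abc123def456").await?;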

    #[allow(clippy::too_many_arguments)]
    pub async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Insert into backup_tasks (tokens JSON removed; tokens are stored in tokens table)
        sqlx::query(
            r#"
            INSERT INTO backup_tasks (
                task_id, created_at, updated_at, requestor, nft_count, storage_mode
            ) VALUES (
                $1, NOW(), NOW(), $2, $3, $4
            )
            ON CONFLICT (task_id) DO UPDATE SET
                updated_at = NOW(),
                nft_count = EXCLUDED.nft_count,
                storage_mode = EXCLUDED.storage_mode
            "#,
        )
        .bind(task_id)
        .bind(requestor)
        .bind(nft_count)
        .bind(storage_mode)
        .execute(&mut *tx)
        .await?;

        // Insert tokens for this task from the provided list (idempotent;
        // ON CONFLICT DO NOTHING keeps existing rows in place)
        // Expecting JSON shape: Vec<crate::server::api::Tokens>
        let token_entries: Vec<crate::server::api::Tokens> =
            serde_json::from_value(tokens.clone()).unwrap_or_default();

        for entry in &token_entries {
            for token_str in &entry.tokens {
                if let Some((contract_address, token_id)) = token_str.split_once(':') {
                    sqlx::query(
                        r#"INSERT INTO tokens (task_id, chain, contract_address, token_id)
                           VALUES ($1, $2, $3, $4)
                           ON CONFLICT (task_id, chain, contract_address, token_id) DO NOTHING"#,
                    )
                    .bind(task_id)
                    .bind(&entry.chain)
                    .bind(contract_address)
                    .bind(token_id)
                    .execute(&mut *tx)
                    .await?;
                }
            }
        }

        // Insert into archive_requests if storage mode includes archive
        if storage_mode == "archive" || storage_mode == "full" {
            let archive_fmt = archive_format.unwrap_or("zip");

            if let Some(days) = retention_days {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NOW() + make_interval(days => $3::int), 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                .bind(days as i64)
                .execute(&mut *tx)
                .await?;
            } else {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NULL, 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                .execute(&mut *tx)
                .await?;
            }
        }

        // Insert into pin_requests if storage mode includes IPFS
        if storage_mode == "ipfs" || storage_mode == "full" {
            sqlx::query(
                r#"
                INSERT INTO pin_requests (task_id, status)
                VALUES ($1, 'in_progress')
                ON CONFLICT (task_id) DO UPDATE SET
                    status = EXCLUDED.status
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
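
    // --- Editor's note: input sketch, not part of the source file ---
    // `tokens` is expected to deserialize into Vec<crate::server::api::Tokens>:
    // one entry per chain, each token encoded as "contract_address:token_id"
    // (the split_once(':') above relies on that shape). An illustrative payload:
    //
    //     let tokens = serde_json::json!([
    //         { "chain": "ethereum",
    //           "tokens": ["0x1234567890123456789012345678901234567890:123"] },
    //         { "chain": "tezos", "tokens": ["KT1hypotheticalContract:7"] }
    //     ]);
    //     db.insert_backup_task("abc123def456", "user123", 2, &tokens,
    //                           "full", Some("zip"), Some(7)).await?;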

    pub async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        // CASCADE will delete associated archive_requests row if it exists
        sqlx::query!("DELETE FROM backup_tasks WHERE task_id = $1", task_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    pub async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;
        if let Some(a) = archive_error_log {
            sqlx::query("UPDATE archive_requests SET error_log = $2 WHERE task_id = $1")
                .bind(task_id)
                .bind(a)
                .execute(&mut *tx)
                .await?;
        }
        if let Some(i) = ipfs_error_log {
            sqlx::query(
                r#"
                    UPDATE pin_requests
                    SET error_log = $2
                    WHERE task_id = $1
                    "#,
            )
            .bind(task_id)
            .bind(i)
            .execute(&mut *tx)
            .await?;
        }
        tx.commit().await?;
        Ok(())
    }

    pub async fn update_archive_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET error_log = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(error_log)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_pin_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET error_log = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(error_log)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_pin_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET status = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(status)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_archive_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET status = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(status)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_archive_request_statuses(
        &self,
        task_ids: &[String],
        status: &str,
    ) -> Result<(), sqlx::Error> {
        if task_ids.is_empty() {
            return Ok(());
        }

        // Use a transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Update each task_id individually with a prepared statement
        for task_id in task_ids {
            sqlx::query("UPDATE archive_requests SET status = $1 WHERE task_id = $2")
                .bind(status)
                .bind(task_id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(())
    }

    pub async fn retry_backup(
        &self,
        task_id: &str,
        scope: &str,
        retention_days: u64,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Reset statuses per requested scope
        if scope == "archive" || scope == "full" {
            sqlx::query(
                r#"
                UPDATE archive_requests
                SET status = 'in_progress', fatal_error = NULL, error_log = NULL
                WHERE task_id = $1
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
            sqlx::query(
                r#"
                UPDATE archive_requests
                SET expires_at = NOW() + make_interval(days => $2::int)
                WHERE task_id = $1
                "#,
            )
            .bind(task_id)
            .bind(retention_days as i64)
            .execute(&mut *tx)
            .await?;
        }
        if scope == "ipfs" || scope == "full" {
            sqlx::query(
                r#"
                UPDATE pin_requests
                SET status = 'in_progress', fatal_error = NULL, error_log = NULL
                WHERE task_id = $1
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }

    pub async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;
        // Clear archive errors if scope includes archive
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET error_log = NULL, fatal_error = NULL
            WHERE task_id = $1 AND ($2 IN ('archive', 'full'))
            "#,
        )
        .bind(task_id)
        .bind(scope)
        .execute(&mut *tx)
        .await?;
        // Clear IPFS errors if scope includes ipfs
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET error_log = NULL, fatal_error = NULL
            WHERE task_id = $1 AND ($2 IN ('ipfs', 'full'))
            "#,
        )
        .bind(task_id)
        .bind(scope)
        .execute(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(())
    }

    pub async fn set_archive_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET status = 'error', fatal_error = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(fatal_error)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn set_pin_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET status = 'error', fatal_error = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(fatal_error)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Touch parent row
        sqlx::query!(
            r#"UPDATE backup_tasks SET updated_at = NOW() WHERE task_id = $1"#,
            task_id
        )
        .execute(&mut *tx)
        .await?;

        // Mark archive subresource as being deleted
        sqlx::query!(
            r#"
            UPDATE archive_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&mut *tx)
        .await?;

        // Mark IPFS subresource as being deleted
        sqlx::query!(
            r#"
            UPDATE pin_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;
        Ok(())
    }

    /// Mark archive as being deleted (similar to start_deletion but for archive subresource)
    pub async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            UPDATE archive_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Mark IPFS pins as being deleted (similar to start_deletion but for IPFS pins subresource)
    pub async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            UPDATE pin_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        let row = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.task_id = $1
            "#,
        )
        .bind(task_id)
        .fetch_optional(&self.pool)
        .await?;

        if let Some(row) = row {
            // Fetch tokens for this task from tokens table and aggregate by chain
            let token_rows = sqlx::query(
                r#"
                SELECT chain, contract_address, token_id
                FROM tokens
                WHERE task_id = $1
                ORDER BY chain, contract_address, token_id
                "#,
            )
            .bind(task_id)
            .fetch_all(&self.pool)
            .await?;

            use std::collections::BTreeMap;
            let mut by_chain: BTreeMap<String, Vec<String>> = BTreeMap::new();
            for r in token_rows {
                let chain: String = r.get("chain");
                let contract_address: String = r.get("contract_address");
                let token_id: String = r.get("token_id");
                by_chain
                    .entry(chain)
                    .or_default()
                    .push(format!("{}:{}", contract_address, token_id));
            }
            let tokens_json = serde_json::json!(by_chain
                .into_iter()
                .map(|(chain, toks)| serde_json::json!({
                    "chain": chain,
                    "tokens": toks,
                }))
                .collect::<Vec<_>>());

            Ok(Some(BackupTask {
                task_id: row.get("task_id"),
                created_at: row.get("created_at"),
                updated_at: row.get("updated_at"),
                requestor: row.get("requestor"),
                nft_count: row.get("nft_count"),
                tokens: tokens_json,
                archive_status: row
                    .try_get::<Option<String>, _>("archive_status")
                    .ok()
                    .flatten(),
                ipfs_status: row
                    .try_get::<Option<String>, _>("ipfs_status")
                    .ok()
                    .flatten(),
                archive_error_log: row.get("archive_error_log"),
                ipfs_error_log: row.get("ipfs_error_log"),
                archive_fatal_error: row.get("fatal_error"),
                ipfs_fatal_error: row
                    .try_get::<Option<String>, _>("ipfs_fatal_error")
                    .ok()
                    .flatten(),
                storage_mode: row.get("storage_mode"),
                archive_format: row.get("archive_format"),
                expires_at: row.get("expires_at"),
                archive_deleted_at: row.get("archive_deleted_at"),
                pins_deleted_at: row.get("pins_deleted_at"),
            }))
        } else {
            Ok(None)
        }
    }
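
    // --- Editor's note: output sketch, not part of the source file ---
    // The aggregation above rebuilds the same JSON shape that
    // insert_backup_task accepts, grouped per chain and sorted, e.g.:
    //
    //     [
    //         { "chain": "ethereum",
    //           "tokens": ["0x1234567890123456789012345678901234567890:123"] },
    //         { "chain": "tezos", "tokens": ["KT1hypotheticalContract:7"] }
    //     ]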

    /// Fetch backup task plus a paginated slice of its tokens; returns (meta, total_token_count)
    pub async fn get_backup_task_with_tokens(
        &self,
        task_id: &str,
        limit: i64,
        offset: i64,
    ) -> Result<Option<(BackupTask, u32)>, sqlx::Error> {
        // Base metadata (same as get_backup_task)
        let row = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.task_id = $1
            "#,
        )
        .bind(task_id)
        .fetch_optional(&self.pool)
        .await?;

        let Some(row) = row else { return Ok(None) };

        // Total tokens for pagination
        let total_row = sqlx::query!(
            r#"SELECT COUNT(*) as count FROM tokens WHERE task_id = $1"#,
            task_id
        )
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = total_row.count.unwrap_or(0) as u32;

        // Page of tokens
        let token_rows = sqlx::query(
            r#"
            SELECT chain, contract_address, token_id
            FROM tokens
            WHERE task_id = $1
            ORDER BY chain, contract_address, token_id
            LIMIT $2 OFFSET $3
            "#,
        )
        .bind(task_id)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

        use std::collections::BTreeMap;
        let mut by_chain: BTreeMap<String, Vec<String>> = BTreeMap::new();
        for r in token_rows {
            let chain: String = r.get("chain");
            let contract_address: String = r.get("contract_address");
            let token_id: String = r.get("token_id");
            by_chain
                .entry(chain)
                .or_default()
                .push(format!("{}:{}", contract_address, token_id));
        }
        let tokens_json = serde_json::json!(by_chain
            .into_iter()
            .map(|(chain, toks)| serde_json::json!({ "chain": chain, "tokens": toks }))
            .collect::<Vec<_>>());

        let meta = BackupTask {
            task_id: row.get("task_id"),
            created_at: row.get("created_at"),
            updated_at: row.get("updated_at"),
            requestor: row.get("requestor"),
            nft_count: row.get("nft_count"),
            tokens: tokens_json,
            archive_status: row
                .try_get::<Option<String>, _>("archive_status")
                .ok()
                .flatten(),
            ipfs_status: row
                .try_get::<Option<String>, _>("ipfs_status")
                .ok()
                .flatten(),
            archive_error_log: row.get("archive_error_log"),
            ipfs_error_log: row.get("ipfs_error_log"),
            archive_fatal_error: row.get("fatal_error"),
            ipfs_fatal_error: row
                .try_get::<Option<String>, _>("ipfs_fatal_error")
                .ok()
                .flatten(),
            storage_mode: row.get("storage_mode"),
            archive_format: row.get("archive_format"),
            expires_at: row.get("expires_at"),
            archive_deleted_at: row.get("archive_deleted_at"),
            pins_deleted_at: row.get("pins_deleted_at"),
        };

        Ok(Some((meta, total)))
    }
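
    // --- Editor's note: usage sketch, not part of the source file ---
    // Paging through a task's tokens with this method, assuming a page size
    // of 100; `total` is the full token count regardless of the page:
    //
    //     let mut offset = 0i64;
    //     while let Some((page, total)) =
    //         db.get_backup_task_with_tokens("abc123def456", 100, offset).await?
    //     {
    //         // page.tokens holds up to 100 tokens, grouped by chain
    //         offset += 100;
    //         if offset >= total as i64 { break; }
    //     }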

    pub async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        // Total count
        let total_row = sqlx::query!(
            r#"SELECT COUNT(*) as count FROM backup_tasks b WHERE b.requestor = $1"#,
            requestor
        )
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = total_row.count.unwrap_or(0) as u32;

        let rows = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.requestor = $1
            ORDER BY b.created_at DESC
            LIMIT $2 OFFSET $3
            "#,
        )
        .bind(requestor)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

        let recs = rows
            .into_iter()
            .map(|row| {
                let task_id: String = row.get("task_id");

                BackupTask {
                    task_id,
                    created_at: row.get("created_at"),
                    updated_at: row.get("updated_at"),
                    requestor: row.get("requestor"),
                    nft_count: row.get("nft_count"),
                    // Clients should fetch tokens per task (see
                    // get_backup_task_with_tokens) so the tokens can be
                    // properly paginated.
                    tokens: serde_json::Value::Null,
                    archive_status: row
                        .try_get::<Option<String>, _>("archive_status")
                        .ok()
                        .flatten(),
                    ipfs_status: row
                        .try_get::<Option<String>, _>("ipfs_status")
                        .ok()
                        .flatten(),
                    archive_error_log: row.get("archive_error_log"),
                    ipfs_error_log: row.get("ipfs_error_log"),
                    archive_fatal_error: row.get("fatal_error"),
                    ipfs_fatal_error: row
                        .try_get::<Option<String>, _>("ipfs_fatal_error")
                        .ok()
                        .flatten(),
                    storage_mode: row.get("storage_mode"),
                    archive_format: row.get("archive_format"),
                    expires_at: row.get("expires_at"),
                    archive_deleted_at: row.get("archive_deleted_at"),
                    pins_deleted_at: row.get("pins_deleted_at"),
                }
            })
            .collect();

        Ok((recs, total))
    }

    pub async fn list_unprocessed_expired_backups(
        &self,
    ) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT b.task_id, ar.archive_format
            FROM backup_tasks b
            JOIN archive_requests ar ON b.task_id = ar.task_id
            WHERE ar.expires_at IS NOT NULL AND ar.expires_at < NOW() AND ar.status != 'expired'
            "#,
        )
        .fetch_all(&self.pool)
        .await?;
        let recs = rows
            .into_iter()
            .map(|row| ExpiredBackup {
                task_id: row.get("task_id"),
                archive_format: row.get("archive_format"),
            })
            .collect();
        Ok(recs)
    }

    /// Retrieve all backup tasks that are in 'in_progress' status
    /// This is used to recover incomplete tasks on server restart
    pub async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE (
                -- Archive-only mode: check archive status (record must exist and be in_progress)
                (b.storage_mode = 'archive' AND ar.status = 'in_progress')
                OR
                -- IPFS-only mode: check IPFS status (record must exist and be in_progress)
                (b.storage_mode = 'ipfs' AND pr.status = 'in_progress')
                OR
                -- Full mode: check both archive and IPFS status (task is incomplete if either is in_progress)
                (b.storage_mode = 'full' AND (ar.status = 'in_progress' OR pr.status = 'in_progress'))
            )
            ORDER BY b.created_at ASC
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        let recs = rows
            .into_iter()
            .map(|row| BackupTask {
                task_id: row.get("task_id"),
                created_at: row.get("created_at"),
                updated_at: row.get("updated_at"),
                requestor: row.get("requestor"),
                nft_count: row.get("nft_count"),
                // No need for tokens here, the pruner does not care
                tokens: serde_json::Value::Null,
                archive_status: row
                    .try_get::<Option<String>, _>("archive_status")
                    .ok()
                    .flatten(),
                ipfs_status: row
                    .try_get::<Option<String>, _>("ipfs_status")
                    .ok()
                    .flatten(),
                archive_error_log: row.get("archive_error_log"),
                ipfs_error_log: row.get("ipfs_error_log"),
                archive_fatal_error: row.get("fatal_error"),
                ipfs_fatal_error: None,
                storage_mode: row.get("storage_mode"),
                archive_format: row.get("archive_format"),
                expires_at: row.get("expires_at"),
                archive_deleted_at: row.get("archive_deleted_at"),
                pins_deleted_at: row.get("pins_deleted_at"),
            })
            .collect();

        Ok(recs)
    }

    /// Insert pins and their associated tokens in a single atomic transaction
    pub async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        if token_pin_mappings.is_empty() {
            return Ok(());
        }

        // Collect all pin responses and prepare token data
        let mut all_pin_responses = Vec::new();
        let mut all_token_data = Vec::new(); // (index_in_pin_responses, chain, contract_address, token_id)

        for mapping in token_pin_mappings {
            for pin_response in &mapping.pin_responses {
                let index = all_pin_responses.len();
                all_pin_responses.push(pin_response);
                all_token_data.push((
                    index,
                    mapping.chain.clone(),
                    mapping.contract_address.clone(),
                    mapping.token_id.clone(),
                ));
            }
        }

        if all_pin_responses.is_empty() {
            return Ok(());
        }

        // Start a transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Insert pins one by one and collect generated IDs
        let mut pin_ids: Vec<i64> = Vec::new();
        for pin_response in &all_pin_responses {
            // Map status enum to lowercase string to satisfy CHECK constraint
            let status = match pin_response.status {
                crate::ipfs::PinResponseStatus::Queued => "queued",
                crate::ipfs::PinResponseStatus::Pinning => "pinning",
                crate::ipfs::PinResponseStatus::Pinned => "pinned",
                crate::ipfs::PinResponseStatus::Failed => "failed",
            };

            let row = sqlx::query(
                "INSERT INTO pins (task_id, provider_type, provider_url, cid, request_id, pin_status) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id"
            )
            .bind(task_id)
            .bind(&pin_response.provider_type)
            .bind(&pin_response.provider_url)
            .bind(&pin_response.cid)
            .bind(&pin_response.id)
            .bind(status)
            .fetch_one(&mut *tx)
            .await?;

            pin_ids.push(row.get("id"));
        }

        // Resolve token_ids and update pins rows with token_id
        for (index, chain, contract_address, token_id) in &all_token_data {
            // Ensure token row exists and fetch its id
            let inserted = sqlx::query(
                r#"INSERT INTO tokens (task_id, chain, contract_address, token_id)
                   VALUES ($1, $2, $3, $4)
                   ON CONFLICT (task_id, chain, contract_address, token_id) DO NOTHING
                   RETURNING id"#,
            )
            .bind(task_id)
            .bind(chain)
            .bind(contract_address)
            .bind(token_id)
            .fetch_optional(&mut *tx)
            .await?;

            let tok_id: i64 = if let Some(row) = inserted {
                row.get("id")
            } else {
                sqlx::query("SELECT id FROM tokens WHERE task_id = $1 AND chain = $2 AND contract_address = $3 AND token_id = $4")
                    .bind(task_id)
                    .bind(chain)
                    .bind(contract_address)
                    .bind(token_id)
                    .fetch_one(&mut *tx)
                    .await?
                    .get("id")
            };

            sqlx::query("UPDATE pins SET token_id = $2 WHERE id = $1")
                .bind(pin_ids[*index])
                .bind(tok_id)
                .execute(&mut *tx)
                .await?;
        }

        // Commit the transaction
        tx.commit().await?;
        Ok(())
    }
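
    // --- Editor's note: design note, not part of the source file ---
    // INSERT ... ON CONFLICT DO NOTHING RETURNING id yields no row when the
    // token already exists, so the fallback SELECT above resolves the existing
    // id inside the same transaction before pins are linked to it.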

    /// Get all pins for a specific backup task
    pub async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
            FROM pins
            WHERE task_id = $1
            ORDER BY id
            "#,
        )
        .bind(task_id)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| PinRow {
                id: row.get("id"),
                task_id: row.get("task_id"),
                provider_type: row.get("provider_type"),
                provider_url: row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten(),
                cid: row.get("cid"),
                request_id: row.get("request_id"),
                pin_status: row.get("pin_status"),
                created_at: row.get("created_at"),
            })
            .collect())
    }

    /// Paginated pinned tokens grouped by (chain, contract_address, token_id)
    pub async fn get_pinned_tokens_by_requestor(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
        // Total distinct tokens for this requestor
        let total_row = sqlx::query(
            r#"
            SELECT COUNT(*) as count
            FROM (
                SELECT DISTINCT t.chain, t.contract_address, t.token_id
                FROM tokens t
                JOIN pins p ON p.token_id = t.id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
            ) t
            "#,
        )
        .bind(requestor)
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = (total_row.get::<i64, _>("count")).max(0) as u32;

        // Page of distinct tokens ordered by most recent pin time
        let rows = sqlx::query(
            r#"
            SELECT t.chain, t.contract_address, t.token_id
            FROM (
                SELECT t.chain, t.contract_address, t.token_id, MAX(p.created_at) AS last_created
                FROM tokens t
                JOIN pins p ON p.token_id = t.id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
                GROUP BY t.chain, t.contract_address, t.token_id
            ) t
            ORDER BY last_created DESC
            LIMIT $2 OFFSET $3
            "#,
        )
        .bind(requestor)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

        // For each token key, fetch pins (ordered by created_at desc)
        let mut result: Vec<TokenWithPins> = Vec::new();
        for r in rows {
            let token_rows = sqlx::query(
                r#"
                SELECT t.chain, t.contract_address, t.token_id,
                       p.cid, p.provider_type, p.provider_url, p.pin_status, p.created_at
                FROM tokens t
                JOIN pins p ON p.token_id = t.id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
                  AND t.chain = $2
                  AND t.contract_address = $3
                  AND t.token_id = $4
                ORDER BY p.created_at DESC
                "#,
            )
            .bind(requestor)
            .bind(r.get::<String, _>("chain"))
            .bind(r.get::<String, _>("contract_address"))
            .bind(r.get::<String, _>("token_id"))
            .fetch_all(&self.pool)
            .await?;

            let mut pins: Vec<PinInfo> = Vec::new();
            let mut chain = String::new();
            let mut contract_address = String::new();
            let mut token_id = String::new();
            for row in token_rows {
                chain = row.get("chain");
                contract_address = row.get("contract_address");
                token_id = row.get("token_id");
                let cid: String = row.get("cid");
                let provider_type: String = row.get("provider_type");
                let provider_url: String = row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten()
                    .unwrap_or_default();
                let status: String = row.get("pin_status");
                let created_at: DateTime<Utc> = row.get("created_at");
                pins.push(PinInfo {
                    cid,
                    provider_type,
                    provider_url,
                    status,
                    created_at,
                });
            }
            result.push(TokenWithPins {
                chain,
                contract_address,
                token_id,
                pins,
            });
        }

        Ok((result, total))
    }

    /// Get a specific pinned token for a requestor
    pub async fn get_pinned_token_by_requestor(
        &self,
        requestor: &str,
        chain: &str,
        contract_address: &str,
        token_id: &str,
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
        let query = r#"
            SELECT t.chain, t.contract_address, t.token_id,
                   p.cid, p.provider_type, p.provider_url, p.pin_status, p.created_at
            FROM tokens t
            JOIN pins p ON p.token_id = t.id
            JOIN backup_tasks bt ON bt.task_id = p.task_id
            WHERE bt.requestor = $1
              AND t.chain = $2
              AND t.contract_address = $3
              AND t.token_id = $4
            ORDER BY p.created_at DESC
        "#;

        let rows = sqlx::query(query)
            .bind(requestor)
            .bind(chain)
            .bind(contract_address)
            .bind(token_id)
            .fetch_all(&self.pool)
            .await?;

        if rows.is_empty() {
            return Ok(None);
        }

        let mut pins = Vec::new();
        let mut token_chain = String::new();
        let mut token_contract_address = String::new();
        let mut token_token_id = String::new();

        for row in rows {
            token_chain = row.get("chain");
            token_contract_address = row.get("contract_address");
            token_token_id = row.get("token_id");
            let cid: String = row.get("cid");
            let provider_type: String = row.get("provider_type");
            // provider_url may be NULL for legacy rows; default to empty string for API stability
            let provider_url: String = row
                .try_get::<Option<String>, _>("provider_url")
                .ok()
                .flatten()
                .unwrap_or_default();
            let status: String = row.get("pin_status");
            let created_at: DateTime<Utc> = row.get("created_at");

            pins.push(PinInfo {
                cid,
                provider_type,
                provider_url,
                status,
                created_at,
            });
        }

        Ok(Some(TokenWithPins {
            chain: token_chain,
            contract_address: token_contract_address,
            token_id: token_token_id,
            pins,
        }))
    }

    /// Get all pins that are in 'queued' or 'pinning' status
    /// This is used by the pin monitor to check for status updates
    pub async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
            FROM pins
            WHERE pin_status IN ('queued', 'pinning')
            ORDER BY id
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| PinRow {
                id: row.get("id"),
                task_id: row.get("task_id"),
                provider_type: row.get("provider_type"),
                provider_url: row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten(),
                cid: row.get("cid"),
                request_id: row.get("request_id"),
                pin_status: row.get("pin_status"),
                created_at: row.get("created_at"),
            })
            .collect())
    }

    /// Set backup fatal error for relevant subresources in a single SQL statement.
    /// The update is based on the `storage_mode` value from the `backup_tasks` table for the given `task_id`:
    /// - If storage_mode is 'archive' or 'full': updates archive_requests.status and archive_requests.fatal_error
    /// - If storage_mode is 'ipfs' or 'full': updates pin_requests.status and pin_requests.fatal_error
    pub async fn set_backup_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        let sql = r#"
            WITH task_mode AS (
                SELECT storage_mode FROM backup_tasks WHERE task_id = $1
            ),
            upd_archive AS (
                UPDATE archive_requests ar
                SET status = 'error', fatal_error = $2
                WHERE ar.task_id = $1
                  AND EXISTS (
                      SELECT 1 FROM task_mode tm
                      WHERE tm.storage_mode IN ('archive', 'full')
                  )
                RETURNING 1
            ),
            upd_pins AS (
                UPDATE pin_requests pr
                SET status = 'error', fatal_error = $2
                WHERE pr.task_id = $1
                  AND EXISTS (
                      SELECT 1 FROM task_mode tm
                      WHERE tm.storage_mode IN ('ipfs', 'full')
                  )
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
        "#;
        sqlx::query(sql)
            .bind(task_id)
            .bind(fatal_error)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
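
    // --- Editor's note: design note, not part of the source file ---
    // The data-modifying CTEs above make both subresource updates part of one
    // atomic statement, so no explicit transaction is needed; the final SELECT
    // forces both CTEs to execute and reports how many rows each touched.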

    /// Update backup subresource statuses for the task based on the requested scope
    /// - archive or full: updates archive_requests.status
    /// - ipfs or full: updates pin_requests.status
    pub async fn update_backup_statuses(
        &self,
        task_id: &str,
        scope: &str,
        archive_status: &str,
        ipfs_status: &str,
    ) -> Result<(), sqlx::Error> {
        let sql = r#"
            WITH upd_archive AS (
                UPDATE archive_requests ar
                SET status = $2
                WHERE ar.task_id = $1
                  AND ($4 IN ('archive', 'full'))
                RETURNING 1
            ),
            upd_pins AS (
                UPDATE pin_requests pr
                SET status = $3
                WHERE pr.task_id = $1
                  AND ($4 IN ('ipfs', 'full'))
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
        "#;
        sqlx::query(sql)
            .bind(task_id)
            .bind(archive_status)
            .bind(ipfs_status)
            .bind(scope)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    pub async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
        if updates.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;

        for (id, status) in updates {
            sqlx::query(
                r#"
                UPDATE pins
                SET pin_status = $2
                WHERE id = $1
                "#,
            )
            .bind(id)
            .bind(status)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
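
    // --- Editor's note: usage sketch, not part of the source file ---
    // A pin monitor could apply a batch of status transitions observed from
    // providers in one transaction (ids and statuses illustrative):
    //
    //     db.update_pin_statuses(&[
    //         (17, "pinned".to_string()),
    //         (18, "failed".to_string()),
    //     ]).await?;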

    /// Ensure the missing subresource exists and upgrade the backup to full storage mode.
    /// If `add_archive` is true, create/ensure archive_requests row with provided format/retention.
    /// Otherwise, ensure pin_requests row exists. Always flips backup_tasks.storage_mode to 'full'.
    pub async fn upgrade_backup_to_full(
        &self,
        task_id: &str,
        add_archive: bool,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Upgrade storage mode to full
        sqlx::query(
            r#"
            UPDATE backup_tasks
            SET storage_mode = 'full', updated_at = NOW()
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .execute(&mut *tx)
        .await?;

        if add_archive {
            let fmt = archive_format.unwrap_or("zip");
            if let Some(days) = retention_days {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NOW() + make_interval(days => $3::int), 'in_progress')
                    ON CONFLICT (task_id) DO NOTHING
                    "#,
                )
                .bind(task_id)
                .bind(fmt)
                .bind(days as i64)
                .execute(&mut *tx)
                .await?;
            } else {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NULL, 'in_progress')
                    ON CONFLICT (task_id) DO NOTHING
                    "#,
                )
                .bind(task_id)
                .bind(fmt)
                .execute(&mut *tx)
                .await?;
            }
        } else {
            sqlx::query(
                r#"
                INSERT INTO pin_requests (task_id, status)
                VALUES ($1, 'in_progress')
                ON CONFLICT (task_id) DO NOTHING
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
1454

    /// Complete archive deletion:
    /// - If the current storage_mode is 'archive', delete the whole backup (finalize deletion)
    /// - Else if it is 'full', flip it to 'ipfs' to reflect that the archive was removed
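    ///
    /// Both branches run in a single statement, so concurrent callers cannot
    /// observe the task between the delete check and the mode flip.
    ///
    /// Illustrative usage (a sketch, not a doctest; calling this after the
    /// archive data has actually been removed is the expected order, though
    /// nothing here enforces it):
    ///
    /// ```ignore
    /// db.complete_archive_request_deletion("abc123def456").await?;
    /// ```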
    pub async fn complete_archive_request_deletion(
        &self,
        task_id: &str,
    ) -> Result<(), sqlx::Error> {
        // Atomically: delete when archive-only; else if full, flip to ipfs
        let sql = r#"
            WITH del AS (
                DELETE FROM backup_tasks
                WHERE task_id = $1 AND storage_mode = 'archive'
                RETURNING 1
            ), upd AS (
                UPDATE backup_tasks
                SET storage_mode = 'ipfs', updated_at = NOW()
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
        "#;
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
        Ok(())
    }

    /// Complete IPFS pins deletion:
    /// - If the current storage_mode is 'ipfs', delete the whole backup (finalize deletion)
    /// - Else if it is 'full', flip it to 'archive' to reflect that the pins were removed
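    ///
    /// Mirror image of `complete_archive_request_deletion`, for the pins side.
    ///
    /// Illustrative usage (a sketch, not a doctest; assumes a connected `Db`):
    ///
    /// ```ignore
    /// db.complete_pin_request_deletion("abc123def456").await?;
    /// ```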
    pub async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        // Atomically: delete when ipfs-only; else if full, flip to archive
        let sql = r#"
            WITH del AS (
                DELETE FROM backup_tasks
                WHERE task_id = $1 AND storage_mode = 'ipfs'
                RETURNING 1
            ), upd AS (
                UPDATE backup_tasks
                SET storage_mode = 'archive', updated_at = NOW()
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
        "#;
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
        Ok(())
    }
}

// Implement the unified Database trait for the real Db struct
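// Each trait method simply forwards to the inherent method of the same name,
// so callers can depend on the `Database` abstraction (e.g. to swap in a test
// double) while `Db` keeps one canonical implementation. A usage sketch,
// assuming the trait object is held behind an Arc:
//
//     let db: std::sync::Arc<dyn Database> = std::sync::Arc::new(db);
//     let task = db.get_backup_task("abc123def456").await?;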
#[async_trait::async_trait]
impl Database for Db {
    // Backup task operations

    async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        Db::insert_backup_task(
            self,
            task_id,
            requestor,
            nft_count,
            tokens,
            storage_mode,
            archive_format,
            retention_days,
        )
        .await
    }

    async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        Db::get_backup_task(self, task_id).await
    }

    async fn get_backup_task_with_tokens(
        &self,
        task_id: &str,
        limit: i64,
        offset: i64,
    ) -> Result<Option<(BackupTask, u32)>, sqlx::Error> {
        Db::get_backup_task_with_tokens(self, task_id, limit, offset).await
    }

    async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::delete_backup_task(self, task_id).await
    }

    async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        Db::get_incomplete_backup_tasks(self).await
    }

    async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        Db::list_requestor_backup_tasks_paginated(self, requestor, limit, offset).await
    }

    async fn list_unprocessed_expired_backups(&self) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
        Db::list_unprocessed_expired_backups(self).await
    }

    // Backup task status and error operations
    async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        Db::clear_backup_errors(self, task_id, scope).await
    }

    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error> {
        Db::set_backup_error(self, task_id, error).await
    }

    async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        Db::set_error_logs(self, task_id, archive_error_log, ipfs_error_log).await
    }

    async fn update_archive_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_error_log(self, task_id, error_log).await
    }

    async fn update_pin_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_error_log(self, task_id, error_log).await
    }

    async fn set_archive_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_archive_request_error(self, task_id, fatal_error).await
    }

    async fn set_pin_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_pin_request_error(self, task_id, fatal_error).await
    }

    // Status update operations
    async fn update_archive_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_status(self, task_id, status).await
    }

    async fn update_pin_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_status(self, task_id, status).await
    }

    async fn update_backup_statuses(
        &self,
        task_id: &str,
        scope: &str,
        archive_status: &str,
        ipfs_status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_backup_statuses(self, task_id, scope, archive_status, ipfs_status).await
    }

    async fn update_archive_request_statuses(
        &self,
        task_ids: &[String],
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_statuses(self, task_ids, status).await
    }

    async fn upgrade_backup_to_full(
        &self,
        task_id: &str,
        add_archive: bool,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        Db::upgrade_backup_to_full(self, task_id, add_archive, archive_format, retention_days).await
    }

    // Deletion operations
    async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_deletion(self, task_id).await
    }

    async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_archive_request_deletion(self, task_id).await
    }

    async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_pin_request_deletions(self, task_id).await
    }

    async fn complete_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_archive_request_deletion(self, task_id).await
    }

    async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_pin_request_deletion(self, task_id).await
    }

    // Retry operations
    async fn retry_backup(
        &self,
        task_id: &str,
        scope: &str,
        retention_days: u64,
    ) -> Result<(), sqlx::Error> {
        Db::retry_backup(self, task_id, scope, retention_days).await
    }

    // Pin operations
    async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        Db::insert_pins_with_tokens(self, task_id, token_pin_mappings).await
    }

    async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_pins_by_task_id(self, task_id).await
    }

    async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_active_pins(self).await
    }

    async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
        Db::update_pin_statuses(self, updates).await
    }

    // Pinned tokens operations
    async fn get_pinned_tokens_by_requestor(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
        Db::get_pinned_tokens_by_requestor(self, requestor, limit, offset).await
    }

    async fn get_pinned_token_by_requestor(
        &self,
        requestor: &str,
        chain: &str,
        contract_address: &str,
        token_id: &str,
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
        Db::get_pinned_token_by_requestor(self, requestor, chain, contract_address, token_id).await
    }
}