
0xmichalis / nftbk / build 18593781965

17 Oct 2025 01:12PM UTC coverage: 35.841% (-0.2%) from 36.062%
Build 18593781965 · push · github · 0xmichalis
fix: put tokens back in get_incomplete_backup_tasks

0 of 50 new or added lines in 1 file covered (0.0%).
1 existing line in 1 file is now uncovered.
1520 of 4241 relevant lines covered (35.84%), 5.95 hits per line.

Source File: /src/server/database/mod.rs (0.0% of new or added lines covered)

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgPoolOptions, PgPool, Row};

use crate::server::database::r#trait::Database;

pub mod r#trait;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BackupTask {
    pub task_id: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub requestor: String,
    pub nft_count: i32,
    pub tokens: serde_json::Value,
    pub archive_status: Option<String>,
    pub ipfs_status: Option<String>,
    pub archive_error_log: Option<String>,
    pub ipfs_error_log: Option<String>,
    pub archive_fatal_error: Option<String>,
    pub ipfs_fatal_error: Option<String>,
    pub storage_mode: String,
    pub archive_format: Option<String>,
    pub expires_at: Option<DateTime<Utc>>,
    pub archive_deleted_at: Option<DateTime<Utc>>,
    pub pins_deleted_at: Option<DateTime<Utc>>,
}

#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "IPFS pin information for a specific CID")]
pub struct PinInfo {
    /// Content Identifier (CID) of the pinned content
    #[schema(example = "QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG")]
    pub cid: String,
    /// IPFS provider type where the content is pinned
    #[schema(example = "pinata")]
    pub provider_type: String,
    /// IPFS provider URL where the content is pinned
    #[schema(example = "https://api.pinata.cloud")]
    pub provider_url: String,
    /// Pin status (pinned, pinning, failed, queued)
    #[schema(example = "pinned")]
    pub status: String,
    /// When the pin was created (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:00:00Z")]
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "Token information with associated pin requests")]
pub struct TokenWithPins {
    /// Blockchain identifier (e.g., ethereum, tezos)
    #[schema(example = "ethereum")]
    pub chain: String,
    /// NFT contract address
    #[schema(example = "0x1234567890123456789012345678901234567890")]
    pub contract_address: String,
    /// NFT token ID
    #[schema(example = "123")]
    pub token_id: String,
    /// List of IPFS pins for this token
    pub pins: Vec<PinInfo>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PinRow {
    pub id: i64,
    pub task_id: String,
    pub provider_type: String,
    pub provider_url: Option<String>,
    pub cid: String,
    pub request_id: String,
    pub pin_status: String,
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Clone)]
pub struct ExpiredBackup {
    pub task_id: String,
    pub archive_format: String,
}

#[derive(Clone)]
pub struct Db {
    pub pool: PgPool,
}

impl Db {
    pub async fn new(database_url: &str, max_connections: u32) -> Self {
        let pool = PgPoolOptions::new()
            .max_connections(max_connections)
            .connect(database_url)
            .await
            .expect("Failed to connect to Postgres");
        // Health check: run a simple query
        sqlx::query("SELECT 1")
            .execute(&pool)
            .await
            .expect("Postgres connection is not healthy");
        tracing::info!("Postgres connection is healthy");
        Db { pool }
    }
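
    // A minimal startup sketch (the URL and pool size are illustrative, not
    // values from this repository):
    //
    //     let db = Db::new("postgres://localhost/nftbk", 5).await;
    //
    // Since `new` uses expect(), it panics instead of returning an error when the
    // connection or the `SELECT 1` health check fails, which fits a one-time call
    // during server startup.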

    #[allow(clippy::too_many_arguments)]
    pub async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Insert into backup_tasks (tokens JSON removed; tokens are stored in tokens table)
        sqlx::query(
            r#"
            INSERT INTO backup_tasks (
                task_id, created_at, updated_at, requestor, nft_count, storage_mode
            ) VALUES (
                $1, NOW(), NOW(), $2, $3, $4
            )
            ON CONFLICT (task_id) DO UPDATE SET
                updated_at = NOW(),
                nft_count = EXCLUDED.nft_count,
                storage_mode = EXCLUDED.storage_mode
            "#,
        )
        .bind(task_id)
        .bind(requestor)
        .bind(nft_count)
        .bind(storage_mode)
        .execute(&mut *tx)
        .await?;

        // Replace tokens for this task with the provided list (idempotent)
        // Expecting JSON shape: Vec<crate::server::api::Tokens>
        let token_entries: Vec<crate::server::api::Tokens> =
            serde_json::from_value(tokens.clone()).unwrap_or_default();

        for entry in &token_entries {
            for token_str in &entry.tokens {
                if let Some((contract_address, token_id)) = token_str.split_once(':') {
                    sqlx::query(
                        r#"INSERT INTO tokens (task_id, chain, contract_address, token_id)
                           VALUES ($1, $2, $3, $4)
                           ON CONFLICT (task_id, chain, contract_address, token_id) DO NOTHING"#,
                    )
                    .bind(task_id)
                    .bind(&entry.chain)
                    .bind(contract_address)
                    .bind(token_id)
                    .execute(&mut *tx)
                    .await?;
                }
            }
        }

        // Insert into archive_requests if storage mode includes archive
        if storage_mode == "archive" || storage_mode == "full" {
            let archive_fmt = archive_format.unwrap_or("zip");

            if let Some(days) = retention_days {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NOW() + make_interval(days => $3::int), 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                .bind(days as i64)
                .execute(&mut *tx)
                .await?;
            } else {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NULL, 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                .execute(&mut *tx)
                .await?;
            }
        }

        // Insert into pin_requests if storage mode includes IPFS
        if storage_mode == "ipfs" || storage_mode == "full" {
            sqlx::query(
                r#"
                INSERT INTO pin_requests (task_id, status)
                VALUES ($1, 'in_progress')
                ON CONFLICT (task_id) DO UPDATE SET
                    status = EXCLUDED.status
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
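
    // For reference, a `tokens` value the loop above would accept, with
    // hypothetical data; each entry is "<contract_address>:<token_id>":
    //
    //     let tokens = serde_json::json!([
    //         { "chain": "ethereum", "tokens": ["0xabc:1", "0xabc:2"] }
    //     ]);
    //
    // Entries without a ':' separator fail split_once and are skipped silently.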

    pub async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        // CASCADE will delete associated archive_requests row if it exists
        sqlx::query!("DELETE FROM backup_tasks WHERE task_id = $1", task_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    pub async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;
        if let Some(a) = archive_error_log {
            sqlx::query("UPDATE archive_requests SET error_log = $2 WHERE task_id = $1")
                .bind(task_id)
                .bind(a)
                .execute(&mut *tx)
                .await?;
        }
        if let Some(i) = ipfs_error_log {
            sqlx::query(
                r#"
                    UPDATE pin_requests
                    SET error_log = $2
                    WHERE task_id = $1
                    "#,
            )
            .bind(task_id)
            .bind(i)
            .execute(&mut *tx)
            .await?;
        }
        tx.commit().await?;
        Ok(())
    }

    pub async fn update_archive_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET error_log = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(error_log)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_pin_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET error_log = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(error_log)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_pin_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET status = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(status)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_archive_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET status = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(status)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_archive_request_statuses(
        &self,
        task_ids: &[String],
        status: &str,
    ) -> Result<(), sqlx::Error> {
        if task_ids.is_empty() {
            return Ok(());
        }

        // Use a transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Update each task_id individually with a prepared statement
        for task_id in task_ids {
            sqlx::query("UPDATE archive_requests SET status = $1 WHERE task_id = $2")
                .bind(status)
                .bind(task_id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(())
    }

    pub async fn retry_backup(
        &self,
        task_id: &str,
        scope: &str,
        retention_days: u64,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Reset statuses per requested scope
        if scope == "archive" || scope == "full" {
            sqlx::query(
                r#"
                UPDATE archive_requests
                SET status = 'in_progress', fatal_error = NULL, error_log = NULL
                WHERE task_id = $1
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
            sqlx::query(
                r#"
                UPDATE archive_requests
                SET expires_at = NOW() + make_interval(days => $2::int)
                WHERE task_id = $1
                "#,
            )
            .bind(task_id)
            .bind(retention_days as i64)
            .execute(&mut *tx)
            .await?;
        }
        if scope == "ipfs" || scope == "full" {
            sqlx::query(
                r#"
                UPDATE pin_requests
                SET status = 'in_progress', fatal_error = NULL, error_log = NULL
                WHERE task_id = $1
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }

    pub async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;
        // Clear archive errors if scope includes archive
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET error_log = NULL, fatal_error = NULL
            WHERE task_id = $1 AND ($2 IN ('archive', 'full'))
            "#,
        )
        .bind(task_id)
        .bind(scope)
        .execute(&mut *tx)
        .await?;
        // Clear IPFS errors if scope includes ipfs
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET error_log = NULL, fatal_error = NULL
            WHERE task_id = $1 AND ($2 IN ('ipfs', 'full'))
            "#,
        )
        .bind(task_id)
        .bind(scope)
        .execute(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(())
    }
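
    // The `$2 IN ('archive', 'full')` predicate folds the scope check into the
    // SQL itself: when the bound scope does not match, the UPDATE simply affects
    // zero rows, so no branching is needed in Rust. The same pattern is used in
    // update_backup_statuses further down.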

    pub async fn set_archive_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET status = 'error', fatal_error = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(fatal_error)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn set_pin_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET status = 'error', fatal_error = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(fatal_error)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Touch parent row
        sqlx::query!(
            r#"UPDATE backup_tasks SET updated_at = NOW() WHERE task_id = $1"#,
            task_id
        )
        .execute(&mut *tx)
        .await?;

        // Mark archive subresource as being deleted
        sqlx::query!(
            r#"
            UPDATE archive_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&mut *tx)
        .await?;

        // Mark IPFS subresource as being deleted
        sqlx::query!(
            r#"
            UPDATE pin_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;
        Ok(())
    }

    /// Mark archive as being deleted (similar to start_deletion but for archive subresource)
    pub async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            UPDATE archive_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Mark IPFS pins as being deleted (similar to start_deletion but for IPFS pins subresource)
    pub async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            UPDATE pin_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        let row = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.task_id = $1
            "#,
        )
        .bind(task_id)
        .fetch_optional(&self.pool)
        .await?;

        if let Some(row) = row {
            // Fetch tokens for this task from tokens table and aggregate by chain
            let token_rows = sqlx::query(
                r#"
                SELECT chain, contract_address, token_id
                FROM tokens
                WHERE task_id = $1
                ORDER BY chain, contract_address, token_id
                "#,
            )
            .bind(task_id)
            .fetch_all(&self.pool)
            .await?;

            use std::collections::BTreeMap;
            let mut by_chain: BTreeMap<String, Vec<String>> = BTreeMap::new();
            for r in token_rows {
                let chain: String = r.get("chain");
                let contract_address: String = r.get("contract_address");
                let token_id: String = r.get("token_id");
                by_chain
                    .entry(chain)
                    .or_default()
                    .push(format!("{}:{}", contract_address, token_id));
            }
            let tokens_json = serde_json::json!(by_chain
                .into_iter()
                .map(|(chain, toks)| serde_json::json!({
                    "chain": chain,
                    "tokens": toks,
                }))
                .collect::<Vec<_>>());

            Ok(Some(BackupTask {
                task_id: row.get("task_id"),
                created_at: row.get("created_at"),
                updated_at: row.get("updated_at"),
                requestor: row.get("requestor"),
                nft_count: row.get("nft_count"),
                tokens: tokens_json,
                archive_status: row
                    .try_get::<Option<String>, _>("archive_status")
                    .ok()
                    .flatten(),
                ipfs_status: row
                    .try_get::<Option<String>, _>("ipfs_status")
                    .ok()
                    .flatten(),
                archive_error_log: row.get("archive_error_log"),
                ipfs_error_log: row.get("ipfs_error_log"),
                archive_fatal_error: row.get("fatal_error"),
                ipfs_fatal_error: row
                    .try_get::<Option<String>, _>("ipfs_fatal_error")
                    .ok()
                    .flatten(),
                storage_mode: row.get("storage_mode"),
                archive_format: row.get("archive_format"),
                expires_at: row.get("expires_at"),
                archive_deleted_at: row.get("archive_deleted_at"),
                pins_deleted_at: row.get("pins_deleted_at"),
            }))
        } else {
            Ok(None)
        }
    }
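
    // The aggregation above produces a tokens value of the following shape
    // (hypothetical data):
    //
    //     [{ "chain": "ethereum", "tokens": ["0xabc:1", "0xabc:2"] }]
    //
    // Using a BTreeMap keeps chains sorted, so the serialized order is
    // deterministic across calls.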

    /// Fetch backup task plus a paginated slice of its tokens; returns (meta, total_token_count)
    pub async fn get_backup_task_with_tokens(
        &self,
        task_id: &str,
        limit: i64,
        offset: i64,
    ) -> Result<Option<(BackupTask, u32)>, sqlx::Error> {
        // Base metadata (same as get_backup_task)
        let row = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.task_id = $1
            "#,
        )
        .bind(task_id)
        .fetch_optional(&self.pool)
        .await?;

        let Some(row) = row else { return Ok(None) };

        // Total tokens for pagination
        let total_row = sqlx::query!(
            r#"SELECT COUNT(*) as count FROM tokens WHERE task_id = $1"#,
            task_id
        )
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = total_row.count.unwrap_or(0) as u32;

        // Page of tokens
        let token_rows = sqlx::query(
            r#"
            SELECT chain, contract_address, token_id
            FROM tokens
            WHERE task_id = $1
            ORDER BY chain, contract_address, token_id
            LIMIT $2 OFFSET $3
            "#,
        )
        .bind(task_id)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

        use std::collections::BTreeMap;
        let mut by_chain: BTreeMap<String, Vec<String>> = BTreeMap::new();
        for r in token_rows {
            let chain: String = r.get("chain");
            let contract_address: String = r.get("contract_address");
            let token_id: String = r.get("token_id");
            by_chain
                .entry(chain)
                .or_default()
                .push(format!("{}:{}", contract_address, token_id));
        }
        let tokens_json = serde_json::json!(by_chain
            .into_iter()
            .map(|(chain, toks)| serde_json::json!({ "chain": chain, "tokens": toks }))
            .collect::<Vec<_>>());

        let meta = BackupTask {
            task_id: row.get("task_id"),
            created_at: row.get("created_at"),
            updated_at: row.get("updated_at"),
            requestor: row.get("requestor"),
            nft_count: row.get("nft_count"),
            tokens: tokens_json,
            archive_status: row
                .try_get::<Option<String>, _>("archive_status")
                .ok()
                .flatten(),
            ipfs_status: row
                .try_get::<Option<String>, _>("ipfs_status")
                .ok()
                .flatten(),
            archive_error_log: row.get("archive_error_log"),
            ipfs_error_log: row.get("ipfs_error_log"),
            archive_fatal_error: row.get("fatal_error"),
            ipfs_fatal_error: row
                .try_get::<Option<String>, _>("ipfs_fatal_error")
                .ok()
                .flatten(),
            storage_mode: row.get("storage_mode"),
            archive_format: row.get("archive_format"),
            expires_at: row.get("expires_at"),
            archive_deleted_at: row.get("archive_deleted_at"),
            pins_deleted_at: row.get("pins_deleted_at"),
        };

        Ok(Some((meta, total)))
    }
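
    // A usage sketch for the pagination pair (arguments are illustrative):
    //
    //     if let Some((meta, total)) = db.get_backup_task_with_tokens("task-1", 50, 0).await? {
    //         // meta.tokens holds at most 50 "contract:token_id" strings;
    //         // `total` is the overall token count for the task
    //     }
    //
    // The page is cut over the flat token rows and only then regrouped by chain,
    // so a single chain's tokens can span multiple pages.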

    pub async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        // Total count
        let total_row = sqlx::query!(
            r#"SELECT COUNT(*) as count FROM backup_tasks b WHERE b.requestor = $1"#,
            requestor
        )
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = total_row.count.unwrap_or(0) as u32;

        let rows = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.requestor = $1
            ORDER BY b.created_at DESC
            LIMIT $2 OFFSET $3
            "#,
        )
        .bind(requestor)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

        let recs = rows
            .into_iter()
            .map(|row| {
                let task_id: String = row.get("task_id");

                BackupTask {
                    task_id,
                    created_at: row.get("created_at"),
                    updated_at: row.get("updated_at"),
                    requestor: row.get("requestor"),
                    nft_count: row.get("nft_count"),
                    // Clients should use get_backup_task to get tokens so the tokens
                    // can be properly paginated.
                    tokens: serde_json::Value::Null,
                    archive_status: row
                        .try_get::<Option<String>, _>("archive_status")
                        .ok()
                        .flatten(),
                    ipfs_status: row
                        .try_get::<Option<String>, _>("ipfs_status")
                        .ok()
                        .flatten(),
                    archive_error_log: row.get("archive_error_log"),
                    ipfs_error_log: row.get("ipfs_error_log"),
                    archive_fatal_error: row.get("fatal_error"),
                    ipfs_fatal_error: row
                        .try_get::<Option<String>, _>("ipfs_fatal_error")
                        .ok()
                        .flatten(),
                    storage_mode: row.get("storage_mode"),
                    archive_format: row.get("archive_format"),
                    expires_at: row.get("expires_at"),
                    archive_deleted_at: row.get("archive_deleted_at"),
                    pins_deleted_at: row.get("pins_deleted_at"),
                }
            })
            .collect();

        Ok((recs, total))
    }

    pub async fn list_unprocessed_expired_backups(
        &self,
    ) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT b.task_id, ar.archive_format
            FROM backup_tasks b
            JOIN archive_requests ar ON b.task_id = ar.task_id
            WHERE ar.expires_at IS NOT NULL AND ar.expires_at < NOW() AND ar.status != 'expired'
            "#,
        )
        .fetch_all(&self.pool)
        .await?;
        let recs = rows
            .into_iter()
            .map(|row| ExpiredBackup {
                task_id: row.get("task_id"),
                archive_format: row.get("archive_format"),
            })
            .collect();
        Ok(recs)
    }

    /// Retrieve all backup tasks that are in 'in_progress' status
    /// This is used to recover incomplete tasks on server restart
    pub async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE (
                -- Archive-only mode: check archive status (record must exist and be in_progress)
                (b.storage_mode = 'archive' AND ar.status = 'in_progress')
                OR
                -- IPFS-only mode: check IPFS status (record must exist and be in_progress)
                (b.storage_mode = 'ipfs' AND pr.status = 'in_progress')
                OR
                -- Full mode: check both archive and IPFS status (task is incomplete if either is in_progress)
                (b.storage_mode = 'full' AND (ar.status = 'in_progress' OR pr.status = 'in_progress'))
            )
            ORDER BY b.created_at ASC
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        // If no incomplete tasks, return early
        if rows.is_empty() {
            return Ok(Vec::new());
        }

        // Collect task_ids to fetch tokens in bulk
        let task_ids: Vec<String> = rows.iter().map(|r| r.get::<String, _>("task_id")).collect();

        // Fetch all tokens for these tasks and aggregate by task_id and chain
        use std::collections::BTreeMap;
        let mut tokens_by_task: BTreeMap<String, BTreeMap<String, Vec<String>>> = BTreeMap::new();

        let token_rows = sqlx::query(
            r#"
            SELECT task_id, chain, contract_address, token_id
            FROM tokens
            WHERE task_id = ANY($1)
            ORDER BY chain, contract_address, token_id
            "#,
        )
        .bind(&task_ids)
        .fetch_all(&self.pool)
        .await?;

        for r in token_rows {
            let task_id: String = r.get("task_id");
            let chain: String = r.get("chain");
            let contract_address: String = r.get("contract_address");
            let token_id: String = r.get("token_id");
            tokens_by_task
                .entry(task_id)
                .or_default()
                .entry(chain)
                .or_default()
                .push(format!("{}:{}", contract_address, token_id));
        }

        let recs = rows
            .into_iter()
            .map(|row| {
                let task_id: String = row.get("task_id");
                let tokens_json = if let Some(by_chain) = tokens_by_task.get(&task_id) {
                    serde_json::json!(by_chain
                        .iter()
                        .map(|(chain, toks)| serde_json::json!({
                            "chain": chain,
                            "tokens": toks,
                        }))
                        .collect::<Vec<_>>())
                } else {
                    // No tokens recorded for this task
                    serde_json::json!([])
                };

                BackupTask {
                    task_id,
                    created_at: row.get("created_at"),
                    updated_at: row.get("updated_at"),
                    requestor: row.get("requestor"),
                    nft_count: row.get("nft_count"),
                    tokens: tokens_json,
                    archive_status: row
                        .try_get::<Option<String>, _>("archive_status")
                        .ok()
                        .flatten(),
                    ipfs_status: row
                        .try_get::<Option<String>, _>("ipfs_status")
                        .ok()
                        .flatten(),
                    archive_error_log: row.get("archive_error_log"),
                    ipfs_error_log: row.get("ipfs_error_log"),
                    archive_fatal_error: row.get("fatal_error"),
                    ipfs_fatal_error: None,
                    storage_mode: row.get("storage_mode"),
                    archive_format: row.get("archive_format"),
                    expires_at: row.get("expires_at"),
                    archive_deleted_at: row.get("archive_deleted_at"),
                    pins_deleted_at: row.get("pins_deleted_at"),
                }
            })
            .collect();

        Ok(recs)
    }
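
    // Two details of this recovery path are worth noting:
    // - Tokens for all matched tasks are fetched with a single
    //   `task_id = ANY($1)` query and regrouped in memory, avoiding a per-task
    //   round trip on restart.
    // - The recovery SELECT does not include pr.fatal_error, so ipfs_fatal_error
    //   is always None here; callers that need it can fall back to
    //   get_backup_task.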

    /// Insert pins and their associated tokens in a single atomic transaction
    pub async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        if token_pin_mappings.is_empty() {
            return Ok(());
        }

        // Collect all pin responses and prepare token data
        let mut all_pin_responses = Vec::new();
        let mut all_token_data = Vec::new(); // (index_in_pin_responses, chain, contract_address, token_id)

        for mapping in token_pin_mappings {
            for pin_response in &mapping.pin_responses {
                let index = all_pin_responses.len();
                all_pin_responses.push(pin_response);
                all_token_data.push((
                    index,
                    mapping.chain.clone(),
                    mapping.contract_address.clone(),
                    mapping.token_id.clone(),
                ));
            }
        }

        if all_pin_responses.is_empty() {
            return Ok(());
        }

        // Start a transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Insert pins one by one and collect generated IDs
        let mut pin_ids: Vec<i64> = Vec::new();
        for pin_response in &all_pin_responses {
            // Map status enum to lowercase string to satisfy CHECK constraint
            let status = match pin_response.status {
                crate::ipfs::PinResponseStatus::Queued => "queued",
                crate::ipfs::PinResponseStatus::Pinning => "pinning",
                crate::ipfs::PinResponseStatus::Pinned => "pinned",
                crate::ipfs::PinResponseStatus::Failed => "failed",
            };

            let row = sqlx::query(
                "INSERT INTO pins (task_id, provider_type, provider_url, cid, request_id, pin_status) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id"
            )
            .bind(task_id)
            .bind(&pin_response.provider_type)
            .bind(&pin_response.provider_url)
            .bind(&pin_response.cid)
            .bind(&pin_response.id)
            .bind(status)
            .fetch_one(&mut *tx)
            .await?;

            pin_ids.push(row.get("id"));
        }

        // Resolve token_ids and update pins rows with token_id
        for (index, chain, contract_address, token_id) in &all_token_data {
            // Ensure token row exists and fetch its id
            let inserted = sqlx::query(
                r#"INSERT INTO tokens (task_id, chain, contract_address, token_id)
                   VALUES ($1, $2, $3, $4)
                   ON CONFLICT (task_id, chain, contract_address, token_id) DO NOTHING
                   RETURNING id"#,
            )
            .bind(task_id)
            .bind(chain)
            .bind(contract_address)
            .bind(token_id)
            .fetch_optional(&mut *tx)
            .await?;

            let tok_id: i64 = if let Some(row) = inserted {
                row.get("id")
            } else {
                sqlx::query("SELECT id FROM tokens WHERE task_id = $1 AND chain = $2 AND contract_address = $3 AND token_id = $4")
                    .bind(task_id)
                    .bind(chain)
                    .bind(contract_address)
                    .bind(token_id)
                    .fetch_one(&mut *tx)
                    .await?
                    .get("id")
            };

            sqlx::query("UPDATE pins SET token_id = $2 WHERE id = $1")
                .bind(pin_ids[*index])
                .bind(tok_id)
                .execute(&mut *tx)
                .await?;
        }

        // Commit the transaction
        tx.commit().await?;
        Ok(())
    }
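
    // Note on the upsert above: `ON CONFLICT ... DO NOTHING RETURNING id` yields
    // no row when the token already exists, which is why the fallback SELECT is
    // needed to resolve the id in the conflict case. Both statements run inside
    // the same transaction, so the pin/token linkage is all-or-nothing.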

    /// Get all pins for a specific backup task
    pub async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
            FROM pins
            WHERE task_id = $1
            ORDER BY id
            "#,
        )
        .bind(task_id)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| PinRow {
                id: row.get("id"),
                task_id: row.get("task_id"),
                provider_type: row.get("provider_type"),
                provider_url: row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten(),
                cid: row.get("cid"),
                request_id: row.get("request_id"),
                pin_status: row.get("pin_status"),
                created_at: row.get("created_at"),
            })
            .collect())
    }

    /// Paginated pinned tokens grouped by (chain, contract_address, token_id)
    pub async fn get_pinned_tokens_by_requestor(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
        // Total distinct tokens for this requestor
        let total_row = sqlx::query(
            r#"
            SELECT COUNT(*) as count
            FROM (
                SELECT DISTINCT t.chain, t.contract_address, t.token_id
                FROM tokens t
                JOIN pins p ON p.token_id = t.id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
            ) t
            "#,
        )
        .bind(requestor)
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = (total_row.get::<i64, _>("count")).max(0) as u32;

        // Page of distinct tokens ordered by most recent pin time
        let rows = sqlx::query(
            r#"
            SELECT t.chain, t.contract_address, t.token_id
            FROM (
                SELECT t.chain, t.contract_address, t.token_id, MAX(p.created_at) AS last_created
                FROM tokens t
                JOIN pins p ON p.token_id = t.id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
                GROUP BY t.chain, t.contract_address, t.token_id
            ) t
            ORDER BY last_created DESC
            LIMIT $2 OFFSET $3
            "#,
        )
        .bind(requestor)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

        // For each token key, fetch pins (ordered by created_at desc)
        let mut result: Vec<TokenWithPins> = Vec::new();
        for r in rows {
            let token_rows = sqlx::query(
                r#"
                SELECT t.chain, t.contract_address, t.token_id,
                       p.cid, p.provider_type, p.provider_url, p.pin_status, p.created_at
                FROM tokens t
                JOIN pins p ON p.token_id = t.id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
                  AND t.chain = $2
                  AND t.contract_address = $3
                  AND t.token_id = $4
                ORDER BY p.created_at DESC
                "#,
            )
            .bind(requestor)
            .bind(r.get::<String, _>("chain"))
            .bind(r.get::<String, _>("contract_address"))
            .bind(r.get::<String, _>("token_id"))
            .fetch_all(&self.pool)
            .await?;

            let mut pins: Vec<PinInfo> = Vec::new();
            let mut chain = String::new();
            let mut contract_address = String::new();
            let mut token_id = String::new();
            for row in token_rows {
                chain = row.get("chain");
                contract_address = row.get("contract_address");
                token_id = row.get("token_id");
                let cid: String = row.get("cid");
                let provider_type: String = row.get("provider_type");
                let provider_url: String = row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten()
                    .unwrap_or_default();
                let status: String = row.get("pin_status");
                let created_at: DateTime<Utc> = row.get("created_at");
                pins.push(PinInfo {
                    cid,
                    provider_type,
                    provider_url,
                    status,
                    created_at,
                });
            }
            result.push(TokenWithPins {
                chain,
                contract_address,
                token_id,
                pins,
            });
        }

        Ok((result, total))
    }
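
    // This method issues one pins query per token on the page, so the number of
    // extra round trips is bounded by `limit`; the SQL stays simple at the cost
    // of those per-token queries.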

    /// Get a specific pinned token for a requestor
    pub async fn get_pinned_token_by_requestor(
        &self,
        requestor: &str,
        chain: &str,
        contract_address: &str,
        token_id: &str,
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
        let query = r#"
            SELECT t.chain, t.contract_address, t.token_id,
                   p.cid, p.provider_type, p.provider_url, p.pin_status, p.created_at
            FROM tokens t
            JOIN pins p ON p.token_id = t.id
            JOIN backup_tasks bt ON bt.task_id = p.task_id
            WHERE bt.requestor = $1
              AND t.chain = $2
              AND t.contract_address = $3
              AND t.token_id = $4
            ORDER BY p.created_at DESC
        "#;

        let rows = sqlx::query(query)
            .bind(requestor)
            .bind(chain)
            .bind(contract_address)
            .bind(token_id)
            .fetch_all(&self.pool)
            .await?;

        if rows.is_empty() {
            return Ok(None);
        }

        let mut pins = Vec::new();
        let mut token_chain = String::new();
        let mut token_contract_address = String::new();
        let mut token_token_id = String::new();

        for row in rows {
            token_chain = row.get("chain");
            token_contract_address = row.get("contract_address");
            token_token_id = row.get("token_id");
            let cid: String = row.get("cid");
            let provider_type: String = row.get("provider_type");
            // provider_url may be NULL for legacy rows; default to empty string for API stability
            let provider_url: String = row
                .try_get::<Option<String>, _>("provider_url")
                .ok()
                .flatten()
                .unwrap_or_default();
            let status: String = row.get("pin_status");
            let created_at: DateTime<Utc> = row.get("created_at");

            pins.push(PinInfo {
                cid,
                provider_type,
                provider_url,
                status,
                created_at,
            });
        }

        Ok(Some(TokenWithPins {
            chain: token_chain,
            contract_address: token_contract_address,
            token_id: token_token_id,
            pins,
        }))
    }

    /// Get all pins that are in 'queued' or 'pinning' status
    /// This is used by the pin monitor to check for status updates
    pub async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
            FROM pins
            WHERE pin_status IN ('queued', 'pinning')
            ORDER BY id
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| PinRow {
                id: row.get("id"),
                task_id: row.get("task_id"),
                provider_type: row.get("provider_type"),
                provider_url: row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten(),
                cid: row.get("cid"),
                request_id: row.get("request_id"),
                pin_status: row.get("pin_status"),
                created_at: row.get("created_at"),
            })
            .collect())
    }

    /// Set backup fatal error for relevant subresources in a single SQL statement.
    /// The update is based on the `storage_mode` value from the `backup_tasks` table for the given `task_id`:
    /// - If storage_mode is 'archive' or 'full': updates archive_requests.status and archive_requests.fatal_error
    /// - If storage_mode is 'ipfs' or 'full': updates pin_requests.status and pin_requests.fatal_error
    pub async fn set_backup_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        let sql = r#"
            WITH task_mode AS (
                SELECT storage_mode FROM backup_tasks WHERE task_id = $1
            ),
            upd_archive AS (
                UPDATE archive_requests ar
                SET status = 'error', fatal_error = $2
                WHERE ar.task_id = $1
                  AND EXISTS (
                      SELECT 1 FROM task_mode tm
                      WHERE tm.storage_mode IN ('archive', 'full')
                  )
                RETURNING 1
            ),
            upd_pins AS (
                UPDATE pin_requests pr
                SET status = 'error', fatal_error = $2
                WHERE pr.task_id = $1
                  AND EXISTS (
                      SELECT 1 FROM task_mode tm
                      WHERE tm.storage_mode IN ('ipfs', 'full')
                  )
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
        "#;
        sqlx::query(sql)
            .bind(task_id)
            .bind(fatal_error)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
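
    // The CTE reads storage_mode once, and each UPDATE arm is gated by an EXISTS
    // check against it, so a single round trip updates whichever of the two
    // subresources applies. The final SELECT surfaces the per-arm row counts,
    // though the caller ignores them.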

    /// Update backup subresource statuses for the task based on the requested scope
    /// - scope 'archive' or 'full': updates archive_requests.status
    /// - scope 'ipfs' or 'full': updates pin_requests.status
    pub async fn update_backup_statuses(
        &self,
        task_id: &str,
        scope: &str,
        archive_status: &str,
        ipfs_status: &str,
    ) -> Result<(), sqlx::Error> {
        let sql = r#"
            WITH upd_archive AS (
                UPDATE archive_requests ar
                SET status = $2
                WHERE ar.task_id = $1
                  AND ($4 IN ('archive', 'full'))
                RETURNING 1
            ),
            upd_pins AS (
                UPDATE pin_requests pr
                SET status = $3
                WHERE pr.task_id = $1
                  AND ($4 IN ('ipfs', 'full'))
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
        "#;
        sqlx::query(sql)
            .bind(task_id)
            .bind(archive_status)
            .bind(ipfs_status)
            .bind(scope)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
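
    // A call sketch (the status strings are illustrative; `scope` decides which
    // CTE arm fires):
    //
    //     db.update_backup_statuses("task-1", "full", "done", "done").await?;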
1379

1380
    pub async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
×
1381
        if updates.is_empty() {
×
1382
            return Ok(());
×
1383
        }
1384

1385
        let mut tx = self.pool.begin().await?;
×
1386

1387
        for (id, status) in updates {
×
1388
            sqlx::query(
1389
                r#"
1390
                UPDATE pins
1391
                SET pin_status = $2
1392
                WHERE id = $1
1393
                "#,
1394
            )
1395
            .bind(id)
×
1396
            .bind(status)
×
1397
            .execute(&mut *tx)
×
1398
            .await?;
×
1399
        }
1400

1401
        tx.commit().await?;
×
1402
        Ok(())
×
1403
    }
1404

1405
    /// Ensure the missing subresource exists and upgrade the backup to full storage mode.
1406
    /// If `add_archive` is true, create/ensure archive_requests row with provided format/retention.
1407
    /// Otherwise, ensure pin_requests row exists. Always flips backup_tasks.storage_mode to 'full'.
1408
    pub async fn upgrade_backup_to_full(
        &self,
        task_id: &str,
        add_archive: bool,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Upgrade storage mode to full
        sqlx::query(
            r#"
            UPDATE backup_tasks
            SET storage_mode = 'full', updated_at = NOW()
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .execute(&mut *tx)
        .await?;

        if add_archive {
            let fmt = archive_format.unwrap_or("zip");
            if let Some(days) = retention_days {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NOW() + make_interval(days => $3::int), 'in_progress')
                    ON CONFLICT (task_id) DO NOTHING
                    "#,
                )
                .bind(task_id)
                .bind(fmt)
                .bind(days as i64)
                .execute(&mut *tx)
                .await?;
            } else {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NULL, 'in_progress')
                    ON CONFLICT (task_id) DO NOTHING
                    "#,
                )
                .bind(task_id)
                .bind(fmt)
                .execute(&mut *tx)
                .await?;
            }
        } else {
            sqlx::query(
                r#"
                INSERT INTO pin_requests (task_id, status)
                VALUES ($1, 'in_progress')
                ON CONFLICT (task_id) DO NOTHING
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }

    /// Complete archive deletion:
    /// - If current storage_mode is 'archive', delete the whole backup (finalize deletion)
    /// - Else if current storage_mode is 'full', flip to 'ipfs' to reflect archive removed
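    ///
    /// A hedged usage sketch; the task id is illustrative:
    ///
    /// ```no_run
    /// # use nftbk::server::database::Db;
    /// # async fn demo(db: &Db) -> Result<(), sqlx::Error> {
    /// // 'archive' mode deletes the backup_tasks row outright; 'full' mode
    /// // keeps it and downgrades storage_mode to 'ipfs'
    /// db.complete_archive_request_deletion("task-123").await?;
    /// # Ok(())
    /// # }
    /// ```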
    pub async fn complete_archive_request_deletion(
        &self,
        task_id: &str,
    ) -> Result<(), sqlx::Error> {
        // Atomically: delete when archive-only; else if full, flip to ipfs
        let sql = r#"
            WITH del AS (
                DELETE FROM backup_tasks
                WHERE task_id = $1 AND storage_mode = 'archive'
                RETURNING 1
            ), upd AS (
                UPDATE backup_tasks
                SET storage_mode = 'ipfs', updated_at = NOW()
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
        "#;
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
        Ok(())
    }

    /// Complete IPFS pins deletion:
    /// - If current storage_mode is 'ipfs', delete the whole backup (finalize deletion)
    /// - Else if current storage_mode is 'full', flip to 'archive' to reflect pins removed
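    ///
    /// A hedged usage sketch mirroring `complete_archive_request_deletion`;
    /// the task id is illustrative:
    ///
    /// ```no_run
    /// # use nftbk::server::database::Db;
    /// # async fn demo(db: &Db) -> Result<(), sqlx::Error> {
    /// // 'ipfs' mode deletes the backup_tasks row outright; 'full' mode
    /// // keeps it and downgrades storage_mode to 'archive'
    /// db.complete_pin_request_deletion("task-123").await?;
    /// # Ok(())
    /// # }
    /// ```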
    pub async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        // Atomically: delete when ipfs-only; else if full, flip to archive
        let sql = r#"
            WITH del AS (
                DELETE FROM backup_tasks
                WHERE task_id = $1 AND storage_mode = 'ipfs'
                RETURNING 1
            ), upd AS (
                UPDATE backup_tasks
                SET storage_mode = 'archive', updated_at = NOW()
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
        "#;
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
        Ok(())
    }
}

// Implement the unified Database trait for the real Db struct
#[async_trait::async_trait]
impl Database for Db {
    // Backup task operations

    async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        Db::insert_backup_task(
            self,
            task_id,
            requestor,
            nft_count,
            tokens,
            storage_mode,
            archive_format,
            retention_days,
        )
        .await
    }

    async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        Db::get_backup_task(self, task_id).await
    }

    async fn get_backup_task_with_tokens(
        &self,
        task_id: &str,
        limit: i64,
        offset: i64,
    ) -> Result<Option<(BackupTask, u32)>, sqlx::Error> {
        Db::get_backup_task_with_tokens(self, task_id, limit, offset).await
    }

    async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::delete_backup_task(self, task_id).await
    }

    async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        Db::get_incomplete_backup_tasks(self).await
    }

    async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        Db::list_requestor_backup_tasks_paginated(self, requestor, limit, offset).await
    }

    async fn list_unprocessed_expired_backups(&self) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
        Db::list_unprocessed_expired_backups(self).await
    }

    // Backup task status and error operations
    async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        Db::clear_backup_errors(self, task_id, scope).await
    }

    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error> {
        Db::set_backup_error(self, task_id, error).await
    }

    async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        Db::set_error_logs(self, task_id, archive_error_log, ipfs_error_log).await
    }

    async fn update_archive_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_error_log(self, task_id, error_log).await
    }

    async fn update_pin_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_error_log(self, task_id, error_log).await
    }

    async fn set_archive_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_archive_request_error(self, task_id, fatal_error).await
    }

    async fn set_pin_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_pin_request_error(self, task_id, fatal_error).await
    }

    // Status update operations
    async fn update_archive_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_status(self, task_id, status).await
    }

    async fn update_pin_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_status(self, task_id, status).await
    }

    async fn update_backup_statuses(
        &self,
        task_id: &str,
        scope: &str,
        archive_status: &str,
        ipfs_status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_backup_statuses(self, task_id, scope, archive_status, ipfs_status).await
    }

    async fn update_archive_request_statuses(
        &self,
        task_ids: &[String],
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_statuses(self, task_ids, status).await
    }

    async fn upgrade_backup_to_full(
        &self,
        task_id: &str,
        add_archive: bool,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        Db::upgrade_backup_to_full(self, task_id, add_archive, archive_format, retention_days).await
    }

    // Deletion operations
    async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_deletion(self, task_id).await
    }

    async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_archive_request_deletion(self, task_id).await
    }

    async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_pin_request_deletions(self, task_id).await
    }

    async fn complete_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_archive_request_deletion(self, task_id).await
    }

    async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_pin_request_deletion(self, task_id).await
    }

    // Retry operations
    async fn retry_backup(
        &self,
        task_id: &str,
        scope: &str,
        retention_days: u64,
    ) -> Result<(), sqlx::Error> {
        Db::retry_backup(self, task_id, scope, retention_days).await
    }

    // Pin operations
    async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        Db::insert_pins_with_tokens(self, task_id, token_pin_mappings).await
    }

    async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_pins_by_task_id(self, task_id).await
    }

    async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_active_pins(self).await
    }

    async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
        Db::update_pin_statuses(self, updates).await
    }

    // Pinned tokens operations
    async fn get_pinned_tokens_by_requestor(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
        Db::get_pinned_tokens_by_requestor(self, requestor, limit, offset).await
    }

    async fn get_pinned_token_by_requestor(
        &self,
        requestor: &str,
        chain: &str,
        contract_address: &str,
        token_id: &str,
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
        Db::get_pinned_token_by_requestor(self, requestor, chain, contract_address, token_id).await
    }
}
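
// A hedged sketch of what the unified trait buys callers: code can be written
// against `impl Database` and exercised with either the real `Db` or a test
// double. `summarize_task` and the task id are illustrative, not part of this
// module:
//
//     async fn summarize_task(db: &impl Database, task_id: &str) -> Result<(), sqlx::Error> {
//         if let Some(task) = db.get_backup_task(task_id).await? {
//             println!("{}: {} tokens, mode {}", task.task_id, task.nft_count, task.storage_mode);
//         }
//         Ok(())
//     }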