• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

0xmichalis / nftbk / 18533326444

15 Oct 2025 02:50PM UTC coverage: 36.737% (+0.2%) from 36.538%
18533326444

push

github

0xmichalis
fix: update retry endpoints to work for subresources

35 of 72 new or added lines in 6 files covered. (48.61%)

2 existing lines in 2 files now uncovered.

1457 of 3966 relevant lines covered (36.74%)

6.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/server/database/mod.rs
1
use chrono::{DateTime, Utc};
2
use serde::{Deserialize, Serialize};
3
use sqlx::{postgres::PgPoolOptions, PgPool, Row};
4

5
use crate::server::database::r#trait::Database;
6

7
pub mod r#trait;
8

9
/// Combined view of backup_tasks + archive_requests
///
/// One flattened record joining a `backup_tasks` row with its optional
/// `archive_requests` and `pin_requests` subresource rows; the subresource
/// fields are `None` when the corresponding row is absent (the queries on
/// `Db` use LEFT JOINs).
#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "Backup task information including metadata and status")]
pub struct BackupTask {
    /// Unique identifier for the backup task
    #[schema(example = "abc123def456")]
    pub task_id: String,
    /// When the backup task was created (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:00:00Z")]
    pub created_at: DateTime<Utc>,
    /// When the backup task was last updated (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:05:00Z")]
    pub updated_at: DateTime<Utc>,
    /// User who requested the backup
    #[schema(example = "user123")]
    pub requestor: String,
    /// Number of NFTs in this backup task
    #[schema(example = 42)]
    pub nft_count: i32,
    /// Token details (only included if include_tokens=true);
    /// list queries set this to `serde_json::Value::Null` when excluded
    pub tokens: serde_json::Value,
    /// Archive subresource status (in_progress, done, error, expired)
    pub archive_status: Option<String>,
    /// IPFS subresource status (in_progress, done, error)
    pub ipfs_status: Option<String>,
    /// Detailed archive error log if archive completed with some failures
    #[schema(example = "Failed to write archive checksum file")]
    pub archive_error_log: Option<String>,
    /// Detailed IPFS error log aggregated from pin requests
    #[schema(
        example = "Provider pinata failed: 401 Unauthorized\nProvider web3.storage failed: 429 Too Many Requests"
    )]
    pub ipfs_error_log: Option<String>,
    /// Archive subresource fatal error if backup failed completely at archive stage
    pub archive_fatal_error: Option<String>,
    /// IPFS subresource fatal error if backup failed completely at IPFS stage
    pub ipfs_fatal_error: Option<String>,
    /// Storage mode used for the backup (archive, ipfs, full)
    #[schema(example = "archive")]
    pub storage_mode: String,
    /// Archive format used for the backup (zip, tar.gz)
    #[schema(example = "zip")]
    pub archive_format: Option<String>,
    /// When the backup expires (if applicable, typically 7 days from creation)
    #[schema(example = "2024-01-08T12:00:00Z")]
    pub expires_at: Option<DateTime<Utc>>,
    /// When archive deletion was started (if applicable)
    #[schema(example = "2024-01-02T10:00:00Z")]
    pub archive_deleted_at: Option<DateTime<Utc>>,
    /// When IPFS pins deletion was started (if applicable)
    #[schema(example = "2024-01-02T10:00:00Z")]
    pub pins_deleted_at: Option<DateTime<Utc>>,
}
62

63
/// API-facing view of a single IPFS pin (cf. the internal `PinRow`).
#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "IPFS pin information for a specific CID")]
pub struct PinInfo {
    /// Content Identifier (CID) of the pinned content
    #[schema(example = "QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG")]
    pub cid: String,
    /// IPFS provider type where the content is pinned
    #[schema(example = "pinata")]
    pub provider_type: String,
    /// IPFS provider URL where the content is pinned
    #[schema(example = "https://api.pinata.cloud")]
    pub provider_url: String,
    /// Pin status (pinned, pinning, failed, queued)
    #[schema(example = "pinned")]
    pub status: String,
    /// When the pin was created (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:00:00Z")]
    pub created_at: DateTime<Utc>,
}
82

83
/// A single NFT together with all of its IPFS pins, for API responses.
#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "Token information with associated pin requests")]
pub struct TokenWithPins {
    /// Blockchain identifier (e.g., ethereum, tezos)
    #[schema(example = "ethereum")]
    pub chain: String,
    /// NFT contract address
    #[schema(example = "0x1234567890123456789012345678901234567890")]
    pub contract_address: String,
    /// NFT token ID
    #[schema(example = "123")]
    pub token_id: String,
    /// List of IPFS pins for this token
    pub pins: Vec<PinInfo>,
}
98

99
/// Raw row from the `pins` table.
///
/// Internal persistence shape (no `utoipa` schema); API responses expose
/// [`PinInfo`] instead.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PinRow {
    /// Database-generated surrogate key (`RETURNING id` on insert).
    pub id: i64,
    /// Backup task this pin belongs to.
    pub task_id: String,
    /// IPFS provider type (e.g. "pinata").
    pub provider_type: String,
    /// Provider endpoint URL, when one was recorded.
    pub provider_url: Option<String>,
    /// Content identifier that was pinned.
    pub cid: String,
    /// Provider-side identifier of the pin request.
    pub request_id: String,
    /// Pin status: one of "queued", "pinning", "pinned", "failed"
    /// (lowercased in `insert_pins_with_tokens` to satisfy a CHECK constraint).
    pub pin_status: String,
    /// Row creation time.
    pub created_at: DateTime<Utc>,
}
110

111
/// Minimal projection used when sweeping expired archives
/// (see `list_unprocessed_expired_backups`).
#[derive(Debug, Clone)]
pub struct ExpiredBackup {
    /// Backup task whose archive has passed its `expires_at`.
    pub task_id: String,
    /// Archive format of the expired archive (e.g. "zip").
    pub archive_format: String,
}
116

117
/// Postgres-backed database handle; cheap to clone (wraps an sqlx `PgPool`,
/// which is internally reference-counted).
#[derive(Clone)]
pub struct Db {
    /// Shared connection pool used by all queries on this handle.
    pub pool: PgPool,
}
121

122
impl Db {
123
    /// Build a `Db` by connecting a pool of at most `max_connections`
    /// connections to `database_url`, then verify liveness with a
    /// `SELECT 1` health-check query.
    ///
    /// # Panics
    /// Panics if the pool cannot connect or the health check fails — a
    /// fail-fast constructor, presumably invoked once at startup where
    /// aborting is the desired failure mode (confirm at call sites if
    /// recoverable handling is ever needed).
    pub async fn new(database_url: &str, max_connections: u32) -> Self {
        let pool = PgPoolOptions::new()
            .max_connections(max_connections)
            .connect(database_url)
            .await
            .expect("Failed to connect to Postgres");
        // Health check: run a simple query
        sqlx::query("SELECT 1")
            .execute(&pool)
            .await
            .expect("Postgres connection is not healthy");
        tracing::info!("Postgres connection is healthy");
        Db { pool }
    }
137

138
    #[allow(clippy::too_many_arguments)]
139
    pub async fn insert_backup_task(
×
140
        &self,
141
        task_id: &str,
142
        requestor: &str,
143
        nft_count: i32,
144
        tokens: &serde_json::Value,
145
        storage_mode: &str,
146
        archive_format: Option<&str>,
147
        retention_days: Option<u64>,
148
    ) -> Result<(), sqlx::Error> {
149
        let mut tx = self.pool.begin().await?;
×
150

151
        // Insert into backup_tasks
152
        sqlx::query(
153
            r#"
154
            INSERT INTO backup_tasks (
155
                task_id, created_at, updated_at, requestor, nft_count, tokens, storage_mode
156
            ) VALUES (
157
                $1, NOW(), NOW(), $2, $3, $4, $5
158
            )
159
            ON CONFLICT (task_id) DO UPDATE SET
160
                updated_at = NOW(),
161
                nft_count = EXCLUDED.nft_count,
162
                tokens = EXCLUDED.tokens,
163
                storage_mode = EXCLUDED.storage_mode
164
            "#,
165
        )
166
        .bind(task_id)
167
        .bind(requestor)
168
        .bind(nft_count)
169
        .bind(tokens)
170
        .bind(storage_mode)
171
        .execute(&mut *tx)
172
        .await?;
×
173

174
        // Insert into archive_requests if storage mode includes archive
175
        if storage_mode == "archive" || storage_mode == "full" {
×
176
            let archive_fmt = archive_format.unwrap_or("zip");
×
177

178
            if let Some(days) = retention_days {
×
179
                sqlx::query(
180
                    r#"
181
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
182
                    VALUES ($1, $2, NOW() + ($3 || ' days')::interval, 'in_progress')
183
                    ON CONFLICT (task_id) DO UPDATE SET
184
                        archive_format = EXCLUDED.archive_format,
185
                        expires_at = EXCLUDED.expires_at
186
                    "#,
187
                )
188
                .bind(task_id)
189
                .bind(archive_fmt)
190
                .bind(days as i64)
191
                .execute(&mut *tx)
192
                .await?;
×
193
            } else {
194
                sqlx::query(
195
                    r#"
196
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
197
                    VALUES ($1, $2, NULL, 'in_progress')
198
                    ON CONFLICT (task_id) DO UPDATE SET
199
                        archive_format = EXCLUDED.archive_format,
200
                        expires_at = EXCLUDED.expires_at
201
                    "#,
202
                )
203
                .bind(task_id)
×
204
                .bind(archive_fmt)
×
205
                .execute(&mut *tx)
×
206
                .await?;
×
207
            }
208
        }
209

210
        // Insert into pin_requests if storage mode includes IPFS
211
        if storage_mode == "ipfs" || storage_mode == "full" {
×
212
            sqlx::query(
213
                r#"
214
                INSERT INTO pin_requests (task_id, status)
215
                VALUES ($1, 'in_progress')
216
                ON CONFLICT (task_id) DO UPDATE SET
217
                    status = EXCLUDED.status
218
                "#,
219
            )
220
            .bind(task_id)
221
            .execute(&mut *tx)
222
            .await?;
×
223
        }
224

225
        tx.commit().await?;
×
226
        Ok(())
×
227
    }
228

229
    /// Delete a backup task row by id.
    ///
    /// Deleting a task that does not exist is not an error (0 rows affected).
    /// NOTE(review): relies on `ON DELETE CASCADE` to remove the dependent
    /// archive_requests row; presumably pin_requests/pins cascade as well —
    /// confirm against the schema, which is not visible here.
    pub async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        // CASCADE will delete associated archive_requests row if it exists
        sqlx::query!("DELETE FROM backup_tasks WHERE task_id = $1", task_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
236

237
    pub async fn set_error_logs(
×
238
        &self,
239
        task_id: &str,
240
        archive_error_log: Option<&str>,
241
        ipfs_error_log: Option<&str>,
242
    ) -> Result<(), sqlx::Error> {
243
        let mut tx = self.pool.begin().await?;
×
244
        if let Some(a) = archive_error_log {
×
245
            sqlx::query("UPDATE archive_requests SET error_log = $2 WHERE task_id = $1")
×
246
                .bind(task_id)
×
247
                .bind(a)
×
248
                .execute(&mut *tx)
×
249
                .await?;
×
250
        }
251
        if let Some(i) = ipfs_error_log {
×
252
            sqlx::query(
253
                r#"
254
                    UPDATE pin_requests
255
                    SET error_log = $2
256
                    WHERE task_id = $1
257
                    "#,
258
            )
259
            .bind(task_id)
×
260
            .bind(i)
×
261
            .execute(&mut *tx)
×
262
            .await?;
×
263
        }
264
        tx.commit().await?;
×
265
        Ok(())
×
266
    }
267

268
    pub async fn update_archive_request_error_log(
×
269
        &self,
270
        task_id: &str,
271
        error_log: &str,
272
    ) -> Result<(), sqlx::Error> {
273
        sqlx::query(
274
            r#"
275
            UPDATE archive_requests
276
            SET error_log = $2
277
            WHERE task_id = $1
278
            "#,
279
        )
280
        .bind(task_id)
×
281
        .bind(error_log)
×
282
        .execute(&self.pool)
×
283
        .await?;
×
284
        Ok(())
×
285
    }
286

287
    pub async fn update_pin_request_error_log(
×
288
        &self,
289
        task_id: &str,
290
        error_log: &str,
291
    ) -> Result<(), sqlx::Error> {
292
        sqlx::query(
293
            r#"
294
            UPDATE pin_requests
295
            SET error_log = $2
296
            WHERE task_id = $1
297
            "#,
298
        )
299
        .bind(task_id)
×
300
        .bind(error_log)
×
301
        .execute(&self.pool)
×
302
        .await?;
×
303
        Ok(())
×
304
    }
305

306
    pub async fn update_pin_request_status(
×
307
        &self,
308
        task_id: &str,
309
        status: &str,
310
    ) -> Result<(), sqlx::Error> {
311
        sqlx::query(
312
            r#"
313
            UPDATE pin_requests
314
            SET status = $2
315
            WHERE task_id = $1
316
            "#,
317
        )
318
        .bind(task_id)
×
319
        .bind(status)
×
320
        .execute(&self.pool)
×
321
        .await?;
×
322
        Ok(())
×
323
    }
324

325
    pub async fn update_archive_request_status(
×
326
        &self,
327
        task_id: &str,
328
        status: &str,
329
    ) -> Result<(), sqlx::Error> {
330
        sqlx::query(
331
            r#"
332
            UPDATE archive_requests
333
            SET status = $2
334
            WHERE task_id = $1
335
            "#,
336
        )
337
        .bind(task_id)
×
338
        .bind(status)
×
339
        .execute(&self.pool)
×
340
        .await?;
×
341
        Ok(())
×
342
    }
343

344
    pub async fn update_archive_request_statuses(
×
345
        &self,
346
        task_ids: &[String],
347
        status: &str,
348
    ) -> Result<(), sqlx::Error> {
349
        if task_ids.is_empty() {
×
350
            return Ok(());
×
351
        }
352

353
        // Use a transaction for atomicity
354
        let mut tx = self.pool.begin().await?;
×
355

356
        // Update each task_id individually with a prepared statement
357
        for task_id in task_ids {
×
358
            sqlx::query("UPDATE archive_requests SET status = $1 WHERE task_id = $2")
×
359
                .bind(status)
×
360
                .bind(task_id)
×
361
                .execute(&mut *tx)
×
362
                .await?;
×
363
        }
364

365
        tx.commit().await?;
×
366
        Ok(())
×
367
    }
368

UNCOV
369
    pub async fn retry_backup(
×
370
        &self,
371
        task_id: &str,
372
        scope: &str,
373
        retention_days: u64,
374
    ) -> Result<(), sqlx::Error> {
375
        let mut tx = self.pool.begin().await?;
×
376

377
        // Reset statuses per requested scope
NEW
378
        if scope == "archive" || scope == "full" {
×
379
            sqlx::query(
380
                r#"
381
                UPDATE archive_requests
382
                SET status = 'in_progress', fatal_error = NULL, error_log = NULL
383
                WHERE task_id = $1
384
                "#,
385
            )
NEW
386
            .bind(task_id)
×
NEW
387
            .execute(&mut *tx)
×
NEW
388
            .await?;
×
389
            sqlx::query(
390
                r#"
391
                UPDATE archive_requests
392
                SET expires_at = NOW() + ($2 || ' days')::interval
393
                WHERE task_id = $1
394
                "#,
395
            )
NEW
396
            .bind(task_id)
×
NEW
397
            .bind(retention_days as i64)
×
NEW
398
            .execute(&mut *tx)
×
NEW
399
            .await?;
×
400
        }
NEW
401
        if scope == "ipfs" || scope == "full" {
×
402
            sqlx::query(
403
                r#"
404
                UPDATE pin_requests
405
                SET status = 'in_progress', fatal_error = NULL, error_log = NULL
406
                WHERE task_id = $1
407
                "#,
408
            )
NEW
409
            .bind(task_id)
×
NEW
410
            .execute(&mut *tx)
×
NEW
411
            .await?;
×
412
        }
413

414
        tx.commit().await?;
×
415
        Ok(())
×
416
    }
417

418
    pub async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
×
419
        let mut tx = self.pool.begin().await?;
×
420
        // Clear archive errors if scope includes archive
421
        sqlx::query(
422
            r#"
423
            UPDATE archive_requests
424
            SET error_log = NULL, fatal_error = NULL
425
            WHERE task_id = $1 AND ($2 IN ('archive', 'full'))
426
            "#,
427
        )
428
        .bind(task_id)
×
429
        .bind(scope)
×
430
        .execute(&mut *tx)
×
431
        .await?;
×
432
        // Clear IPFS errors if scope includes ipfs
433
        sqlx::query(
434
            r#"
435
            UPDATE pin_requests
436
            SET error_log = NULL, fatal_error = NULL
437
            WHERE task_id = $1 AND ($2 IN ('ipfs', 'full'))
438
            "#,
439
        )
440
        .bind(task_id)
×
441
        .bind(scope)
×
442
        .execute(&mut *tx)
×
443
        .await?;
×
444
        tx.commit().await?;
×
445
        Ok(())
×
446
    }
447

448
    pub async fn set_archive_request_error(
×
449
        &self,
450
        task_id: &str,
451
        fatal_error: &str,
452
    ) -> Result<(), sqlx::Error> {
453
        sqlx::query(
454
            r#"
455
            UPDATE archive_requests
456
            SET status = 'error', fatal_error = $2
457
            WHERE task_id = $1
458
            "#,
459
        )
460
        .bind(task_id)
×
461
        .bind(fatal_error)
×
462
        .execute(&self.pool)
×
463
        .await?;
×
464
        Ok(())
×
465
    }
466

467
    pub async fn set_pin_request_error(
×
468
        &self,
469
        task_id: &str,
470
        fatal_error: &str,
471
    ) -> Result<(), sqlx::Error> {
472
        sqlx::query(
473
            r#"
474
            UPDATE pin_requests
475
            SET status = 'error', fatal_error = $2
476
            WHERE task_id = $1
477
            "#,
478
        )
479
        .bind(task_id)
×
480
        .bind(fatal_error)
×
481
        .execute(&self.pool)
×
482
        .await?;
×
483
        Ok(())
×
484
    }
485

486
    /// Mark an entire backup (both subresources) as being deleted.
    ///
    /// In one transaction: bump the parent's `updated_at`, then stamp
    /// `deleted_at = NOW()` on the archive and pin subresources — but only
    /// where `deleted_at` is still NULL, so re-running never overwrites an
    /// earlier deletion timestamp.
    pub async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Touch parent row
        sqlx::query!(
            r#"UPDATE backup_tasks SET updated_at = NOW() WHERE task_id = $1"#,
            task_id
        )
        .execute(&mut *tx)
        .await?;

        // Mark archive subresource as being deleted
        sqlx::query!(
            r#"
            UPDATE archive_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&mut *tx)
        .await?;

        // Mark IPFS subresource as being deleted
        sqlx::query!(
            r#"
            UPDATE pin_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;
        Ok(())
    }
524

525
    /// Mark archive as being deleted (similar to start_deletion but for archive subresource)
    ///
    /// Stamps `deleted_at = NOW()` only when it is still NULL, so re-running
    /// preserves the original deletion timestamp. Does not touch the parent
    /// row's `updated_at`, unlike `start_deletion`.
    pub async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            UPDATE archive_requests 
            SET deleted_at = NOW() 
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }
539

540
    /// Mark IPFS pins as being deleted (similar to start_deletion but for IPFS pins subresource)
    ///
    /// Stamps `deleted_at = NOW()` only when it is still NULL, so re-running
    /// preserves the original deletion timestamp. Does not touch the parent
    /// row's `updated_at`, unlike `start_deletion`.
    pub async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            UPDATE pin_requests 
            SET deleted_at = NOW() 
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }
554

555
    /// Fetch a single backup task as a combined [`BackupTask`] view, or
    /// `None` when the task does not exist.
    ///
    /// LEFT JOINs pull in the optional archive/IPFS subresource rows; the
    /// subresource columns are mapped via `try_get(..).ok().flatten()` so a
    /// missing/NULL column simply becomes `None` instead of erroring.
    pub async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        let row = sqlx::query(
            r#"
            SELECT 
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count, 
                b.tokens, ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.task_id = $1
            "#,
        )
        .bind(task_id)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(|row| BackupTask {
            task_id: row.get("task_id"),
            created_at: row.get("created_at"),
            updated_at: row.get("updated_at"),
            requestor: row.get("requestor"),
            nft_count: row.get("nft_count"),
            tokens: row.get("tokens"),
            archive_status: row
                .try_get::<Option<String>, _>("archive_status")
                .ok()
                .flatten(),
            ipfs_status: row
                .try_get::<Option<String>, _>("ipfs_status")
                .ok()
                .flatten(),
            archive_error_log: row.get("archive_error_log"),
            ipfs_error_log: row.get("ipfs_error_log"),
            // NOTE: the archive fatal error is selected unaliased as
            // `fatal_error`; the IPFS one carries the `ipfs_fatal_error` alias.
            archive_fatal_error: row.get("fatal_error"),
            ipfs_fatal_error: row
                .try_get::<Option<String>, _>("ipfs_fatal_error")
                .ok()
                .flatten(),
            storage_mode: row.get("storage_mode"),
            archive_format: row.get("archive_format"),
            expires_at: row.get("expires_at"),
            archive_deleted_at: row.get("archive_deleted_at"),
            pins_deleted_at: row.get("pins_deleted_at"),
        }))
    }
606

607
    /// List a requestor's backup tasks, newest first, with `LIMIT`/`OFFSET`
    /// pagination. Returns the page of tasks plus the total row count.
    ///
    /// The SELECT is assembled with `format!` only to optionally include the
    /// (potentially large) `tokens` column; `tokens_field` is one of two
    /// fixed strings, never user input, so no injection risk. When tokens
    /// are excluded, `BackupTask.tokens` is `serde_json::Value::Null`.
    pub async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        include_tokens: bool,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        let tokens_field = if include_tokens { "b.tokens," } else { "" };

        // Total count
        let total_row = sqlx::query!(
            r#"SELECT COUNT(*) as count FROM backup_tasks b WHERE b.requestor = $1"#,
            requestor
        )
        .fetch_one(&self.pool)
        .await?;
        // COUNT(*) comes back as Option<i64>; clamp negative (impossible in
        // practice) and narrow to u32. NOTE(review): `as u32` would wrap for
        // counts above u32::MAX — unrealistic, but worth knowing.
        let total: u32 = (total_row.count.unwrap_or(0) as i64).max(0) as u32;

        let query = format!(
            r#"
            SELECT 
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count, 
                {tokens_field} ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.requestor = $1
            ORDER BY b.created_at DESC
            LIMIT $2 OFFSET $3
            "#,
        );

        let rows = sqlx::query(&query)
            .bind(requestor)
            .bind(limit)
            .bind(offset)
            .fetch_all(&self.pool)
            .await?;

        let recs = rows
            .into_iter()
            .map(|row| {
                // The tokens column only exists in the row when requested.
                let tokens = if include_tokens {
                    row.try_get::<serde_json::Value, _>("tokens")
                        .unwrap_or(serde_json::Value::Null)
                } else {
                    serde_json::Value::Null
                };

                BackupTask {
                    task_id: row.get("task_id"),
                    created_at: row.get("created_at"),
                    updated_at: row.get("updated_at"),
                    requestor: row.get("requestor"),
                    nft_count: row.get("nft_count"),
                    tokens,
                    archive_status: row
                        .try_get::<Option<String>, _>("archive_status")
                        .ok()
                        .flatten(),
                    ipfs_status: row
                        .try_get::<Option<String>, _>("ipfs_status")
                        .ok()
                        .flatten(),
                    archive_error_log: row.get("archive_error_log"),
                    ipfs_error_log: row.get("ipfs_error_log"),
                    // Archive fatal error is selected unaliased as `fatal_error`.
                    archive_fatal_error: row.get("fatal_error"),
                    ipfs_fatal_error: row
                        .try_get::<Option<String>, _>("ipfs_fatal_error")
                        .ok()
                        .flatten(),
                    storage_mode: row.get("storage_mode"),
                    archive_format: row.get("archive_format"),
                    expires_at: row.get("expires_at"),
                    archive_deleted_at: row.get("archive_deleted_at"),
                    pins_deleted_at: row.get("pins_deleted_at"),
                }
            })
            .collect();

        Ok((recs, total))
    }
695

696
    pub async fn list_unprocessed_expired_backups(
×
697
        &self,
698
    ) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
699
        let rows = sqlx::query(
700
            r#"
701
            SELECT b.task_id, ar.archive_format 
702
            FROM backup_tasks b
703
            JOIN archive_requests ar ON b.task_id = ar.task_id
704
            WHERE ar.expires_at IS NOT NULL AND ar.expires_at < NOW() AND ar.status != 'expired'
705
            "#,
706
        )
707
        .fetch_all(&self.pool)
×
708
        .await?;
×
709
        let recs = rows
×
710
            .into_iter()
711
            .map(|row| ExpiredBackup {
×
712
                task_id: row.get("task_id"),
×
713
                archive_format: row.get("archive_format"),
×
714
            })
715
            .collect();
716
        Ok(recs)
×
717
    }
718

719
    /// Retrieve all backup tasks that are in 'in_progress' status
720
    /// This is used to recover incomplete tasks on server restart
721
    pub async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
×
722
        let rows = sqlx::query(
723
            r#"
724
            SELECT 
725
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count, 
726
                b.tokens, ar.status as archive_status, ar.fatal_error, b.storage_mode,
727
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
728
                ar.error_log as archive_error_log,
729
                pr.status as ipfs_status,
730
                pr.error_log as ipfs_error_log,
731
                pr.deleted_at as pins_deleted_at
732
            FROM backup_tasks b
733
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
734
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
735
            WHERE (
736
                -- Archive-only mode: check archive status (record must exist and be in_progress)
737
                (b.storage_mode = 'archive' AND ar.status = 'in_progress')
738
                OR
739
                -- IPFS-only mode: check IPFS status (record must exist and be in_progress)
740
                (b.storage_mode = 'ipfs' AND pr.status = 'in_progress')
741
                OR
742
                -- Full mode: check both archive and IPFS status (task is incomplete if either is in_progress)
743
                (b.storage_mode = 'full' AND (ar.status = 'in_progress' OR pr.status = 'in_progress'))
744
            )
745
            ORDER BY b.created_at ASC
746
            "#,
747
        )
748
        .fetch_all(&self.pool)
×
749
        .await?;
×
750

751
        let recs = rows
×
752
            .into_iter()
753
            .map(|row| BackupTask {
×
754
                task_id: row.get("task_id"),
×
755
                created_at: row.get("created_at"),
×
756
                updated_at: row.get("updated_at"),
×
757
                requestor: row.get("requestor"),
×
758
                nft_count: row.get("nft_count"),
×
759
                tokens: row.get("tokens"),
×
760
                archive_status: row
×
761
                    .try_get::<Option<String>, _>("archive_status")
×
762
                    .ok()
×
763
                    .flatten(),
×
764
                ipfs_status: row
×
765
                    .try_get::<Option<String>, _>("ipfs_status")
×
766
                    .ok()
×
767
                    .flatten(),
×
768
                archive_error_log: row.get("archive_error_log"),
×
769
                ipfs_error_log: row.get("ipfs_error_log"),
×
770
                archive_fatal_error: row.get("fatal_error"),
×
771
                ipfs_fatal_error: None,
×
772
                storage_mode: row.get("storage_mode"),
×
773
                archive_format: row.get("archive_format"),
×
774
                expires_at: row.get("expires_at"),
×
775
                archive_deleted_at: row.get("archive_deleted_at"),
×
776
                pins_deleted_at: row.get("pins_deleted_at"),
×
777
            })
778
            .collect();
779

780
        Ok(recs)
×
781
    }
782

783
    /// Insert pins and their associated tokens in a single atomic transaction
    ///
    /// Flattens every `(token, pin_response)` pair into one `pins` insert plus
    /// one `pinned_tokens` insert referencing the generated pin id; either all
    /// rows land or none do. No-ops when there is nothing to insert.
    pub async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        if token_pin_mappings.is_empty() {
            return Ok(());
        }

        // Collect all pin responses and prepare token data
        let mut all_pin_responses = Vec::new();
        let mut all_token_data = Vec::new(); // (index_in_pin_responses, chain, contract_address, token_id)

        for mapping in token_pin_mappings {
            for pin_response in &mapping.pin_responses {
                let index = all_pin_responses.len();
                all_pin_responses.push(pin_response);
                all_token_data.push((
                    index,
                    mapping.chain.clone(),
                    mapping.contract_address.clone(),
                    mapping.token_id.clone(),
                ));
            }
        }

        // Mappings may exist with no pin responses at all — nothing to do.
        if all_pin_responses.is_empty() {
            return Ok(());
        }

        // Start a transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Insert pins one by one and collect generated IDs
        // (each insert RETURNs its id; pin_ids[i] corresponds to
        // all_pin_responses[i], which all_token_data indexes into).
        let mut pin_ids: Vec<i64> = Vec::new();
        for pin_response in &all_pin_responses {
            // Map status enum to lowercase string to satisfy CHECK constraint
            let status = match pin_response.status {
                crate::ipfs::PinResponseStatus::Queued => "queued",
                crate::ipfs::PinResponseStatus::Pinning => "pinning",
                crate::ipfs::PinResponseStatus::Pinned => "pinned",
                crate::ipfs::PinResponseStatus::Failed => "failed",
            };

            let row = sqlx::query(
                "INSERT INTO pins (task_id, provider_type, provider_url, cid, request_id, pin_status) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id"
            )
            .bind(task_id)
            .bind(&pin_response.provider_type)
            .bind(&pin_response.provider_url)
            .bind(&pin_response.cid)
            .bind(&pin_response.id)
            .bind(status)
            .fetch_one(&mut *tx)
            .await?;

            pin_ids.push(row.get("id"));
        }

        // Insert pinned tokens using the generated pin_ids
        for (index, chain, contract_address, token_id) in &all_token_data {
            sqlx::query(
                "INSERT INTO pinned_tokens (pin_id, chain, contract_address, token_id) VALUES ($1, $2, $3, $4)"
            )
            .bind(pin_ids[*index])
            .bind(chain)
            .bind(contract_address)
            .bind(token_id)
            .execute(&mut *tx)
            .await?;
        }

        // Commit the transaction
        tx.commit().await?;
        Ok(())
    }
860

861
    /// Get all pins for a specific backup task
862
    pub async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
×
863
        let rows = sqlx::query(
864
            r#"
865
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
866
            FROM pins
867
            WHERE task_id = $1
868
            ORDER BY id
869
            "#,
870
        )
871
        .bind(task_id)
×
872
        .fetch_all(&self.pool)
×
873
        .await?;
×
874

875
        Ok(rows
×
876
            .into_iter()
×
877
            .map(|row| PinRow {
×
878
                id: row.get("id"),
×
879
                task_id: row.get("task_id"),
×
880
                provider_type: row.get("provider_type"),
×
881
                provider_url: row
×
882
                    .try_get::<Option<String>, _>("provider_url")
×
883
                    .ok()
×
884
                    .flatten(),
×
885
                cid: row.get("cid"),
×
886
                request_id: row.get("request_id"),
×
887
                pin_status: row.get("pin_status"),
×
888
                created_at: row.get("created_at"),
×
889
            })
890
            .collect())
×
891
    }
892

893
    /// Paginated pinned tokens grouped by (chain, contract_address, token_id)
894
    pub async fn get_pinned_tokens_by_requestor(
×
895
        &self,
896
        requestor: &str,
897
        limit: i64,
898
        offset: i64,
899
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
900
        // Total distinct tokens for this requestor
901
        let total_row = sqlx::query(
902
            r#"
903
            SELECT COUNT(*) as count
904
            FROM (
905
                SELECT DISTINCT pt.chain, pt.contract_address, pt.token_id
906
                FROM pinned_tokens pt
907
                JOIN pins p ON p.id = pt.pin_id
908
                JOIN backup_tasks bt ON bt.task_id = p.task_id
909
                WHERE bt.requestor = $1
910
            ) t
911
            "#,
912
        )
913
        .bind(requestor)
×
914
        .fetch_one(&self.pool)
×
915
        .await?;
×
916
        let total: u32 = (total_row.get::<i64, _>("count")).max(0) as u32;
×
917

918
        // Page of distinct tokens ordered by most recent pin time
919
        let rows = sqlx::query(
920
            r#"
921
            SELECT t.chain, t.contract_address, t.token_id
922
            FROM (
923
                SELECT pt.chain, pt.contract_address, pt.token_id, MAX(pt.created_at) AS last_created
924
                FROM pinned_tokens pt
925
                JOIN pins p ON p.id = pt.pin_id
926
                JOIN backup_tasks bt ON bt.task_id = p.task_id
927
                WHERE bt.requestor = $1
928
                GROUP BY pt.chain, pt.contract_address, pt.token_id
929
            ) t
930
            ORDER BY last_created DESC
931
            LIMIT $2 OFFSET $3
932
            "#,
933
        )
934
        .bind(requestor)
×
935
        .bind(limit)
×
936
        .bind(offset)
×
937
        .fetch_all(&self.pool)
×
938
        .await?;
×
939

940
        // For each token key, fetch pins (ordered by created_at desc)
941
        let mut result: Vec<TokenWithPins> = Vec::new();
×
942
        for r in rows {
×
943
            let token_rows = sqlx::query(
944
                r#"
945
                SELECT pt.chain, pt.contract_address, pt.token_id,
946
                       p.cid, p.provider_type, p.provider_url, p.pin_status, pt.created_at
947
                FROM pinned_tokens pt
948
                JOIN pins p ON p.id = pt.pin_id
949
                JOIN backup_tasks bt ON bt.task_id = p.task_id
950
                WHERE bt.requestor = $1
951
                  AND pt.chain = $2
952
                  AND pt.contract_address = $3
953
                  AND pt.token_id = $4
954
                ORDER BY pt.created_at DESC
955
                "#,
956
            )
957
            .bind(requestor)
×
958
            .bind(r.get::<String, _>("chain"))
×
959
            .bind(r.get::<String, _>("contract_address"))
×
960
            .bind(r.get::<String, _>("token_id"))
×
961
            .fetch_all(&self.pool)
×
962
            .await?;
×
963

964
            let mut pins: Vec<PinInfo> = Vec::new();
×
965
            let mut chain = String::new();
×
966
            let mut contract_address = String::new();
×
967
            let mut token_id = String::new();
×
968
            for row in token_rows {
×
969
                chain = row.get("chain");
×
970
                contract_address = row.get("contract_address");
×
971
                token_id = row.get("token_id");
×
972
                let cid: String = row.get("cid");
×
973
                let provider_type: String = row.get("provider_type");
×
974
                let provider_url: String = row
×
975
                    .try_get::<Option<String>, _>("provider_url")
976
                    .ok()
977
                    .flatten()
978
                    .unwrap_or_default();
979
                let status: String = row.get("pin_status");
×
980
                let created_at: DateTime<Utc> = row.get("created_at");
×
981
                pins.push(PinInfo {
×
982
                    cid,
×
983
                    provider_type,
×
984
                    provider_url,
×
985
                    status,
×
986
                    created_at,
×
987
                });
988
            }
989
            result.push(TokenWithPins {
×
990
                chain,
×
991
                contract_address,
×
992
                token_id,
×
993
                pins,
×
994
            });
995
        }
996

997
        Ok((result, total))
×
998
    }
999

1000
    /// Get a specific pinned token for a requestor
1001
    pub async fn get_pinned_token_by_requestor(
×
1002
        &self,
1003
        requestor: &str,
1004
        chain: &str,
1005
        contract_address: &str,
1006
        token_id: &str,
1007
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
1008
        let query = r#"
×
1009
            SELECT pt.chain, pt.contract_address, pt.token_id,
×
1010
                   p.cid, p.provider_type, p.provider_url, p.pin_status, pt.created_at
×
1011
            FROM pinned_tokens pt
×
1012
            JOIN pins p ON p.id = pt.pin_id
×
1013
            JOIN backup_tasks bt ON bt.task_id = p.task_id
×
1014
            WHERE bt.requestor = $1
×
1015
              AND pt.chain = $2
×
1016
              AND pt.contract_address = $3
×
1017
              AND pt.token_id = $4
×
1018
            ORDER BY pt.created_at DESC
×
1019
        "#;
×
1020

1021
        let rows = sqlx::query(query)
×
1022
            .bind(requestor)
×
1023
            .bind(chain)
×
1024
            .bind(contract_address)
×
1025
            .bind(token_id)
×
1026
            .fetch_all(&self.pool)
×
1027
            .await?;
×
1028

1029
        if rows.is_empty() {
×
1030
            return Ok(None);
×
1031
        }
1032

1033
        let mut pins = Vec::new();
×
1034
        let mut token_chain = String::new();
×
1035
        let mut token_contract_address = String::new();
×
1036
        let mut token_token_id = String::new();
×
1037

1038
        for row in rows {
×
1039
            token_chain = row.get("chain");
×
1040
            token_contract_address = row.get("contract_address");
×
1041
            token_token_id = row.get("token_id");
×
1042
            let cid: String = row.get("cid");
×
1043
            let provider_type: String = row.get("provider_type");
×
1044
            // provider_url may be NULL for legacy rows; default to empty string for API stability
1045
            let provider_url: String = row
×
1046
                .try_get::<Option<String>, _>("provider_url")
1047
                .ok()
1048
                .flatten()
1049
                .unwrap_or_default();
1050
            let status: String = row.get("pin_status");
×
1051
            let created_at: DateTime<Utc> = row.get("created_at");
×
1052

1053
            pins.push(PinInfo {
×
1054
                cid,
×
1055
                provider_type,
×
1056
                provider_url,
×
1057
                status,
×
1058
                created_at,
×
1059
            });
1060
        }
1061

1062
        Ok(Some(TokenWithPins {
×
1063
            chain: token_chain,
×
1064
            contract_address: token_contract_address,
×
1065
            token_id: token_token_id,
×
1066
            pins,
×
1067
        }))
1068
    }
1069

1070
    /// Get all pins that are in 'queued' or 'pinning' status
1071
    /// This is used by the pin monitor to check for status updates
1072
    pub async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
×
1073
        let rows = sqlx::query(
1074
            r#"
1075
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
1076
            FROM pins
1077
            WHERE pin_status IN ('queued', 'pinning')
1078
            ORDER BY id
1079
            "#,
1080
        )
1081
        .fetch_all(&self.pool)
×
1082
        .await?;
×
1083

1084
        Ok(rows
×
1085
            .into_iter()
×
1086
            .map(|row| PinRow {
×
1087
                id: row.get("id"),
×
1088
                task_id: row.get("task_id"),
×
1089
                provider_type: row.get("provider_type"),
×
1090
                provider_url: row
×
1091
                    .try_get::<Option<String>, _>("provider_url")
×
1092
                    .ok()
×
1093
                    .flatten(),
×
1094
                cid: row.get("cid"),
×
1095
                request_id: row.get("request_id"),
×
1096
                pin_status: row.get("pin_status"),
×
1097
                created_at: row.get("created_at"),
×
1098
            })
1099
            .collect())
×
1100
    }
1101

1102
    /// Set backup fatal error for relevant subresources in a single SQL statement.
1103
    /// The update is based on the `storage_mode` value from the `backup_tasks` table for the given `task_id`:
1104
    /// - If storage_mode is 'archive' or 'full': updates archive_requests.status and archive_requests.fatal_error
1105
    /// - If storage_mode is 'ipfs' or 'full': updates pin_requests.status and pin_requests.fatal_error
1106
    pub async fn set_backup_error(
×
1107
        &self,
1108
        task_id: &str,
1109
        fatal_error: &str,
1110
    ) -> Result<(), sqlx::Error> {
1111
        let sql = r#"
×
1112
            WITH task_mode AS (
×
1113
                SELECT storage_mode FROM backup_tasks WHERE task_id = $1
×
1114
            ),
1115
            upd_archive AS (
×
1116
                UPDATE archive_requests ar
×
1117
                SET status = 'error', fatal_error = $2
×
1118
                WHERE ar.task_id = $1
×
1119
                  AND EXISTS (
×
1120
                      SELECT 1 FROM task_mode tm
×
1121
                      WHERE tm.storage_mode IN ('archive', 'full')
×
1122
                  )
1123
                RETURNING 1
×
1124
            ),
1125
            upd_pins AS (
×
1126
                UPDATE pin_requests pr
×
1127
                SET status = 'error', fatal_error = $2
×
1128
                WHERE pr.task_id = $1
×
1129
                  AND EXISTS (
×
1130
                      SELECT 1 FROM task_mode tm
×
1131
                      WHERE tm.storage_mode IN ('ipfs', 'full')
×
1132
                  )
1133
                RETURNING 1
×
1134
            )
1135
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
×
1136
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
×
1137
        "#;
×
1138
        sqlx::query(sql)
×
1139
            .bind(task_id)
×
1140
            .bind(fatal_error)
×
1141
            .execute(&self.pool)
×
1142
            .await?;
×
1143
        Ok(())
×
1144
    }
1145

1146
    /// Update backup subresource statuses for the task based on its storage mode
1147
    /// - archive or full: updates archive_requests.status
1148
    /// - ipfs or full: updates pin_requests.status
1149
    pub async fn update_backup_statuses(
×
1150
        &self,
1151
        task_id: &str,
1152
        scope: &str,
1153
        archive_status: &str,
1154
        ipfs_status: &str,
1155
    ) -> Result<(), sqlx::Error> {
1156
        let sql = r#"
×
1157
            WITH upd_archive AS (
×
1158
                UPDATE archive_requests ar
×
1159
                SET status = $2
×
1160
                WHERE ar.task_id = $1
×
1161
                  AND ($4 IN ('archive', 'full'))
×
1162
                RETURNING 1
×
1163
            ),
1164
            upd_pins AS (
×
1165
                UPDATE pin_requests pr
×
1166
                SET status = $3
×
1167
                WHERE pr.task_id = $1
×
1168
                  AND ($4 IN ('ipfs', 'full'))
×
1169
                RETURNING 1
×
1170
            )
1171
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
×
1172
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
×
1173
        "#;
×
1174
        sqlx::query(sql)
×
1175
            .bind(task_id)
×
1176
            .bind(archive_status)
×
1177
            .bind(ipfs_status)
×
1178
            .bind(scope)
×
1179
            .execute(&self.pool)
×
1180
            .await?;
×
1181
        Ok(())
×
1182
    }
1183

1184
    pub async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
×
1185
        if updates.is_empty() {
×
1186
            return Ok(());
×
1187
        }
1188

1189
        let mut tx = self.pool.begin().await?;
×
1190

1191
        for (id, status) in updates {
×
1192
            sqlx::query(
1193
                r#"
1194
                UPDATE pins
1195
                SET pin_status = $2
1196
                WHERE id = $1
1197
                "#,
1198
            )
1199
            .bind(id)
×
1200
            .bind(status)
×
1201
            .execute(&mut *tx)
×
1202
            .await?;
×
1203
        }
1204

1205
        tx.commit().await?;
×
1206
        Ok(())
×
1207
    }
1208

1209
    /// Complete archive deletion:
1210
    /// - If current storage_mode is 'archive', delete the whole backup (finalize deletion)
1211
    /// - Else if current storage_mode is 'full', flip to 'ipfs' to reflect archive removed
1212
    pub async fn complete_archive_request_deletion(
×
1213
        &self,
1214
        task_id: &str,
1215
    ) -> Result<(), sqlx::Error> {
1216
        // Atomically: delete when archive-only; else if full, flip to ipfs
1217
        let sql = r#"
×
1218
            WITH del AS (
×
1219
                DELETE FROM backup_tasks
×
1220
                WHERE task_id = $1 AND storage_mode = 'archive'
×
1221
                RETURNING 1
×
1222
            ), upd AS (
×
1223
                UPDATE backup_tasks
×
1224
                SET storage_mode = 'ipfs', updated_at = NOW()
×
1225
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
×
1226
                RETURNING 1
×
1227
            )
1228
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
×
1229
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
×
1230
        "#;
×
1231
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
×
1232
        Ok(())
×
1233
    }
1234

1235
    /// Complete IPFS pins deletion:
1236
    /// - If current storage_mode is 'ipfs', delete the whole backup (finalize deletion)
1237
    /// - Else if current storage_mode is 'full', flip to 'archive' to reflect pins removed
1238
    pub async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
1239
        // Atomically: delete when ipfs-only; else if full, flip to archive
1240
        let sql = r#"
×
1241
            WITH del AS (
×
1242
                DELETE FROM backup_tasks
×
1243
                WHERE task_id = $1 AND storage_mode = 'ipfs'
×
1244
                RETURNING 1
×
1245
            ), upd AS (
×
1246
                UPDATE backup_tasks
×
1247
                SET storage_mode = 'archive', updated_at = NOW()
×
1248
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
×
1249
                RETURNING 1
×
1250
            )
1251
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
×
1252
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
×
1253
        "#;
×
1254
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
×
1255
        Ok(())
×
1256
    }
1257
}
1258

1259
// Implement the unified Database trait for the real Db struct.
//
// Every method is a thin delegation to the inherent method of the same name
// on `Db`; the `Db::method(self, …)` call syntax disambiguates the inherent
// method from this trait method. Keep signatures in lockstep with the trait
// in `r#trait` when adding operations.
#[async_trait::async_trait]
impl Database for Db {
    // Backup task operations
    async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        Db::insert_backup_task(
            self,
            task_id,
            requestor,
            nft_count,
            tokens,
            storage_mode,
            archive_format,
            retention_days,
        )
        .await
    }

    async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        Db::get_backup_task(self, task_id).await
    }

    async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::delete_backup_task(self, task_id).await
    }

    async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        Db::get_incomplete_backup_tasks(self).await
    }

    async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        include_tokens: bool,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        Db::list_requestor_backup_tasks_paginated(self, requestor, include_tokens, limit, offset)
            .await
    }

    async fn list_unprocessed_expired_backups(&self) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
        Db::list_unprocessed_expired_backups(self).await
    }

    // Backup task status and error operations
    async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        Db::clear_backup_errors(self, task_id, scope).await
    }

    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error> {
        Db::set_backup_error(self, task_id, error).await
    }

    async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        Db::set_error_logs(self, task_id, archive_error_log, ipfs_error_log).await
    }

    async fn update_archive_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_error_log(self, task_id, error_log).await
    }

    async fn update_pin_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_error_log(self, task_id, error_log).await
    }

    async fn set_archive_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_archive_request_error(self, task_id, fatal_error).await
    }

    async fn set_pin_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_pin_request_error(self, task_id, fatal_error).await
    }

    // Status update operations
    async fn update_archive_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_status(self, task_id, status).await
    }

    async fn update_pin_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_status(self, task_id, status).await
    }

    async fn update_backup_statuses(
        &self,
        task_id: &str,
        scope: &str,
        archive_status: &str,
        ipfs_status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_backup_statuses(self, task_id, scope, archive_status, ipfs_status).await
    }

    async fn update_archive_request_statuses(
        &self,
        task_ids: &[String],
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_statuses(self, task_ids, status).await
    }

    // Deletion operations
    async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_deletion(self, task_id).await
    }

    async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_archive_request_deletion(self, task_id).await
    }

    async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_pin_request_deletions(self, task_id).await
    }

    async fn complete_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_archive_request_deletion(self, task_id).await
    }

    async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_pin_request_deletion(self, task_id).await
    }

    // Retry operations
    async fn retry_backup(
        &self,
        task_id: &str,
        scope: &str,
        retention_days: u64,
    ) -> Result<(), sqlx::Error> {
        Db::retry_backup(self, task_id, scope, retention_days).await
    }

    // Pin operations
    async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        Db::insert_pins_with_tokens(self, task_id, token_pin_mappings).await
    }

    async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_pins_by_task_id(self, task_id).await
    }

    async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_active_pins(self).await
    }

    async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
        Db::update_pin_statuses(self, updates).await
    }

    // Pinned tokens operations
    async fn get_pinned_tokens_by_requestor(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
        Db::get_pinned_tokens_by_requestor(self, requestor, limit, offset).await
    }

    async fn get_pinned_token_by_requestor(
        &self,
        requestor: &str,
        chain: &str,
        contract_address: &str,
        token_id: &str,
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
        Db::get_pinned_token_by_requestor(self, requestor, chain, contract_address, token_id).await
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc