0xmichalis / nftbk · build 18521726970 (push via github)
15 Oct 2025 07:49AM UTC · coverage: 35.597% (+0.3%) from 35.281%
0xmichalis: chore: drop deleted_at from backup_tasks

9 of 26 new or added lines in 3 files covered (34.62%).
3 existing lines in 2 files are now uncovered.
1405 of 3947 relevant lines covered (35.6%), at 6.05 hits per line.

Source file: /src/server/database/mod.rs (0.0% of this file's lines covered)
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgPoolOptions, PgPool, Row};

use crate::server::database::r#trait::Database;

pub mod r#trait;

/// Combined view of backup_tasks + archive_requests
#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "Backup task information including metadata and status")]
pub struct BackupTask {
    /// Unique identifier for the backup task
    #[schema(example = "abc123def456")]
    pub task_id: String,
    /// When the backup task was created (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:00:00Z")]
    pub created_at: DateTime<Utc>,
    /// When the backup task was last updated (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:05:00Z")]
    pub updated_at: DateTime<Utc>,
    /// User who requested the backup
    #[schema(example = "user123")]
    pub requestor: String,
    /// Number of NFTs in this backup task
    #[schema(example = 42)]
    pub nft_count: i32,
    /// Token details (only included if include_tokens=true)
    pub tokens: serde_json::Value,
    /// Archive subresource status (in_progress, done, error, expired)
    pub archive_status: Option<String>,
    /// IPFS subresource status (in_progress, done, error)
    pub ipfs_status: Option<String>,
    /// Detailed archive error log if archive completed with some failures
    #[schema(example = "Failed to write archive checksum file")]
    pub archive_error_log: Option<String>,
    /// Detailed IPFS error log aggregated from pin requests
    #[schema(
        example = "Provider pinata failed: 401 Unauthorized\nProvider web3.storage failed: 429 Too Many Requests"
    )]
    pub ipfs_error_log: Option<String>,
    /// Archive subresource fatal error if backup failed completely at archive stage
    pub archive_fatal_error: Option<String>,
    /// IPFS subresource fatal error if backup failed completely at IPFS stage
    pub ipfs_fatal_error: Option<String>,
    /// Storage mode used for the backup (archive, ipfs, full)
    #[schema(example = "archive")]
    pub storage_mode: String,
    /// Archive format used for the backup (zip, tar.gz)
    #[schema(example = "zip")]
    pub archive_format: Option<String>,
    /// When the backup expires (if applicable, typically 7 days from creation)
    #[schema(example = "2024-01-08T12:00:00Z")]
    pub expires_at: Option<DateTime<Utc>>,
    /// When archive deletion was started (if applicable)
    #[schema(example = "2024-01-02T10:00:00Z")]
    pub archive_deleted_at: Option<DateTime<Utc>>,
    /// When IPFS pins deletion was started (if applicable)
    #[schema(example = "2024-01-02T10:00:00Z")]
    pub pins_deleted_at: Option<DateTime<Utc>>,
}

#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "IPFS pin information for a specific CID")]
pub struct PinInfo {
    /// Content Identifier (CID) of the pinned content
    #[schema(example = "QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG")]
    pub cid: String,
    /// IPFS provider type where the content is pinned
    #[schema(example = "pinata")]
    pub provider_type: String,
    /// IPFS provider URL where the content is pinned
    #[schema(example = "https://api.pinata.cloud")]
    pub provider_url: String,
    /// Pin status (pinned, pinning, failed, queued)
    #[schema(example = "pinned")]
    pub status: String,
    /// When the pin was created (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:00:00Z")]
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "Token information with associated pin requests")]
pub struct TokenWithPins {
    /// Blockchain identifier (e.g., ethereum, tezos)
    #[schema(example = "ethereum")]
    pub chain: String,
    /// NFT contract address
    #[schema(example = "0x1234567890123456789012345678901234567890")]
    pub contract_address: String,
    /// NFT token ID
    #[schema(example = "123")]
    pub token_id: String,
    /// List of IPFS pins for this token
    pub pins: Vec<PinInfo>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PinRow {
    pub id: i64,
    pub task_id: String,
    pub provider_type: String,
    pub provider_url: Option<String>,
    pub cid: String,
    pub request_id: String,
    pub pin_status: String,
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Clone)]
pub struct ExpiredBackup {
    pub task_id: String,
    pub archive_format: String,
}

#[derive(Clone)]
pub struct Db {
    pub pool: PgPool,
}

impl Db {
    pub async fn new(database_url: &str, max_connections: u32) -> Self {
        let pool = PgPoolOptions::new()
            .max_connections(max_connections)
            .connect(database_url)
            .await
            .expect("Failed to connect to Postgres");
        // Health check: run a simple query
        sqlx::query("SELECT 1")
            .execute(&pool)
            .await
            .expect("Postgres connection is not healthy");
        tracing::info!("Postgres connection is healthy");
        Db { pool }
    }
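
    // NOTE (illustrative sketch, not part of the original file): a caller might
    // construct the handle at startup like this, assuming a DATABASE_URL
    // environment variable:
    //
    //     let db = Db::new(&std::env::var("DATABASE_URL").unwrap(), 5).await;
    //
    // `new` panics if the connection or health check fails, so it is meant for
    // startup paths where failing fast is the right behavior.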

    #[allow(clippy::too_many_arguments)]
    pub async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Insert into backup_tasks
        sqlx::query(
            r#"
            INSERT INTO backup_tasks (
                task_id, created_at, updated_at, requestor, nft_count, tokens, storage_mode
            ) VALUES (
                $1, NOW(), NOW(), $2, $3, $4, $5
            )
            ON CONFLICT (task_id) DO UPDATE SET
                updated_at = NOW(),
                nft_count = EXCLUDED.nft_count,
                tokens = EXCLUDED.tokens,
                storage_mode = EXCLUDED.storage_mode
            "#,
        )
        .bind(task_id)
        .bind(requestor)
        .bind(nft_count)
        .bind(tokens)
        .bind(storage_mode)
        .execute(&mut *tx)
        .await?;

        // Insert into archive_requests if storage mode includes archive
        if storage_mode == "archive" || storage_mode == "full" {
            let archive_fmt = archive_format.unwrap_or("zip");

            if let Some(days) = retention_days {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NOW() + ($3 || ' days')::interval, 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                .bind(days as i64)
                .execute(&mut *tx)
                .await?;
            } else {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NULL, 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                .execute(&mut *tx)
                .await?;
            }
        }

        // Insert into pin_requests if storage mode includes IPFS
        if storage_mode == "ipfs" || storage_mode == "full" {
            sqlx::query(
                r#"
                INSERT INTO pin_requests (task_id, status)
                VALUES ($1, 'in_progress')
                ON CONFLICT (task_id) DO UPDATE SET
                    status = EXCLUDED.status
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
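
    // NOTE (illustrative sketch, not part of the original file): recording a
    // "full" backup with a 7-day retention window might look like this; the
    // task id and token payload are hypothetical:
    //
    //     db.insert_backup_task(
    //         "abc123def456",
    //         "user123",
    //         1,
    //         &serde_json::json!([{"chain": "ethereum", "token_id": "123"}]),
    //         "full",
    //         Some("zip"),
    //         Some(7),
    //     )
    //     .await?;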

    pub async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        // CASCADE will delete associated archive_requests row if it exists
        sqlx::query!("DELETE FROM backup_tasks WHERE task_id = $1", task_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    pub async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;
        if let Some(a) = archive_error_log {
            sqlx::query("UPDATE archive_requests SET error_log = $2 WHERE task_id = $1")
                .bind(task_id)
                .bind(a)
                .execute(&mut *tx)
                .await?;
        }
        if let Some(i) = ipfs_error_log {
            sqlx::query(
                r#"
                UPDATE pin_requests
                SET error_log = $2
                WHERE task_id = $1
                "#,
            )
            .bind(task_id)
            .bind(i)
            .execute(&mut *tx)
            .await?;
        }
        tx.commit().await?;
        Ok(())
    }

    pub async fn update_archive_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET error_log = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(error_log)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_pin_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET error_log = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(error_log)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_pin_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET status = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(status)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_archive_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET status = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(status)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_archive_request_statuses(
        &self,
        task_ids: &[String],
        status: &str,
    ) -> Result<(), sqlx::Error> {
        if task_ids.is_empty() {
            return Ok(());
        }

        // Use a transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Update each task_id individually with a prepared statement
        for task_id in task_ids {
            sqlx::query("UPDATE archive_requests SET status = $1 WHERE task_id = $2")
                .bind(status)
                .bind(task_id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(())
    }

    // TODO: Should support pin request retries
    pub async fn retry_backup(
        &self,
        task_id: &str,
        retention_days: u64,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Reset archive subresource status and fatal_error
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET status = 'in_progress', fatal_error = NULL
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .execute(&mut *tx)
        .await?;

        // Update archive_requests expires_at if it exists
        sqlx::query!(
            r#"
            UPDATE archive_requests
            SET expires_at = NOW() + ($2 || ' days')::interval
            WHERE task_id = $1
            "#,
            task_id,
            retention_days as i64
        )
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;
        Ok(())
    }

    pub async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;
        // Clear archive errors if scope includes archive
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET error_log = NULL, fatal_error = NULL
            WHERE task_id = $1 AND ($2 IN ('archive', 'full'))
            "#,
        )
        .bind(task_id)
        .bind(scope)
        .execute(&mut *tx)
        .await?;
        // Clear IPFS errors if scope includes ipfs
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET error_log = NULL, fatal_error = NULL
            WHERE task_id = $1 AND ($2 IN ('ipfs', 'full'))
            "#,
        )
        .bind(task_id)
        .bind(scope)
        .execute(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(())
    }

    pub async fn set_archive_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET status = 'error', fatal_error = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(fatal_error)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn set_pin_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET status = 'error', fatal_error = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(fatal_error)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Touch parent row
        sqlx::query!(
            r#"UPDATE backup_tasks SET updated_at = NOW() WHERE task_id = $1"#,
            task_id
        )
        .execute(&mut *tx)
        .await?;

        // Mark archive subresource as being deleted
        sqlx::query!(
            r#"
            UPDATE archive_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&mut *tx)
        .await?;

        // Mark IPFS subresource as being deleted
        sqlx::query!(
            r#"
            UPDATE pin_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;
        Ok(())
    }
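
    // NOTE (illustrative, not part of the original file): `start_deletion` only
    // stamps `deleted_at` on the subresources. Finalization happens later in
    // `complete_archive_request_deletion` / `complete_pin_request_deletion`
    // below, so an interrupted deletion leaves a marker it can resume from.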

    /// Mark archive as being deleted (similar to start_deletion but for archive subresource)
    pub async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            UPDATE archive_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Mark IPFS pins as being deleted (similar to start_deletion but for IPFS pins subresource)
    pub async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            UPDATE pin_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        let row = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                b.tokens, ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.task_id = $1
            "#,
        )
        .bind(task_id)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(|row| BackupTask {
            task_id: row.get("task_id"),
            created_at: row.get("created_at"),
            updated_at: row.get("updated_at"),
            requestor: row.get("requestor"),
            nft_count: row.get("nft_count"),
            tokens: row.get("tokens"),
            archive_status: row
                .try_get::<Option<String>, _>("archive_status")
                .ok()
                .flatten(),
            ipfs_status: row
                .try_get::<Option<String>, _>("ipfs_status")
                .ok()
                .flatten(),
            archive_error_log: row.get("archive_error_log"),
            ipfs_error_log: row.get("ipfs_error_log"),
            archive_fatal_error: row.get("fatal_error"),
            ipfs_fatal_error: row
                .try_get::<Option<String>, _>("ipfs_fatal_error")
                .ok()
                .flatten(),
            storage_mode: row.get("storage_mode"),
            archive_format: row.get("archive_format"),
            expires_at: row.get("expires_at"),
            archive_deleted_at: row.get("archive_deleted_at"),
            pins_deleted_at: row.get("pins_deleted_at"),
        }))
    }
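
    // NOTE (illustrative sketch, not part of the original file): because of the
    // LEFT JOINs above, a task whose storage mode never created an archive or
    // pin request is still returned, with the corresponding statuses as None:
    //
    //     if let Some(task) = db.get_backup_task("abc123def456").await? {
    //         println!("{}: archive={:?}", task.task_id, task.archive_status);
    //     }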

    pub async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        include_tokens: bool,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        let tokens_field = if include_tokens { "b.tokens," } else { "" };

        // Total count
        let total_row = sqlx::query!(
            r#"SELECT COUNT(*) as count FROM backup_tasks b WHERE b.requestor = $1"#,
            requestor
        )
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = (total_row.count.unwrap_or(0) as i64).max(0) as u32;

        let query = format!(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                {tokens_field} ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.requestor = $1
            ORDER BY b.created_at DESC
            LIMIT $2 OFFSET $3
            "#,
        );

        let rows = sqlx::query(&query)
            .bind(requestor)
            .bind(limit)
            .bind(offset)
            .fetch_all(&self.pool)
            .await?;

        let recs = rows
            .into_iter()
            .map(|row| {
                let tokens = if include_tokens {
                    row.try_get::<serde_json::Value, _>("tokens")
                        .unwrap_or(serde_json::Value::Null)
                } else {
                    serde_json::Value::Null
                };

                BackupTask {
                    task_id: row.get("task_id"),
                    created_at: row.get("created_at"),
                    updated_at: row.get("updated_at"),
                    requestor: row.get("requestor"),
                    nft_count: row.get("nft_count"),
                    tokens,
                    archive_status: row
                        .try_get::<Option<String>, _>("archive_status")
                        .ok()
                        .flatten(),
                    ipfs_status: row
                        .try_get::<Option<String>, _>("ipfs_status")
                        .ok()
                        .flatten(),
                    archive_error_log: row.get("archive_error_log"),
                    ipfs_error_log: row.get("ipfs_error_log"),
                    archive_fatal_error: row.get("fatal_error"),
                    ipfs_fatal_error: row
                        .try_get::<Option<String>, _>("ipfs_fatal_error")
                        .ok()
                        .flatten(),
                    storage_mode: row.get("storage_mode"),
                    archive_format: row.get("archive_format"),
                    expires_at: row.get("expires_at"),
                    archive_deleted_at: row.get("archive_deleted_at"),
                    pins_deleted_at: row.get("pins_deleted_at"),
                }
            })
            .collect();

        Ok((recs, total))
    }
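
    // NOTE (illustrative sketch, not part of the original file): fetching the
    // second page of 20 tasks without the heavy token payloads:
    //
    //     let (tasks, total) = db
    //         .list_requestor_backup_tasks_paginated("user123", false, 20, 20)
    //         .await?;
    //     let pages = (total + 19) / 20; // ceiling division for the page count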

    pub async fn list_unprocessed_expired_backups(
        &self,
    ) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT b.task_id, ar.archive_format
            FROM backup_tasks b
            JOIN archive_requests ar ON b.task_id = ar.task_id
            WHERE ar.expires_at IS NOT NULL AND ar.expires_at < NOW() AND ar.status != 'expired'
            "#,
        )
        .fetch_all(&self.pool)
        .await?;
        let recs = rows
            .into_iter()
            .map(|row| ExpiredBackup {
                task_id: row.get("task_id"),
                archive_format: row.get("archive_format"),
            })
            .collect();
        Ok(recs)
    }

    /// Retrieve all backup tasks that are in 'in_progress' status
    /// This is used to recover incomplete tasks on server restart
    pub async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                b.tokens, ar.status as archive_status, ar.fatal_error, b.storage_mode,
                ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE (
                -- Archive-only mode: check archive status (record must exist and be in_progress)
                (b.storage_mode = 'archive' AND ar.status = 'in_progress')
                OR
                -- IPFS-only mode: check IPFS status (record must exist and be in_progress)
                (b.storage_mode = 'ipfs' AND pr.status = 'in_progress')
                OR
                -- Full mode: check both archive and IPFS status (task is incomplete if either is in_progress)
                (b.storage_mode = 'full' AND (ar.status = 'in_progress' OR pr.status = 'in_progress'))
            )
            ORDER BY b.created_at ASC
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        let recs = rows
            .into_iter()
            .map(|row| BackupTask {
                task_id: row.get("task_id"),
                created_at: row.get("created_at"),
                updated_at: row.get("updated_at"),
                requestor: row.get("requestor"),
                nft_count: row.get("nft_count"),
                tokens: row.get("tokens"),
                archive_status: row
                    .try_get::<Option<String>, _>("archive_status")
                    .ok()
                    .flatten(),
                ipfs_status: row
                    .try_get::<Option<String>, _>("ipfs_status")
                    .ok()
                    .flatten(),
                archive_error_log: row.get("archive_error_log"),
                ipfs_error_log: row.get("ipfs_error_log"),
                archive_fatal_error: row.get("fatal_error"),
                // pr.fatal_error is not selected by this query, so none is surfaced here
                ipfs_fatal_error: None,
                storage_mode: row.get("storage_mode"),
                archive_format: row.get("archive_format"),
                expires_at: row.get("expires_at"),
                archive_deleted_at: row.get("archive_deleted_at"),
                pins_deleted_at: row.get("pins_deleted_at"),
            })
            .collect();

        Ok(recs)
    }

    /// Insert pins and their associated tokens in a single atomic transaction
    pub async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        if token_pin_mappings.is_empty() {
            return Ok(());
        }

        // Collect all pin responses and prepare token data
        let mut all_pin_responses = Vec::new();
        let mut all_token_data = Vec::new(); // (index_in_pin_responses, chain, contract_address, token_id)

        for mapping in token_pin_mappings {
            for pin_response in &mapping.pin_responses {
                let index = all_pin_responses.len();
                all_pin_responses.push(pin_response);
                all_token_data.push((
                    index,
                    mapping.chain.clone(),
                    mapping.contract_address.clone(),
                    mapping.token_id.clone(),
                ));
            }
        }

        if all_pin_responses.is_empty() {
            return Ok(());
        }

        // Start a transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Insert pins one by one and collect generated IDs
        let mut pin_ids: Vec<i64> = Vec::new();
        for pin_response in &all_pin_responses {
            // Map status enum to lowercase string to satisfy CHECK constraint
            let status = match pin_response.status {
                crate::ipfs::PinResponseStatus::Queued => "queued",
                crate::ipfs::PinResponseStatus::Pinning => "pinning",
                crate::ipfs::PinResponseStatus::Pinned => "pinned",
                crate::ipfs::PinResponseStatus::Failed => "failed",
            };

            let row = sqlx::query(
                "INSERT INTO pins (task_id, provider_type, provider_url, cid, request_id, pin_status) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id"
            )
            .bind(task_id)
            .bind(&pin_response.provider_type)
            .bind(&pin_response.provider_url)
            .bind(&pin_response.cid)
            .bind(&pin_response.id)
            .bind(status)
            .fetch_one(&mut *tx)
            .await?;

            pin_ids.push(row.get("id"));
        }

        // Insert pinned tokens using the generated pin_ids
        for (index, chain, contract_address, token_id) in &all_token_data {
            sqlx::query(
                "INSERT INTO pinned_tokens (pin_id, chain, contract_address, token_id) VALUES ($1, $2, $3, $4)"
            )
            .bind(pin_ids[*index])
            .bind(chain)
            .bind(contract_address)
            .bind(token_id)
            .execute(&mut *tx)
            .await?;
        }

        // Commit the transaction
        tx.commit().await?;
        Ok(())
    }
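
    // NOTE (illustrative, not part of the original file): the two-phase shape
    // above exists because each pinned_tokens row references a database-generated
    // pin id; `RETURNING id` carries those ids across the two phases inside a
    // single transaction.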

    /// Get all pins for a specific backup task
    pub async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
            FROM pins
            WHERE task_id = $1
            ORDER BY id
            "#,
        )
        .bind(task_id)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| PinRow {
                id: row.get("id"),
                task_id: row.get("task_id"),
                provider_type: row.get("provider_type"),
                provider_url: row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten(),
                cid: row.get("cid"),
                request_id: row.get("request_id"),
                pin_status: row.get("pin_status"),
                created_at: row.get("created_at"),
            })
            .collect())
    }

    /// Paginated pinned tokens grouped by (chain, contract_address, token_id)
    pub async fn get_pinned_tokens_by_requestor(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
        // Total distinct tokens for this requestor
        let total_row = sqlx::query(
            r#"
            SELECT COUNT(*) as count
            FROM (
                SELECT DISTINCT pt.chain, pt.contract_address, pt.token_id
                FROM pinned_tokens pt
                JOIN pins p ON p.id = pt.pin_id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
            ) t
            "#,
        )
        .bind(requestor)
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = (total_row.get::<i64, _>("count")).max(0) as u32;

        // Page of distinct tokens ordered by most recent pin time
        let rows = sqlx::query(
            r#"
            SELECT t.chain, t.contract_address, t.token_id
            FROM (
                SELECT pt.chain, pt.contract_address, pt.token_id, MAX(pt.created_at) AS last_created
                FROM pinned_tokens pt
                JOIN pins p ON p.id = pt.pin_id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
                GROUP BY pt.chain, pt.contract_address, pt.token_id
            ) t
            ORDER BY last_created DESC
            LIMIT $2 OFFSET $3
            "#,
        )
        .bind(requestor)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

        // For each token key, fetch pins (ordered by created_at desc)
        let mut result: Vec<TokenWithPins> = Vec::new();
        for r in rows {
            let token_rows = sqlx::query(
                r#"
                SELECT pt.chain, pt.contract_address, pt.token_id,
                       p.cid, p.provider_type, p.provider_url, p.pin_status, pt.created_at
                FROM pinned_tokens pt
                JOIN pins p ON p.id = pt.pin_id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
                  AND pt.chain = $2
                  AND pt.contract_address = $3
                  AND pt.token_id = $4
                ORDER BY pt.created_at DESC
                "#,
            )
            .bind(requestor)
            .bind(r.get::<String, _>("chain"))
            .bind(r.get::<String, _>("contract_address"))
            .bind(r.get::<String, _>("token_id"))
            .fetch_all(&self.pool)
            .await?;

            let mut pins: Vec<PinInfo> = Vec::new();
            let mut chain = String::new();
            let mut contract_address = String::new();
            let mut token_id = String::new();
            for row in token_rows {
                chain = row.get("chain");
                contract_address = row.get("contract_address");
                token_id = row.get("token_id");
                let cid: String = row.get("cid");
                let provider_type: String = row.get("provider_type");
                let provider_url: String = row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten()
                    .unwrap_or_default();
                let status: String = row.get("pin_status");
                let created_at: DateTime<Utc> = row.get("created_at");
                pins.push(PinInfo {
                    cid,
                    provider_type,
                    provider_url,
                    status,
                    created_at,
                });
            }
            result.push(TokenWithPins {
                chain,
                contract_address,
                token_id,
                pins,
            });
        }

        Ok((result, total))
    }

    /// Get a specific pinned token for a requestor
    pub async fn get_pinned_token_by_requestor(
        &self,
        requestor: &str,
        chain: &str,
        contract_address: &str,
        token_id: &str,
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
        let query = r#"
            SELECT pt.chain, pt.contract_address, pt.token_id,
                   p.cid, p.provider_type, p.provider_url, p.pin_status, pt.created_at
            FROM pinned_tokens pt
            JOIN pins p ON p.id = pt.pin_id
            JOIN backup_tasks bt ON bt.task_id = p.task_id
            WHERE bt.requestor = $1
              AND pt.chain = $2
              AND pt.contract_address = $3
              AND pt.token_id = $4
            ORDER BY pt.created_at DESC
        "#;

        let rows = sqlx::query(query)
            .bind(requestor)
            .bind(chain)
            .bind(contract_address)
            .bind(token_id)
            .fetch_all(&self.pool)
            .await?;

        if rows.is_empty() {
            return Ok(None);
        }

        let mut pins = Vec::new();
        let mut token_chain = String::new();
        let mut token_contract_address = String::new();
        let mut token_token_id = String::new();

        for row in rows {
            token_chain = row.get("chain");
            token_contract_address = row.get("contract_address");
            token_token_id = row.get("token_id");
            let cid: String = row.get("cid");
            let provider_type: String = row.get("provider_type");
            // provider_url may be NULL for legacy rows; default to empty string for API stability
            let provider_url: String = row
                .try_get::<Option<String>, _>("provider_url")
                .ok()
                .flatten()
                .unwrap_or_default();
            let status: String = row.get("pin_status");
            let created_at: DateTime<Utc> = row.get("created_at");

            pins.push(PinInfo {
                cid,
                provider_type,
                provider_url,
                status,
                created_at,
            });
        }

        Ok(Some(TokenWithPins {
            chain: token_chain,
            contract_address: token_contract_address,
            token_id: token_token_id,
            pins,
        }))
    }

    /// Get all pins that are in 'queued' or 'pinning' status
    /// This is used by the pin monitor to check for status updates
    pub async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
            FROM pins
            WHERE pin_status IN ('queued', 'pinning')
            ORDER BY id
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| PinRow {
                id: row.get("id"),
                task_id: row.get("task_id"),
                provider_type: row.get("provider_type"),
                provider_url: row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten(),
                cid: row.get("cid"),
                request_id: row.get("request_id"),
                pin_status: row.get("pin_status"),
                created_at: row.get("created_at"),
            })
            .collect())
    }

    /// Set backup fatal error for relevant subresources in a single SQL statement.
    /// The update is based on the `storage_mode` value from the `backup_tasks` table for the given `task_id`:
    /// - If storage_mode is 'archive' or 'full': updates archive_requests.status and archive_requests.fatal_error
    /// - If storage_mode is 'ipfs' or 'full': updates pin_requests.status and pin_requests.fatal_error
    pub async fn set_backup_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        let sql = r#"
            WITH task_mode AS (
                SELECT storage_mode FROM backup_tasks WHERE task_id = $1
            ),
            upd_archive AS (
                UPDATE archive_requests ar
                SET status = 'error', fatal_error = $2
                WHERE ar.task_id = $1
                  AND EXISTS (
                      SELECT 1 FROM task_mode tm
                      WHERE tm.storage_mode IN ('archive', 'full')
                  )
                RETURNING 1
            ),
            upd_pins AS (
                UPDATE pin_requests pr
                SET status = 'error', fatal_error = $2
                WHERE pr.task_id = $1
                  AND EXISTS (
                      SELECT 1 FROM task_mode tm
                      WHERE tm.storage_mode IN ('ipfs', 'full')
                  )
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
        "#;
        sqlx::query(sql)
            .bind(task_id)
            .bind(fatal_error)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
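
    // NOTE (illustrative, not part of the original file): folding both updates
    // into one CTE statement keeps them atomic without an explicit transaction,
    // since a single statement is atomic in Postgres. A caller sketch with a
    // hypothetical error message:
    //
    //     db.set_backup_error("abc123def456", "backup worker panicked").await?;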

    /// Update backup subresource statuses for the task based on the given scope
    /// (the task's storage mode):
    /// - archive or full: updates archive_requests.status
    /// - ipfs or full: updates pin_requests.status
    pub async fn update_backup_statuses(
        &self,
        task_id: &str,
        scope: &str,
        archive_status: &str,
        ipfs_status: &str,
    ) -> Result<(), sqlx::Error> {
        let sql = r#"
            WITH upd_archive AS (
                UPDATE archive_requests ar
                SET status = $2
                WHERE ar.task_id = $1
                  AND ($4 IN ('archive', 'full'))
                RETURNING 1
            ),
            upd_pins AS (
                UPDATE pin_requests pr
                SET status = $3
                WHERE pr.task_id = $1
                  AND ($4 IN ('ipfs', 'full'))
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
        "#;
        sqlx::query(sql)
            .bind(task_id)
            .bind(archive_status)
            .bind(ipfs_status)
            .bind(scope)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    pub async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
        if updates.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;

        for (id, status) in updates {
            sqlx::query(
                r#"
                UPDATE pins
                SET pin_status = $2
                WHERE id = $1
                "#,
            )
            .bind(id)
            .bind(status)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }

    /// Complete archive deletion:
    /// - If current storage_mode is 'archive', delete the whole backup (finalize deletion)
    /// - Else if current storage_mode is 'full', flip to 'ipfs' to reflect archive removed
    pub async fn complete_archive_request_deletion(
        &self,
        task_id: &str,
    ) -> Result<(), sqlx::Error> {
        // Atomically: delete when archive-only; else if full, flip to ipfs
        let sql = r#"
            WITH del AS (
                DELETE FROM backup_tasks
                WHERE task_id = $1 AND storage_mode = 'archive'
                RETURNING 1
            ), upd AS (
                UPDATE backup_tasks
                SET storage_mode = 'ipfs', updated_at = NOW()
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
        "#;
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
        Ok(())
    }

    /// Complete IPFS pins deletion:
    /// - If current storage_mode is 'ipfs', delete the whole backup (finalize deletion)
    /// - Else if current storage_mode is 'full', flip to 'archive' to reflect pins removed
    pub async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        // Atomically: delete when ipfs-only; else if full, flip to archive
        let sql = r#"
            WITH del AS (
                DELETE FROM backup_tasks
                WHERE task_id = $1 AND storage_mode = 'ipfs'
                RETURNING 1
            ), upd AS (
                UPDATE backup_tasks
                SET storage_mode = 'archive', updated_at = NOW()
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
        "#;
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
        Ok(())
    }
}

// Implement the unified Database trait for the real Db struct
#[async_trait::async_trait]
impl Database for Db {
    // Backup task operations
    async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        Db::insert_backup_task(
            self,
            task_id,
            requestor,
            nft_count,
            tokens,
            storage_mode,
            archive_format,
            retention_days,
        )
        .await
    }

    async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        Db::get_backup_task(self, task_id).await
    }

    async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::delete_backup_task(self, task_id).await
    }

    async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        Db::get_incomplete_backup_tasks(self).await
    }

    async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        include_tokens: bool,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        Db::list_requestor_backup_tasks_paginated(self, requestor, include_tokens, limit, offset)
            .await
    }

    async fn list_unprocessed_expired_backups(&self) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
        Db::list_unprocessed_expired_backups(self).await
    }

    // Backup task status and error operations
    async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        Db::clear_backup_errors(self, task_id, scope).await
    }

    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error> {
        Db::set_backup_error(self, task_id, error).await
    }

    async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        Db::set_error_logs(self, task_id, archive_error_log, ipfs_error_log).await
    }

    async fn update_archive_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_error_log(self, task_id, error_log).await
    }

    async fn update_pin_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_error_log(self, task_id, error_log).await
    }

    async fn set_archive_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_archive_request_error(self, task_id, fatal_error).await
    }

    async fn set_pin_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_pin_request_error(self, task_id, fatal_error).await
    }

    // Status update operations
    async fn update_archive_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_status(self, task_id, status).await
    }

    async fn update_pin_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_status(self, task_id, status).await
    }

    async fn update_backup_statuses(
        &self,
        task_id: &str,
        scope: &str,
        archive_status: &str,
        ipfs_status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_backup_statuses(self, task_id, scope, archive_status, ipfs_status).await
    }

    async fn update_archive_request_statuses(
        &self,
        task_ids: &[String],
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_statuses(self, task_ids, status).await
    }

    // Deletion operations
    async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_deletion(self, task_id).await
    }

    async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_archive_request_deletion(self, task_id).await
    }

    async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_pin_request_deletions(self, task_id).await
    }

    async fn complete_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_archive_request_deletion(self, task_id).await
    }

    async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_pin_request_deletion(self, task_id).await
    }

    // Retry operations
    async fn retry_backup(&self, task_id: &str, retention_days: u64) -> Result<(), sqlx::Error> {
        Db::retry_backup(self, task_id, retention_days).await
    }

    // Pin operations
    async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        Db::insert_pins_with_tokens(self, task_id, token_pin_mappings).await
    }

    async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_pins_by_task_id(self, task_id).await
    }

    async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_active_pins(self).await
    }

    async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
        Db::update_pin_statuses(self, updates).await
    }

    // Pinned tokens operations
    async fn get_pinned_tokens_by_requestor(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
        Db::get_pinned_tokens_by_requestor(self, requestor, limit, offset).await
    }

    async fn get_pinned_token_by_requestor(
        &self,
        requestor: &str,
        chain: &str,
        contract_address: &str,
        token_id: &str,
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
        Db::get_pinned_token_by_requestor(self, requestor, chain, contract_address, token_id).await
    }
}
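
Because `Db` only delegates in this impl, callers can be written against the `Database` trait and exercised with a test double. A minimal sketch, not from the repository (`mark_task_failed` and its error string are hypothetical):

    use crate::server::database::r#trait::Database;

    // Works for the real Db or any mock implementing the trait.
    async fn mark_task_failed<D: Database + ?Sized>(
        db: &D,
        task_id: &str,
    ) -> Result<(), sqlx::Error> {
        // Routes the error to archive_requests and/or pin_requests
        // according to the task's storage_mode.
        db.set_backup_error(task_id, "worker crashed before completion").await
    }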