0xmichalis / nftbk / build 18521170339

15 Oct 2025 07:25AM UTC · coverage: 35.281% (-0.2%) from 35.479%
Trigger: push (github) · Committer: 0xmichalis
Commit: fix: use atomic delete-or-update pattern to delete backups

0 of 28 new or added lines in 1 file covered. (0.0%)
3 existing lines in 1 file now uncovered.
1389 of 3937 relevant lines covered (35.28%)
6.02 hits per line

Source File: /src/server/database/mod.rs
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgPoolOptions, PgPool, Row};

use crate::server::database::r#trait::Database;

pub mod r#trait;

/// Combined view of backup_tasks + archive_requests
#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "Backup task information including metadata and status")]
pub struct BackupTask {
    /// Unique identifier for the backup task
    #[schema(example = "abc123def456")]
    pub task_id: String,
    /// When the backup task was created (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:00:00Z")]
    pub created_at: DateTime<Utc>,
    /// When the backup task was last updated (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:05:00Z")]
    pub updated_at: DateTime<Utc>,
    /// User who requested the backup
    #[schema(example = "user123")]
    pub requestor: String,
    /// Number of NFTs in this backup task
    #[schema(example = 42)]
    pub nft_count: i32,
    /// Token details (only included if include_tokens=true)
    pub tokens: serde_json::Value,
    /// Archive subresource status (in_progress, done, error, expired)
    pub archive_status: Option<String>,
    /// IPFS subresource status (in_progress, done, error)
    pub ipfs_status: Option<String>,
    /// Detailed archive error log if archive completed with some failures
    #[schema(example = "Failed to write archive checksum file")]
    pub archive_error_log: Option<String>,
    /// Detailed IPFS error log aggregated from pin requests
    #[schema(
        example = "Provider pinata failed: 401 Unauthorized\nProvider web3.storage failed: 429 Too Many Requests"
    )]
    pub ipfs_error_log: Option<String>,
    /// Archive subresource fatal error if backup failed completely at archive stage
    pub archive_fatal_error: Option<String>,
    /// IPFS subresource fatal error if backup failed completely at IPFS stage
    pub ipfs_fatal_error: Option<String>,
    /// Storage mode used for the backup (archive, ipfs, full)
    #[schema(example = "archive")]
    pub storage_mode: String,
    /// Archive format used for the backup (zip, tar.gz)
    #[schema(example = "zip")]
    pub archive_format: Option<String>,
    /// When the backup expires (if applicable, typically 7 days from creation)
    #[schema(example = "2024-01-08T12:00:00Z")]
    pub expires_at: Option<DateTime<Utc>>,
    /// When deletion was started (if applicable)
    #[schema(example = "2024-01-02T10:00:00Z")]
    pub deleted_at: Option<DateTime<Utc>>,
    /// When archive deletion was started (if applicable)
    #[schema(example = "2024-01-02T10:00:00Z")]
    pub archive_deleted_at: Option<DateTime<Utc>>,
    /// When IPFS pins deletion was started (if applicable)
    #[schema(example = "2024-01-02T10:00:00Z")]
    pub pins_deleted_at: Option<DateTime<Utc>>,
}

#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "IPFS pin information for a specific CID")]
pub struct PinInfo {
    /// Content Identifier (CID) of the pinned content
    #[schema(example = "QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG")]
    pub cid: String,
    /// IPFS provider type where the content is pinned
    #[schema(example = "pinata")]
    pub provider_type: String,
    /// IPFS provider URL where the content is pinned
    #[schema(example = "https://api.pinata.cloud")]
    pub provider_url: String,
    /// Pin status (pinned, pinning, failed, queued)
    #[schema(example = "pinned")]
    pub status: String,
    /// When the pin was created (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:00:00Z")]
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "Token information with associated pin requests")]
pub struct TokenWithPins {
    /// Blockchain identifier (e.g., ethereum, tezos)
    #[schema(example = "ethereum")]
    pub chain: String,
    /// NFT contract address
    #[schema(example = "0x1234567890123456789012345678901234567890")]
    pub contract_address: String,
    /// NFT token ID
    #[schema(example = "123")]
    pub token_id: String,
    /// List of IPFS pins for this token
    pub pins: Vec<PinInfo>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PinRow {
    pub id: i64,
    pub task_id: String,
    pub provider_type: String,
    pub provider_url: Option<String>,
    pub cid: String,
    pub request_id: String,
    pub pin_status: String,
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Clone)]
pub struct ExpiredBackup {
    pub task_id: String,
    pub archive_format: String,
}

#[derive(Clone)]
pub struct Db {
    pub pool: PgPool,
}

impl Db {
    pub async fn new(database_url: &str, max_connections: u32) -> Self {
×
127
        let pool = PgPoolOptions::new()
×
128
            .max_connections(max_connections)
×
129
            .connect(database_url)
×
130
            .await
×
131
            .expect("Failed to connect to Postgres");
132
        // Health check: run a simple query
133
        sqlx::query("SELECT 1")
×
134
            .execute(&pool)
×
135
            .await
×
136
            .expect("Postgres connection is not healthy");
137
        tracing::info!("Postgres connection is healthy");
×
138
        Db { pool }
139
    }
140

141
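    /// Upsert a backup task and, in the same transaction, create the
    /// archive_requests row (for 'archive'/'full' modes) and the pin_requests
    /// row (for 'ipfs'/'full' modes).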
    #[allow(clippy::too_many_arguments)]
    pub async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Insert into backup_tasks
        sqlx::query(
            r#"
            INSERT INTO backup_tasks (
                task_id, created_at, updated_at, requestor, nft_count, tokens, storage_mode
            ) VALUES (
                $1, NOW(), NOW(), $2, $3, $4, $5
            )
            ON CONFLICT (task_id) DO UPDATE SET
                updated_at = NOW(),
                nft_count = EXCLUDED.nft_count,
                tokens = EXCLUDED.tokens,
                storage_mode = EXCLUDED.storage_mode
            "#,
        )
        .bind(task_id)
        .bind(requestor)
        .bind(nft_count)
        .bind(tokens)
        .bind(storage_mode)
        .execute(&mut *tx)
        .await?;

        // Insert into archive_requests if storage mode includes archive
        if storage_mode == "archive" || storage_mode == "full" {
            let archive_fmt = archive_format.unwrap_or("zip");

            if let Some(days) = retention_days {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NOW() + ($3 || ' days')::interval, 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                .bind(days as i64)
                .execute(&mut *tx)
                .await?;
            } else {
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NULL, 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                .execute(&mut *tx)
                .await?;
            }
        }

        // Insert into pin_requests if storage mode includes IPFS
        if storage_mode == "ipfs" || storage_mode == "full" {
            sqlx::query(
                r#"
                INSERT INTO pin_requests (task_id, status)
                VALUES ($1, 'in_progress')
                ON CONFLICT (task_id) DO UPDATE SET
                    status = EXCLUDED.status
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }

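    /// Delete the backup_tasks row for `task_id`; related rows go away via
    /// ON DELETE CASCADE.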
    pub async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        // CASCADE will delete associated archive_requests row if it exists
        sqlx::query!("DELETE FROM backup_tasks WHERE task_id = $1", task_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

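    /// Set the non-fatal error logs on the archive and/or IPFS subresources;
    /// only the logs that are `Some` are written, within one transaction.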
    pub async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;
        if let Some(a) = archive_error_log {
            sqlx::query("UPDATE archive_requests SET error_log = $2 WHERE task_id = $1")
                .bind(task_id)
                .bind(a)
                .execute(&mut *tx)
                .await?;
        }
        if let Some(i) = ipfs_error_log {
            sqlx::query(
                r#"
                    UPDATE pin_requests
                    SET error_log = $2
                    WHERE task_id = $1
                    "#,
            )
            .bind(task_id)
            .bind(i)
            .execute(&mut *tx)
            .await?;
        }
        tx.commit().await?;
        Ok(())
    }

    pub async fn update_archive_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET error_log = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(error_log)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_pin_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET error_log = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(error_log)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_pin_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET status = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(status)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn update_archive_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET status = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(status)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

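    /// Set the archive status for a batch of task IDs, one prepared statement
    /// per ID, inside a single transaction so the batch applies atomically.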
    pub async fn update_archive_request_statuses(
        &self,
        task_ids: &[String],
        status: &str,
    ) -> Result<(), sqlx::Error> {
        if task_ids.is_empty() {
            return Ok(());
        }

        // Use a transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Update each task_id individually with a prepared statement
        for task_id in task_ids {
            sqlx::query("UPDATE archive_requests SET status = $1 WHERE task_id = $2")
                .bind(status)
                .bind(task_id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;
        Ok(())
    }

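    /// Re-arm a backup for another archive attempt: reset the archive status
    /// to 'in_progress', clear its fatal error, and push expires_at out by
    /// `retention_days`.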
    // TODO: Should support pin request retries
    pub async fn retry_backup(
        &self,
        task_id: &str,
        retention_days: u64,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Reset archive subresource status and fatal_error
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET status = 'in_progress', fatal_error = NULL
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .execute(&mut *tx)
        .await?;

        // Update archive_requests expires_at if it exists
        sqlx::query!(
            r#"
            UPDATE archive_requests
            SET expires_at = NOW() + ($2 || ' days')::interval
            WHERE task_id = $1
            "#,
            task_id,
            retention_days as i64
        )
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;
        Ok(())
    }

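    /// Clear error_log and fatal_error on the archive and/or IPFS
    /// subresources, depending on whether `scope` is 'archive', 'ipfs', or
    /// 'full'.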
    pub async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;
        // Clear archive errors if scope includes archive
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET error_log = NULL, fatal_error = NULL
            WHERE task_id = $1 AND ($2 IN ('archive', 'full'))
            "#,
        )
        .bind(task_id)
        .bind(scope)
        .execute(&mut *tx)
        .await?;
        // Clear IPFS errors if scope includes ipfs
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET error_log = NULL, fatal_error = NULL
            WHERE task_id = $1 AND ($2 IN ('ipfs', 'full'))
            "#,
        )
        .bind(task_id)
        .bind(scope)
        .execute(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(())
    }

    pub async fn set_archive_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET status = 'error', fatal_error = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(fatal_error)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    pub async fn set_pin_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET status = 'error', fatal_error = $2
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .bind(fatal_error)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

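    /// Mark the whole backup task as being deleted by stamping deleted_at.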
    pub async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"UPDATE backup_tasks SET deleted_at = NOW(), updated_at = NOW() WHERE task_id = $1"#,
            task_id
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Mark archive as being deleted (similar to start_deletion but for archive subresource)
    pub async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            UPDATE archive_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Mark IPFS pins as being deleted (similar to start_deletion but for IPFS pins subresource)
    pub async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            UPDATE pin_requests
            SET deleted_at = NOW()
            WHERE task_id = $1 AND deleted_at IS NULL
            "#,
            task_id
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

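    /// Fetch a single backup task by ID, left-joining the archive and pin
    /// subresources into the combined BackupTask view.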
    pub async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        let row = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                b.tokens, ar.status as archive_status, ar.fatal_error, b.storage_mode,
                b.deleted_at, ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.task_id = $1
            "#,
        )
        .bind(task_id)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(|row| BackupTask {
            task_id: row.get("task_id"),
            created_at: row.get("created_at"),
            updated_at: row.get("updated_at"),
            requestor: row.get("requestor"),
            nft_count: row.get("nft_count"),
            tokens: row.get("tokens"),
            archive_status: row
                .try_get::<Option<String>, _>("archive_status")
                .ok()
                .flatten(),
            ipfs_status: row
                .try_get::<Option<String>, _>("ipfs_status")
                .ok()
                .flatten(),
            archive_error_log: row.get("archive_error_log"),
            ipfs_error_log: row.get("ipfs_error_log"),
            archive_fatal_error: row.get("fatal_error"),
            ipfs_fatal_error: row
                .try_get::<Option<String>, _>("ipfs_fatal_error")
                .ok()
                .flatten(),
            storage_mode: row.get("storage_mode"),
            archive_format: row.get("archive_format"),
            expires_at: row.get("expires_at"),
            deleted_at: row.get("deleted_at"),
            archive_deleted_at: row.get("archive_deleted_at"),
            pins_deleted_at: row.get("pins_deleted_at"),
        }))
    }

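    /// List a requestor's backup tasks, newest first, returning one page plus
    /// the total count; token JSON is only selected when `include_tokens` is
    /// set.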
    pub async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        include_tokens: bool,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        let tokens_field = if include_tokens { "b.tokens," } else { "" };

        // Total count
        let total_row = sqlx::query!(
            r#"SELECT COUNT(*) as count FROM backup_tasks b WHERE b.requestor = $1"#,
            requestor
        )
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = (total_row.count.unwrap_or(0) as i64).max(0) as u32;

        let query = format!(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                {tokens_field} ar.status as archive_status, ar.fatal_error, b.storage_mode,
                b.deleted_at, ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.requestor = $1
            ORDER BY b.created_at DESC
            LIMIT $2 OFFSET $3
            "#,
        );

        let rows = sqlx::query(&query)
            .bind(requestor)
            .bind(limit)
            .bind(offset)
            .fetch_all(&self.pool)
            .await?;

        let recs = rows
            .into_iter()
            .map(|row| {
                let tokens = if include_tokens {
                    row.try_get::<serde_json::Value, _>("tokens")
                        .unwrap_or(serde_json::Value::Null)
                } else {
                    serde_json::Value::Null
                };

                BackupTask {
                    task_id: row.get("task_id"),
                    created_at: row.get("created_at"),
                    updated_at: row.get("updated_at"),
                    requestor: row.get("requestor"),
                    nft_count: row.get("nft_count"),
                    tokens,
                    archive_status: row
                        .try_get::<Option<String>, _>("archive_status")
                        .ok()
                        .flatten(),
                    ipfs_status: row
                        .try_get::<Option<String>, _>("ipfs_status")
                        .ok()
                        .flatten(),
                    archive_error_log: row.get("archive_error_log"),
                    ipfs_error_log: row.get("ipfs_error_log"),
                    archive_fatal_error: row.get("fatal_error"),
                    ipfs_fatal_error: row
                        .try_get::<Option<String>, _>("ipfs_fatal_error")
                        .ok()
                        .flatten(),
                    storage_mode: row.get("storage_mode"),
                    archive_format: row.get("archive_format"),
                    expires_at: row.get("expires_at"),
                    deleted_at: row.get("deleted_at"),
                    archive_deleted_at: row.get("archive_deleted_at"),
                    pins_deleted_at: row.get("pins_deleted_at"),
                }
            })
            .collect();

        Ok((recs, total))
    }

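    /// List backups whose archives have expired but are not yet marked
    /// 'expired', so the expiry worker can process them.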
    pub async fn list_unprocessed_expired_backups(
        &self,
    ) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT b.task_id, ar.archive_format
            FROM backup_tasks b
            JOIN archive_requests ar ON b.task_id = ar.task_id
            WHERE ar.expires_at IS NOT NULL AND ar.expires_at < NOW() AND ar.status != 'expired'
            "#,
        )
        .fetch_all(&self.pool)
        .await?;
        let recs = rows
            .into_iter()
            .map(|row| ExpiredBackup {
                task_id: row.get("task_id"),
                archive_format: row.get("archive_format"),
            })
            .collect();
        Ok(recs)
    }

    /// Retrieve all backup tasks that are in 'in_progress' status
    /// This is used to recover incomplete tasks on server restart
    pub async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count,
                b.tokens, ar.status as archive_status, ar.fatal_error, b.storage_mode,
                b.deleted_at, ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE (
                -- Archive-only mode: check archive status (record must exist and be in_progress)
                (b.storage_mode = 'archive' AND ar.status = 'in_progress')
                OR
                -- IPFS-only mode: check IPFS status (record must exist and be in_progress)
                (b.storage_mode = 'ipfs' AND pr.status = 'in_progress')
                OR
                -- Full mode: check both archive and IPFS status (task is incomplete if either is in_progress)
                (b.storage_mode = 'full' AND (ar.status = 'in_progress' OR pr.status = 'in_progress'))
            )
            ORDER BY b.created_at ASC
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        let recs = rows
            .into_iter()
            .map(|row| BackupTask {
                task_id: row.get("task_id"),
                created_at: row.get("created_at"),
                updated_at: row.get("updated_at"),
                requestor: row.get("requestor"),
                nft_count: row.get("nft_count"),
                tokens: row.get("tokens"),
                archive_status: row
                    .try_get::<Option<String>, _>("archive_status")
                    .ok()
                    .flatten(),
                ipfs_status: row
                    .try_get::<Option<String>, _>("ipfs_status")
                    .ok()
                    .flatten(),
                archive_error_log: row.get("archive_error_log"),
                ipfs_error_log: row.get("ipfs_error_log"),
                archive_fatal_error: row.get("fatal_error"),
                ipfs_fatal_error: None,
                storage_mode: row.get("storage_mode"),
                archive_format: row.get("archive_format"),
                expires_at: row.get("expires_at"),
                deleted_at: row.get("deleted_at"),
                archive_deleted_at: row.get("archive_deleted_at"),
                pins_deleted_at: row.get("pins_deleted_at"),
            })
            .collect();

        Ok(recs)
    }

    /// Insert pins and their associated tokens in a single atomic transaction
    pub async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        if token_pin_mappings.is_empty() {
            return Ok(());
        }

        // Collect all pin responses and prepare token data
        let mut all_pin_responses = Vec::new();
        let mut all_token_data = Vec::new(); // (index_in_pin_responses, chain, contract_address, token_id)

        for mapping in token_pin_mappings {
            for pin_response in &mapping.pin_responses {
                let index = all_pin_responses.len();
                all_pin_responses.push(pin_response);
                all_token_data.push((
                    index,
                    mapping.chain.clone(),
                    mapping.contract_address.clone(),
                    mapping.token_id.clone(),
                ));
            }
        }

        if all_pin_responses.is_empty() {
            return Ok(());
        }

        // Start a transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Insert pins one by one and collect generated IDs
        let mut pin_ids: Vec<i64> = Vec::new();
        for pin_response in &all_pin_responses {
            // Map status enum to lowercase string to satisfy CHECK constraint
            let status = match pin_response.status {
                crate::ipfs::PinResponseStatus::Queued => "queued",
                crate::ipfs::PinResponseStatus::Pinning => "pinning",
                crate::ipfs::PinResponseStatus::Pinned => "pinned",
                crate::ipfs::PinResponseStatus::Failed => "failed",
            };

            let row = sqlx::query(
                "INSERT INTO pins (task_id, provider_type, provider_url, cid, request_id, pin_status) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id"
            )
            .bind(task_id)
            .bind(&pin_response.provider_type)
            .bind(&pin_response.provider_url)
            .bind(&pin_response.cid)
            .bind(&pin_response.id)
            .bind(status)
            .fetch_one(&mut *tx)
            .await?;

            pin_ids.push(row.get("id"));
        }

        // Insert pinned tokens using the generated pin_ids
        for (index, chain, contract_address, token_id) in &all_token_data {
            sqlx::query(
                "INSERT INTO pinned_tokens (pin_id, chain, contract_address, token_id) VALUES ($1, $2, $3, $4)"
            )
            .bind(pin_ids[*index])
            .bind(chain)
            .bind(contract_address)
            .bind(token_id)
            .execute(&mut *tx)
            .await?;
        }

        // Commit the transaction
        tx.commit().await?;
        Ok(())
    }

    /// Get all pins for a specific backup task
    pub async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
            FROM pins
            WHERE task_id = $1
            ORDER BY id
            "#,
        )
        .bind(task_id)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| PinRow {
                id: row.get("id"),
                task_id: row.get("task_id"),
                provider_type: row.get("provider_type"),
                provider_url: row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten(),
                cid: row.get("cid"),
                request_id: row.get("request_id"),
                pin_status: row.get("pin_status"),
                created_at: row.get("created_at"),
            })
            .collect())
    }

    /// Paginated pinned tokens grouped by (chain, contract_address, token_id)
    pub async fn get_pinned_tokens_by_requestor(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
        // Total distinct tokens for this requestor
        let total_row = sqlx::query(
            r#"
            SELECT COUNT(*) as count
            FROM (
                SELECT DISTINCT pt.chain, pt.contract_address, pt.token_id
                FROM pinned_tokens pt
                JOIN pins p ON p.id = pt.pin_id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
            ) t
            "#,
        )
        .bind(requestor)
        .fetch_one(&self.pool)
        .await?;
        let total: u32 = (total_row.get::<i64, _>("count")).max(0) as u32;

        // Page of distinct tokens ordered by most recent pin time
        let rows = sqlx::query(
            r#"
            SELECT t.chain, t.contract_address, t.token_id
            FROM (
                SELECT pt.chain, pt.contract_address, pt.token_id, MAX(pt.created_at) AS last_created
                FROM pinned_tokens pt
                JOIN pins p ON p.id = pt.pin_id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
                GROUP BY pt.chain, pt.contract_address, pt.token_id
            ) t
            ORDER BY last_created DESC
            LIMIT $2 OFFSET $3
            "#,
        )
        .bind(requestor)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

        // For each token key, fetch pins (ordered by created_at desc)
        let mut result: Vec<TokenWithPins> = Vec::new();
        for r in rows {
            let token_rows = sqlx::query(
                r#"
                SELECT pt.chain, pt.contract_address, pt.token_id,
                       p.cid, p.provider_type, p.provider_url, p.pin_status, pt.created_at
                FROM pinned_tokens pt
                JOIN pins p ON p.id = pt.pin_id
                JOIN backup_tasks bt ON bt.task_id = p.task_id
                WHERE bt.requestor = $1
                  AND pt.chain = $2
                  AND pt.contract_address = $3
                  AND pt.token_id = $4
                ORDER BY pt.created_at DESC
                "#,
            )
            .bind(requestor)
            .bind(r.get::<String, _>("chain"))
            .bind(r.get::<String, _>("contract_address"))
            .bind(r.get::<String, _>("token_id"))
            .fetch_all(&self.pool)
            .await?;

            let mut pins: Vec<PinInfo> = Vec::new();
            let mut chain = String::new();
            let mut contract_address = String::new();
            let mut token_id = String::new();
            for row in token_rows {
                chain = row.get("chain");
                contract_address = row.get("contract_address");
                token_id = row.get("token_id");
                let cid: String = row.get("cid");
                let provider_type: String = row.get("provider_type");
                let provider_url: String = row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten()
                    .unwrap_or_default();
                let status: String = row.get("pin_status");
                let created_at: DateTime<Utc> = row.get("created_at");
                pins.push(PinInfo {
                    cid,
                    provider_type,
                    provider_url,
                    status,
                    created_at,
                });
            }
            result.push(TokenWithPins {
                chain,
                contract_address,
                token_id,
                pins,
            });
        }

        Ok((result, total))
    }

    /// Get a specific pinned token for a requestor
    pub async fn get_pinned_token_by_requestor(
        &self,
        requestor: &str,
        chain: &str,
        contract_address: &str,
        token_id: &str,
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
        let query = r#"
            SELECT pt.chain, pt.contract_address, pt.token_id,
                   p.cid, p.provider_type, p.provider_url, p.pin_status, pt.created_at
            FROM pinned_tokens pt
            JOIN pins p ON p.id = pt.pin_id
            JOIN backup_tasks bt ON bt.task_id = p.task_id
            WHERE bt.requestor = $1
              AND pt.chain = $2
              AND pt.contract_address = $3
              AND pt.token_id = $4
            ORDER BY pt.created_at DESC
        "#;

        let rows = sqlx::query(query)
            .bind(requestor)
            .bind(chain)
            .bind(contract_address)
            .bind(token_id)
            .fetch_all(&self.pool)
            .await?;

        if rows.is_empty() {
            return Ok(None);
        }

        let mut pins = Vec::new();
        let mut token_chain = String::new();
        let mut token_contract_address = String::new();
        let mut token_token_id = String::new();

        for row in rows {
            token_chain = row.get("chain");
            token_contract_address = row.get("contract_address");
            token_token_id = row.get("token_id");
            let cid: String = row.get("cid");
            let provider_type: String = row.get("provider_type");
            // provider_url may be NULL for legacy rows; default to empty string for API stability
            let provider_url: String = row
                .try_get::<Option<String>, _>("provider_url")
                .ok()
                .flatten()
                .unwrap_or_default();
            let status: String = row.get("pin_status");
            let created_at: DateTime<Utc> = row.get("created_at");

            pins.push(PinInfo {
                cid,
                provider_type,
                provider_url,
                status,
                created_at,
            });
        }

        Ok(Some(TokenWithPins {
            chain: token_chain,
            contract_address: token_contract_address,
            token_id: token_token_id,
            pins,
        }))
    }

    /// Get all pins that are in 'queued' or 'pinning' status
    /// This is used by the pin monitor to check for status updates
    pub async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
            FROM pins
            WHERE pin_status IN ('queued', 'pinning')
            ORDER BY id
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| PinRow {
                id: row.get("id"),
                task_id: row.get("task_id"),
                provider_type: row.get("provider_type"),
                provider_url: row
                    .try_get::<Option<String>, _>("provider_url")
                    .ok()
                    .flatten(),
                cid: row.get("cid"),
                request_id: row.get("request_id"),
                pin_status: row.get("pin_status"),
                created_at: row.get("created_at"),
            })
            .collect())
    }

    /// Set backup fatal error for relevant subresources in a single SQL statement.
    /// The update is based on the `storage_mode` value from the `backup_tasks` table for the given `task_id`:
    /// - If storage_mode is 'archive' or 'full': updates archive_requests.status and archive_requests.fatal_error
    /// - If storage_mode is 'ipfs' or 'full': updates pin_requests.status and pin_requests.fatal_error
    pub async fn set_backup_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        let sql = r#"
            WITH task_mode AS (
                SELECT storage_mode FROM backup_tasks WHERE task_id = $1
            ),
            upd_archive AS (
                UPDATE archive_requests ar
                SET status = 'error', fatal_error = $2
                WHERE ar.task_id = $1
                  AND EXISTS (
                      SELECT 1 FROM task_mode tm
                      WHERE tm.storage_mode IN ('archive', 'full')
                  )
                RETURNING 1
            ),
            upd_pins AS (
                UPDATE pin_requests pr
                SET status = 'error', fatal_error = $2
                WHERE pr.task_id = $1
                  AND EXISTS (
                      SELECT 1 FROM task_mode tm
                      WHERE tm.storage_mode IN ('ipfs', 'full')
                  )
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
        "#;
        sqlx::query(sql)
            .bind(task_id)
            .bind(fatal_error)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    /// Update backup subresource statuses for the task based on its storage mode
    /// - archive or full: updates archive_requests.status
    /// - ipfs or full: updates pin_requests.status
    pub async fn update_backup_statuses(
        &self,
        task_id: &str,
        scope: &str,
        archive_status: &str,
        ipfs_status: &str,
    ) -> Result<(), sqlx::Error> {
        let sql = r#"
            WITH upd_archive AS (
                UPDATE archive_requests ar
                SET status = $2
                WHERE ar.task_id = $1
                  AND ($4 IN ('archive', 'full'))
                RETURNING 1
            ),
            upd_pins AS (
                UPDATE pin_requests pr
                SET status = $3
                WHERE pr.task_id = $1
                  AND ($4 IN ('ipfs', 'full'))
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
        "#;
        sqlx::query(sql)
            .bind(task_id)
            .bind(archive_status)
            .bind(ipfs_status)
            .bind(scope)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

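    /// Apply a batch of (pin id, status) updates inside one transaction.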
    pub async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
        if updates.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;

        for (id, status) in updates {
            sqlx::query(
                r#"
                UPDATE pins
                SET pin_status = $2
                WHERE id = $1
                "#,
            )
            .bind(id)
            .bind(status)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }

    /// Complete archive deletion:
    /// - If current storage_mode is 'archive', delete the whole backup (finalize deletion)
    /// - Else if current storage_mode is 'full', flip to 'ipfs' to reflect archive removed
1177
    pub async fn complete_archive_request_deletion(
×
1178
        &self,
1179
        task_id: &str,
1180
    ) -> Result<(), sqlx::Error> {
1181
        // Atomically: delete when archive-only; else if full, flip to ipfs
NEW
1182
        let sql = r#"
×
NEW
1183
            WITH del AS (
×
NEW
1184
                DELETE FROM backup_tasks
×
NEW
1185
                WHERE task_id = $1 AND storage_mode = 'archive'
×
NEW
1186
                RETURNING 1
×
NEW
1187
            ), upd AS (
×
NEW
1188
                UPDATE backup_tasks
×
NEW
1189
                SET storage_mode = 'ipfs', updated_at = NOW()
×
NEW
1190
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
×
NEW
1191
                RETURNING 1
×
1192
            )
NEW
1193
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
×
NEW
1194
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
×
NEW
1195
        "#;
×
NEW
1196
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
×
UNCOV
1197
        Ok(())
×
1198
    }
1199

1200
    /// Complete IPFS pins deletion:
1201
    /// - If current storage_mode is 'ipfs', delete the whole backup (finalize deletion)
1202
    /// - Else if current storage_mode is 'full', flip to 'archive' to reflect pins removed
1203
    pub async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
1204
        // Atomically: delete when ipfs-only; else if full, flip to archive
NEW
1205
        let sql = r#"
×
NEW
1206
            WITH del AS (
×
NEW
1207
                DELETE FROM backup_tasks
×
NEW
1208
                WHERE task_id = $1 AND storage_mode = 'ipfs'
×
NEW
1209
                RETURNING 1
×
NEW
1210
            ), upd AS (
×
NEW
1211
                UPDATE backup_tasks
×
NEW
1212
                SET storage_mode = 'archive', updated_at = NOW()
×
NEW
1213
                WHERE task_id = $1 AND storage_mode = 'full' AND NOT EXISTS (SELECT 1 FROM del)
×
NEW
1214
                RETURNING 1
×
1215
            )
NEW
1216
            SELECT COALESCE((SELECT COUNT(*) FROM del), 0) AS deleted,
×
NEW
1217
                   COALESCE((SELECT COUNT(*) FROM upd), 0) AS updated
×
NEW
1218
        "#;
×
NEW
1219
        let _ = sqlx::query(sql).bind(task_id).execute(&self.pool).await?;
×
UNCOV
1220
        Ok(())
×
1221
    }
1222
}

// Implement the unified Database trait for the real Db struct
#[async_trait::async_trait]
impl Database for Db {
    // Backup task operations
    async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        Db::insert_backup_task(
            self,
            task_id,
            requestor,
            nft_count,
            tokens,
            storage_mode,
            archive_format,
            retention_days,
        )
        .await
    }

    async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        Db::get_backup_task(self, task_id).await
    }

    async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::delete_backup_task(self, task_id).await
    }

    async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        Db::get_incomplete_backup_tasks(self).await
    }

    async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        include_tokens: bool,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        Db::list_requestor_backup_tasks_paginated(self, requestor, include_tokens, limit, offset)
            .await
    }

    async fn list_unprocessed_expired_backups(&self) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
        Db::list_unprocessed_expired_backups(self).await
    }

    // Backup task status and error operations
    async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        Db::clear_backup_errors(self, task_id, scope).await
    }

    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error> {
        Db::set_backup_error(self, task_id, error).await
    }

    async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        Db::set_error_logs(self, task_id, archive_error_log, ipfs_error_log).await
    }

    async fn update_archive_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_error_log(self, task_id, error_log).await
    }

    async fn update_pin_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_error_log(self, task_id, error_log).await
    }

    async fn set_archive_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_archive_request_error(self, task_id, fatal_error).await
    }

    async fn set_pin_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_pin_request_error(self, task_id, fatal_error).await
    }

    // Status update operations
    async fn update_archive_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_status(self, task_id, status).await
    }

    async fn update_pin_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_status(self, task_id, status).await
    }

    async fn update_backup_statuses(
        &self,
        task_id: &str,
        scope: &str,
        archive_status: &str,
        ipfs_status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_backup_statuses(self, task_id, scope, archive_status, ipfs_status).await
    }

    async fn update_archive_request_statuses(
        &self,
        task_ids: &[String],
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_statuses(self, task_ids, status).await
    }

    // Deletion operations
    async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_deletion(self, task_id).await
    }

    async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_archive_request_deletion(self, task_id).await
    }

    async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_pin_request_deletions(self, task_id).await
    }

    async fn complete_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_archive_request_deletion(self, task_id).await
    }

    async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_pin_request_deletion(self, task_id).await
    }

    // Retry operations
    async fn retry_backup(&self, task_id: &str, retention_days: u64) -> Result<(), sqlx::Error> {
        Db::retry_backup(self, task_id, retention_days).await
    }

    // Pin operations
    async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        Db::insert_pins_with_tokens(self, task_id, token_pin_mappings).await
    }

    async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_pins_by_task_id(self, task_id).await
    }

    async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_active_pins(self).await
    }

    async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
        Db::update_pin_statuses(self, updates).await
    }

    // Pinned tokens operations
    async fn get_pinned_tokens_by_requestor(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
        Db::get_pinned_tokens_by_requestor(self, requestor, limit, offset).await
    }

    async fn get_pinned_token_by_requestor(
        &self,
        requestor: &str,
        chain: &str,
        contract_address: &str,
        token_id: &str,
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
        Db::get_pinned_token_by_requestor(self, requestor, chain, contract_address, token_id).await
    }
}