• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

0xmichalis / nftbk / 18519216215

15 Oct 2025 05:51AM UTC coverage: 35.315% (+0.09%) from 35.225%
18519216215

push

github

0xmichalis
refactor: a few more db handler renames

7 of 25 new or added lines in 4 files covered. (28.0%)

328 existing lines in 2 files now uncovered.

1378 of 3902 relevant lines covered (35.32%)

5.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/server/database/mod.rs
1
use chrono::{DateTime, Utc};
2
use serde::{Deserialize, Serialize};
3
use sqlx::{postgres::PgPoolOptions, PgPool, Row};
4

5
use crate::server::database::r#trait::Database;
6

7
pub mod r#trait;
8

9
/// Combined view of backup_tasks + archive_requests
///
/// One row per backup task, left-joined with its optional archive and pin
/// subresource rows; subresource fields are `None` when the corresponding
/// row does not exist.
#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "Backup task information including metadata and status")]
pub struct BackupTask {
    /// Unique identifier for the backup task
    #[schema(example = "abc123def456")]
    pub task_id: String,
    /// When the backup task was created (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:00:00Z")]
    pub created_at: DateTime<Utc>,
    /// When the backup task was last updated (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:05:00Z")]
    pub updated_at: DateTime<Utc>,
    /// User who requested the backup
    #[schema(example = "user123")]
    pub requestor: String,
    /// Number of NFTs in this backup task
    #[schema(example = 42)]
    pub nft_count: i32,
    /// Token details (only included if include_tokens=true)
    // Set to serde_json::Value::Null by list queries when tokens are excluded.
    pub tokens: serde_json::Value,
    /// Archive subresource status (in_progress, done, error, expired)
    pub archive_status: Option<String>,
    /// IPFS subresource status (in_progress, done, error)
    pub ipfs_status: Option<String>,
    /// Detailed archive error log if archive completed with some failures
    #[schema(example = "Failed to write archive checksum file")]
    pub archive_error_log: Option<String>,
    /// Detailed IPFS error log aggregated from pin requests
    #[schema(
        example = "Provider pinata failed: 401 Unauthorized\nProvider web3.storage failed: 429 Too Many Requests"
    )]
    pub ipfs_error_log: Option<String>,
    /// Archive subresource fatal error if backup failed completely at archive stage
    pub archive_fatal_error: Option<String>,
    /// IPFS subresource fatal error if backup failed completely at IPFS stage
    // NOTE(review): get_incomplete_backup_tasks always leaves this None (column
    // not selected there); only get_backup_task / list queries populate it.
    pub ipfs_fatal_error: Option<String>,
    /// Storage mode used for the backup (archive, ipfs, full)
    #[schema(example = "archive")]
    pub storage_mode: String,
    /// Archive format used for the backup (zip, tar.gz)
    #[schema(example = "zip")]
    pub archive_format: Option<String>,
    /// When the backup expires (if applicable, typically 7 days from creation)
    #[schema(example = "2024-01-08T12:00:00Z")]
    pub expires_at: Option<DateTime<Utc>>,
    /// When deletion was started (if applicable)
    #[schema(example = "2024-01-02T10:00:00Z")]
    pub deleted_at: Option<DateTime<Utc>>,
    /// When archive deletion was started (if applicable)
    #[schema(example = "2024-01-02T10:00:00Z")]
    pub archive_deleted_at: Option<DateTime<Utc>>,
    /// When IPFS pins deletion was started (if applicable)
    #[schema(example = "2024-01-02T10:00:00Z")]
    pub pins_deleted_at: Option<DateTime<Utc>>,
}
65

66
/// API-facing description of a single IPFS pin for a CID.
#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "IPFS pin information for a specific CID")]
pub struct PinInfo {
    /// Content Identifier (CID) of the pinned content
    #[schema(example = "QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG")]
    pub cid: String,
    /// IPFS provider type where the content is pinned
    #[schema(example = "pinata")]
    pub provider_type: String,
    /// IPFS provider URL where the content is pinned
    #[schema(example = "https://api.pinata.cloud")]
    pub provider_url: String,
    /// Pin status (pinned, pinning, failed, queued)
    #[schema(example = "pinned")]
    pub status: String,
    /// When the pin was created (ISO 8601 timestamp)
    #[schema(example = "2024-01-01T12:00:00Z")]
    pub created_at: DateTime<Utc>,
}
85

86
/// One NFT (chain + contract + token id) together with all of its IPFS pins.
#[derive(Debug, Serialize, Deserialize, Clone, utoipa::ToSchema)]
#[schema(description = "Token information with associated pin requests")]
pub struct TokenWithPins {
    /// Blockchain identifier (e.g., ethereum, tezos)
    #[schema(example = "ethereum")]
    pub chain: String,
    /// NFT contract address
    #[schema(example = "0x1234567890123456789012345678901234567890")]
    pub contract_address: String,
    /// NFT token ID
    #[schema(example = "123")]
    pub token_id: String,
    /// List of IPFS pins for this token
    pub pins: Vec<PinInfo>,
}
101

102
/// Raw row of the `pins` table: one pin submitted to one IPFS provider.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PinRow {
    // Surrogate key generated by the database (RETURNING id on insert).
    pub id: i64,
    // Backup task this pin belongs to.
    pub task_id: String,
    // Provider type, e.g. "pinata" — see PinInfo::provider_type.
    pub provider_type: String,
    // Provider endpoint URL; nullable in the schema.
    pub provider_url: Option<String>,
    // Content identifier that was pinned.
    pub cid: String,
    // Provider-side request identifier for this pin.
    pub request_id: String,
    // Pin status string (queued, pinning, pinned, failed).
    pub pin_status: String,
    // Row creation timestamp.
    pub created_at: DateTime<Utc>,
}
113

114
/// Minimal projection of an expired backup, used by the expiry sweeper to
/// locate the archive file that must be removed.
#[derive(Debug, Clone)]
pub struct ExpiredBackup {
    // Task whose archive has passed its expires_at.
    pub task_id: String,
    // Archive format (e.g. zip, tar.gz) needed to derive the file name.
    pub archive_format: String,
}
119

120
/// Postgres-backed database handle; a thin wrapper around a `PgPool`.
/// Cloning clones the pool handle, not the connections.
#[derive(Clone)]
pub struct Db {
    pub pool: PgPool,
}
124

125
impl Db {
126
    pub async fn new(database_url: &str, max_connections: u32) -> Self {
×
127
        let pool = PgPoolOptions::new()
×
128
            .max_connections(max_connections)
×
129
            .connect(database_url)
×
130
            .await
×
131
            .expect("Failed to connect to Postgres");
132
        // Health check: run a simple query
133
        sqlx::query("SELECT 1")
×
134
            .execute(&pool)
×
135
            .await
×
136
            .expect("Postgres connection is not healthy");
137
        tracing::info!("Postgres connection is healthy");
×
138
        Db { pool }
139
    }
140

141
    /// Create (or refresh) a backup task and its subresource request rows in a
    /// single transaction.
    ///
    /// - `backup_tasks` always gets a row; on conflict the counts, tokens and
    ///   storage mode are refreshed and `updated_at` bumped.
    /// - `archive_requests` is written when `storage_mode` is "archive" or
    ///   "full"; `expires_at` is NOW() + retention when `retention_days` is
    ///   given, NULL otherwise. An existing status is preserved via COALESCE.
    /// - `pin_requests` is written when `storage_mode` is "ipfs" or "full".
    #[allow(clippy::too_many_arguments)]
    pub async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Insert into backup_tasks
        sqlx::query(
            r#"
            INSERT INTO backup_tasks (
                task_id, created_at, updated_at, requestor, nft_count, tokens, storage_mode
            ) VALUES (
                $1, NOW(), NOW(), $2, $3, $4, $5
            )
            ON CONFLICT (task_id) DO UPDATE SET
                updated_at = NOW(),
                nft_count = EXCLUDED.nft_count,
                tokens = EXCLUDED.tokens,
                storage_mode = EXCLUDED.storage_mode
            "#,
        )
        .bind(task_id)
        .bind(requestor)
        .bind(nft_count)
        .bind(tokens)
        .bind(storage_mode)
        .execute(&mut *tx)
        .await?;

        // Insert into archive_requests if storage mode includes archive
        if storage_mode == "archive" || storage_mode == "full" {
            let archive_fmt = archive_format.unwrap_or("zip");

            if let Some(days) = retention_days {
                // `$3 || ' days'` builds an interval string from the bound day
                // count (Postgres concatenates non-text operands via its
                // anynonarray || text operator).
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NOW() + ($3 || ' days')::interval, 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at,
                        status = COALESCE(archive_requests.status, 'in_progress')
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                // NOTE(review): u64 -> i64 `as` cast wraps for values > i64::MAX;
                // harmless for realistic retention periods.
                .bind(days as i64)
                .execute(&mut *tx)
                .await?;
            } else {
                // No retention configured: the archive never expires.
                sqlx::query(
                    r#"
                    INSERT INTO archive_requests (task_id, archive_format, expires_at, status)
                    VALUES ($1, $2, NULL, 'in_progress')
                    ON CONFLICT (task_id) DO UPDATE SET
                        archive_format = EXCLUDED.archive_format,
                        expires_at = EXCLUDED.expires_at,
                        status = COALESCE(archive_requests.status, 'in_progress')
                    "#,
                )
                .bind(task_id)
                .bind(archive_fmt)
                .execute(&mut *tx)
                .await?;
            }
        }

        // Insert into pin_requests if storage mode includes IPFS
        if storage_mode == "ipfs" || storage_mode == "full" {
            sqlx::query(
                r#"
                INSERT INTO pin_requests (task_id, status)
                VALUES ($1, 'in_progress')
                ON CONFLICT (task_id) DO UPDATE SET
                    status = COALESCE(pin_requests.status, 'in_progress')
                "#,
            )
            .bind(task_id)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
233

234
    pub async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
235
        // CASCADE will delete associated archive_requests row if it exists
236
        sqlx::query!("DELETE FROM backup_tasks WHERE task_id = $1", task_id)
×
237
            .execute(&self.pool)
×
238
            .await?;
×
239
        Ok(())
×
240
    }
241

UNCOV
242
    /// Store archive and/or IPFS error logs for a task in one transaction.
    ///
    /// Each log is only written when its `Option` is `Some`; passing `None`
    /// for both is a no-op (an empty committed transaction).
    pub async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;
        if let Some(a) = archive_error_log {
            sqlx::query("UPDATE archive_requests SET error_log = $2 WHERE task_id = $1")
                .bind(task_id)
                .bind(a)
                .execute(&mut *tx)
                .await?;
        }
        if let Some(i) = ipfs_error_log {
            sqlx::query(
                r#"
                    UPDATE pin_requests
                    SET error_log = $2
                    WHERE task_id = $1
                    "#,
            )
            .bind(task_id)
            .bind(i)
            .execute(&mut *tx)
            .await?;
        }
        tx.commit().await?;
        Ok(())
    }
272

UNCOV
273
    pub async fn update_archive_request_error_log(
×
274
        &self,
275
        task_id: &str,
276
        error_log: &str,
277
    ) -> Result<(), sqlx::Error> {
278
        sqlx::query(
279
            r#"
280
            UPDATE archive_requests
281
            SET error_log = $2
282
            WHERE task_id = $1
283
            "#,
284
        )
UNCOV
285
        .bind(task_id)
×
UNCOV
286
        .bind(error_log)
×
287
        .execute(&self.pool)
×
288
        .await?;
×
289
        Ok(())
×
290
    }
291

UNCOV
292
    pub async fn update_pin_request_error_log(
×
293
        &self,
294
        task_id: &str,
295
        error_log: &str,
296
    ) -> Result<(), sqlx::Error> {
297
        sqlx::query(
298
            r#"
299
            UPDATE pin_requests
300
            SET error_log = $2
301
            WHERE task_id = $1
302
            "#,
303
        )
UNCOV
304
        .bind(task_id)
×
UNCOV
305
        .bind(error_log)
×
UNCOV
306
        .execute(&self.pool)
×
307
        .await?;
×
308
        Ok(())
×
309
    }
310

311
    pub async fn update_pin_request_status(
×
312
        &self,
313
        task_id: &str,
314
        status: &str,
315
    ) -> Result<(), sqlx::Error> {
316
        sqlx::query(
317
            r#"
318
            UPDATE pin_requests
319
            SET status = $2
320
            WHERE task_id = $1
321
            "#,
322
        )
UNCOV
323
        .bind(task_id)
×
UNCOV
324
        .bind(status)
×
UNCOV
325
        .execute(&self.pool)
×
UNCOV
326
        .await?;
×
327
        Ok(())
×
328
    }
329

330
    pub async fn update_archive_request_status(
×
331
        &self,
332
        task_id: &str,
333
        status: &str,
334
    ) -> Result<(), sqlx::Error> {
335
        sqlx::query(
336
            r#"
337
            UPDATE archive_requests
338
            SET status = $2
339
            WHERE task_id = $1
340
            "#,
341
        )
UNCOV
342
        .bind(task_id)
×
UNCOV
343
        .bind(status)
×
UNCOV
344
        .execute(&self.pool)
×
UNCOV
345
        .await?;
×
346
        Ok(())
×
347
    }
348

349
    pub async fn update_archive_request_statuses(
×
350
        &self,
351
        task_ids: &[String],
352
        status: &str,
353
    ) -> Result<(), sqlx::Error> {
UNCOV
354
        if task_ids.is_empty() {
×
UNCOV
355
            return Ok(());
×
356
        }
357

358
        // Use a transaction for atomicity
UNCOV
359
        let mut tx = self.pool.begin().await?;
×
360

361
        // Update each task_id individually with a prepared statement
UNCOV
362
        for task_id in task_ids {
×
UNCOV
363
            sqlx::query("UPDATE archive_requests SET status = $1 WHERE task_id = $2")
×
UNCOV
364
                .bind(status)
×
UNCOV
365
                .bind(task_id)
×
UNCOV
366
                .execute(&mut *tx)
×
UNCOV
367
                .await?;
×
368
        }
369

370
        tx.commit().await?;
×
UNCOV
371
        Ok(())
×
372
    }
373

374
    // TODO: Should support pin request retries
    /// Re-arm a failed/expired archive for another attempt: reset its status
    /// to 'in_progress', clear the fatal error, and push `expires_at` forward
    /// by `retention_days`. Both updates run in one transaction.
    pub async fn retry_backup(
        &self,
        task_id: &str,
        retention_days: u64,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // Reset archive subresource status and fatal_error
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET status = 'in_progress', fatal_error = NULL
            WHERE task_id = $1
            "#,
        )
        .bind(task_id)
        .execute(&mut *tx)
        .await?;

        // Update archive_requests expires_at if it exists
        // (`$2 || ' days'` turns the bound day count into an interval string).
        sqlx::query!(
            r#"
            UPDATE archive_requests
            SET expires_at = NOW() + ($2 || ' days')::interval
            WHERE task_id = $1
            "#,
            task_id,
            retention_days as i64
        )
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;
        Ok(())
    }
410

411
    /// Clear stored error_log/fatal_error for a task, scoped by subresource.
    ///
    /// The scope filter lives in the SQL itself: the archive statement only
    /// matches when `scope` is 'archive' or 'full', the pin statement when it
    /// is 'ipfs' or 'full'. Any other scope value makes both updates no-ops.
    pub async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;
        // Clear archive errors if scope includes archive
        sqlx::query(
            r#"
            UPDATE archive_requests
            SET error_log = NULL, fatal_error = NULL
            WHERE task_id = $1 AND ($2 IN ('archive', 'full'))
            "#,
        )
        .bind(task_id)
        .bind(scope)
        .execute(&mut *tx)
        .await?;
        // Clear IPFS errors if scope includes ipfs
        sqlx::query(
            r#"
            UPDATE pin_requests
            SET error_log = NULL, fatal_error = NULL
            WHERE task_id = $1 AND ($2 IN ('ipfs', 'full'))
            "#,
        )
        .bind(task_id)
        .bind(scope)
        .execute(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(())
    }
440

441
    pub async fn set_archive_request_error(
×
442
        &self,
443
        task_id: &str,
444
        fatal_error: &str,
445
    ) -> Result<(), sqlx::Error> {
446
        sqlx::query(
447
            r#"
448
            UPDATE archive_requests
449
            SET status = 'error', fatal_error = $2
450
            WHERE task_id = $1
451
            "#,
452
        )
UNCOV
453
        .bind(task_id)
×
UNCOV
454
        .bind(fatal_error)
×
UNCOV
455
        .execute(&self.pool)
×
UNCOV
456
        .await?;
×
457
        Ok(())
×
458
    }
459

460
    pub async fn set_pin_request_error(
×
461
        &self,
462
        task_id: &str,
463
        fatal_error: &str,
464
    ) -> Result<(), sqlx::Error> {
465
        sqlx::query(
466
            r#"
467
            UPDATE pin_requests
468
            SET status = 'error', fatal_error = $2
469
            WHERE task_id = $1
470
            "#,
471
        )
UNCOV
472
        .bind(task_id)
×
UNCOV
473
        .bind(fatal_error)
×
UNCOV
474
        .execute(&self.pool)
×
475
        .await?;
×
476
        Ok(())
×
477
    }
478

UNCOV
479
    pub async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
UNCOV
480
        sqlx::query!(
×
481
            r#"UPDATE backup_tasks SET deleted_at = NOW(), updated_at = NOW() WHERE task_id = $1"#,
482
            task_id
483
        )
484
        .execute(&self.pool)
×
485
        .await?;
×
486
        Ok(())
×
487
    }
488

489
    /// Mark archive as being deleted (similar to start_deletion but for archive subresource)
490
    pub async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
491
        sqlx::query!(
×
492
            r#"
493
            UPDATE archive_requests 
494
            SET deleted_at = NOW() 
495
            WHERE task_id = $1 AND deleted_at IS NULL
496
            "#,
497
            task_id
498
        )
499
        .execute(&self.pool)
×
500
        .await?;
×
501
        Ok(())
×
502
    }
503

504
    /// Mark IPFS pins as being deleted (similar to start_deletion but for IPFS pins subresource)
UNCOV
505
    pub async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
UNCOV
506
        sqlx::query!(
×
507
            r#"
508
            UPDATE pin_requests 
509
            SET deleted_at = NOW() 
510
            WHERE task_id = $1 AND deleted_at IS NULL
511
            "#,
512
            task_id
513
        )
UNCOV
514
        .execute(&self.pool)
×
UNCOV
515
        .await?;
×
UNCOV
516
        Ok(())
×
517
    }
518

UNCOV
519
    /// Load one backup task by id, left-joined with its optional archive and
    /// pin subresources. Returns `Ok(None)` when no such task exists.
    pub async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        let row = sqlx::query(
            r#"
            SELECT 
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count, 
                b.tokens, ar.status as archive_status, ar.fatal_error, b.storage_mode,
                b.deleted_at, ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.task_id = $1
            "#,
        )
        .bind(task_id)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(|row| BackupTask {
            task_id: row.get("task_id"),
            created_at: row.get("created_at"),
            updated_at: row.get("updated_at"),
            requestor: row.get("requestor"),
            nft_count: row.get("nft_count"),
            tokens: row.get("tokens"),
            // try_get(...).ok().flatten() collapses both "column missing" and
            // SQL NULL (no matching joined row) into None.
            archive_status: row
                .try_get::<Option<String>, _>("archive_status")
                .ok()
                .flatten(),
            ipfs_status: row
                .try_get::<Option<String>, _>("ipfs_status")
                .ok()
                .flatten(),
            archive_error_log: row.get("archive_error_log"),
            ipfs_error_log: row.get("ipfs_error_log"),
            // "fatal_error" is the unaliased ar.fatal_error column above.
            archive_fatal_error: row.get("fatal_error"),
            ipfs_fatal_error: row
                .try_get::<Option<String>, _>("ipfs_fatal_error")
                .ok()
                .flatten(),
            storage_mode: row.get("storage_mode"),
            archive_format: row.get("archive_format"),
            expires_at: row.get("expires_at"),
            deleted_at: row.get("deleted_at"),
            archive_deleted_at: row.get("archive_deleted_at"),
            pins_deleted_at: row.get("pins_deleted_at"),
        }))
    }
571

572
    /// List a requestor's backup tasks, newest first, with LIMIT/OFFSET
    /// pagination.
    ///
    /// Returns the page of tasks plus the requestor's total task count (for
    /// pagination metadata). When `include_tokens` is false the (potentially
    /// large) tokens column is omitted from the SELECT and each task's
    /// `tokens` field is `serde_json::Value::Null`.
    pub async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        include_tokens: bool,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        // Spliced into the SQL via format! below. This is a fixed internal
        // string, not user input, so it is injection-safe.
        let tokens_field = if include_tokens { "b.tokens," } else { "" };

        // Total count
        let total_row = sqlx::query!(
            r#"SELECT COUNT(*) as count FROM backup_tasks b WHERE b.requestor = $1"#,
            requestor
        )
        .fetch_one(&self.pool)
        .await?;
        // COUNT(*) is nullable in sqlx's view; clamp defensively before the
        // narrowing cast to u32.
        let total: u32 = (total_row.count.unwrap_or(0) as i64).max(0) as u32;

        let query = format!(
            r#"
            SELECT 
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count, 
                {tokens_field} ar.status as archive_status, ar.fatal_error, b.storage_mode,
                b.deleted_at, ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.fatal_error as ipfs_fatal_error,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE b.requestor = $1
            ORDER BY b.created_at DESC
            LIMIT $2 OFFSET $3
            "#,
        );

        let rows = sqlx::query(&query)
            .bind(requestor)
            .bind(limit)
            .bind(offset)
            .fetch_all(&self.pool)
            .await?;

        let recs = rows
            .into_iter()
            .map(|row| {
                // The tokens column only exists in the result set when it was
                // selected; otherwise substitute JSON null.
                let tokens = if include_tokens {
                    row.try_get::<serde_json::Value, _>("tokens")
                        .unwrap_or(serde_json::Value::Null)
                } else {
                    serde_json::Value::Null
                };

                BackupTask {
                    task_id: row.get("task_id"),
                    created_at: row.get("created_at"),
                    updated_at: row.get("updated_at"),
                    requestor: row.get("requestor"),
                    nft_count: row.get("nft_count"),
                    tokens,
                    // try_get(...).ok().flatten() collapses NULLs from the
                    // LEFT JOINs (no subresource row) into None.
                    archive_status: row
                        .try_get::<Option<String>, _>("archive_status")
                        .ok()
                        .flatten(),
                    ipfs_status: row
                        .try_get::<Option<String>, _>("ipfs_status")
                        .ok()
                        .flatten(),
                    archive_error_log: row.get("archive_error_log"),
                    ipfs_error_log: row.get("ipfs_error_log"),
                    // "fatal_error" is the unaliased ar.fatal_error column.
                    archive_fatal_error: row.get("fatal_error"),
                    ipfs_fatal_error: row
                        .try_get::<Option<String>, _>("ipfs_fatal_error")
                        .ok()
                        .flatten(),
                    storage_mode: row.get("storage_mode"),
                    archive_format: row.get("archive_format"),
                    expires_at: row.get("expires_at"),
                    deleted_at: row.get("deleted_at"),
                    archive_deleted_at: row.get("archive_deleted_at"),
                    pins_deleted_at: row.get("pins_deleted_at"),
                }
            })
            .collect();

        Ok((recs, total))
    }
661

662
    pub async fn list_unprocessed_expired_backups(
×
663
        &self,
664
    ) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
665
        let rows = sqlx::query(
666
            r#"
667
            SELECT b.task_id, ar.archive_format 
668
            FROM backup_tasks b
669
            JOIN archive_requests ar ON b.task_id = ar.task_id
670
            WHERE ar.expires_at IS NOT NULL AND ar.expires_at < NOW() AND COALESCE(ar.status, 'in_progress') != 'expired'
671
            "#,
672
        )
UNCOV
673
        .fetch_all(&self.pool)
×
UNCOV
674
        .await?;
×
UNCOV
675
        let recs = rows
×
676
            .into_iter()
UNCOV
677
            .map(|row| ExpiredBackup {
×
UNCOV
678
                task_id: row.get("task_id"),
×
UNCOV
679
                archive_format: row.get("archive_format"),
×
680
            })
681
            .collect();
UNCOV
682
        Ok(recs)
×
683
    }
684

685
    /// Retrieve all backup tasks that are in 'in_progress' status
    /// This is used to recover incomplete tasks on server restart
    ///
    /// "Incomplete" depends on the storage mode: archive-only looks at the
    /// archive request, ipfs-only at the pin request, and full mode at either.
    /// Results are ordered oldest-first so recovery resumes in FIFO order.
    pub async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        let rows = sqlx::query(
            r#"
            SELECT 
                b.task_id, b.created_at, b.updated_at, b.requestor, b.nft_count, 
                b.tokens, ar.status as archive_status, ar.fatal_error, b.storage_mode,
                b.deleted_at, ar.archive_format, ar.expires_at, ar.deleted_at as archive_deleted_at,
                ar.error_log as archive_error_log,
                pr.status as ipfs_status,
                pr.error_log as ipfs_error_log,
                pr.deleted_at as pins_deleted_at
            FROM backup_tasks b
            LEFT JOIN archive_requests ar ON b.task_id = ar.task_id
            LEFT JOIN pin_requests pr ON b.task_id = pr.task_id
            WHERE (
                -- Archive-only mode: check archive status (record must exist and be in_progress)
                (b.storage_mode = 'archive' AND ar.status = 'in_progress')
                OR
                -- IPFS-only mode: check IPFS status (record must exist and be in_progress)
                (b.storage_mode = 'ipfs' AND pr.status = 'in_progress')
                OR
                -- Full mode: check both archive and IPFS status (task is incomplete if either is in_progress)
                (b.storage_mode = 'full' AND (ar.status = 'in_progress' OR pr.status = 'in_progress'))
            )
            ORDER BY b.created_at ASC
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        let recs = rows
            .into_iter()
            .map(|row| BackupTask {
                task_id: row.get("task_id"),
                created_at: row.get("created_at"),
                updated_at: row.get("updated_at"),
                requestor: row.get("requestor"),
                nft_count: row.get("nft_count"),
                tokens: row.get("tokens"),
                // try_get(...).ok().flatten() collapses NULLs from the LEFT
                // JOINs (no subresource row) into None.
                archive_status: row
                    .try_get::<Option<String>, _>("archive_status")
                    .ok()
                    .flatten(),
                ipfs_status: row
                    .try_get::<Option<String>, _>("ipfs_status")
                    .ok()
                    .flatten(),
                archive_error_log: row.get("archive_error_log"),
                ipfs_error_log: row.get("ipfs_error_log"),
                // "fatal_error" is the unaliased ar.fatal_error column.
                archive_fatal_error: row.get("fatal_error"),
                // pr.fatal_error is not selected by this query, so the field
                // is always None here (unlike get_backup_task).
                ipfs_fatal_error: None,
                storage_mode: row.get("storage_mode"),
                archive_format: row.get("archive_format"),
                expires_at: row.get("expires_at"),
                deleted_at: row.get("deleted_at"),
                archive_deleted_at: row.get("archive_deleted_at"),
                pins_deleted_at: row.get("pins_deleted_at"),
            })
            .collect();

        Ok(recs)
    }
749

750
    /// Insert pins and their associated tokens in a single atomic transaction
    ///
    /// Each `TokenPinMapping` carries one token plus the pin responses
    /// obtained from the providers. Pins are inserted first (capturing the
    /// generated ids), then one `pinned_tokens` row links each pin back to
    /// its token. Empty input (no mappings, or mappings with no pin
    /// responses) is a no-op.
    pub async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        if token_pin_mappings.is_empty() {
            return Ok(());
        }

        // Collect all pin responses and prepare token data
        let mut all_pin_responses = Vec::new();
        let mut all_token_data = Vec::new(); // (index_in_pin_responses, chain, contract_address, token_id)

        // Flatten the mappings: each pin response is remembered by its index,
        // and the token owning it is recorded against that same index so the
        // second pass can join them via the generated pin id.
        for mapping in token_pin_mappings {
            for pin_response in &mapping.pin_responses {
                let index = all_pin_responses.len();
                all_pin_responses.push(pin_response);
                all_token_data.push((
                    index,
                    mapping.chain.clone(),
                    mapping.contract_address.clone(),
                    mapping.token_id.clone(),
                ));
            }
        }

        if all_pin_responses.is_empty() {
            return Ok(());
        }

        // Start a transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Insert pins one by one and collect generated IDs
        let mut pin_ids: Vec<i64> = Vec::new();
        for pin_response in &all_pin_responses {
            // Map status enum to lowercase string to satisfy CHECK constraint
            let status = match pin_response.status {
                crate::ipfs::PinResponseStatus::Queued => "queued",
                crate::ipfs::PinResponseStatus::Pinning => "pinning",
                crate::ipfs::PinResponseStatus::Pinned => "pinned",
                crate::ipfs::PinResponseStatus::Failed => "failed",
            };

            let row = sqlx::query(
                "INSERT INTO pins (task_id, provider_type, provider_url, cid, request_id, pin_status) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id"
            )
            .bind(task_id)
            .bind(&pin_response.provider_type)
            .bind(&pin_response.provider_url)
            .bind(&pin_response.cid)
            .bind(&pin_response.id)
            .bind(status)
            .fetch_one(&mut *tx)
            .await?;

            pin_ids.push(row.get("id"));
        }

        // Insert pinned tokens using the generated pin_ids
        for (index, chain, contract_address, token_id) in &all_token_data {
            sqlx::query(
                "INSERT INTO pinned_tokens (pin_id, chain, contract_address, token_id) VALUES ($1, $2, $3, $4)"
            )
            .bind(pin_ids[*index])
            .bind(chain)
            .bind(contract_address)
            .bind(token_id)
            .execute(&mut *tx)
            .await?;
        }

        // Commit the transaction
        tx.commit().await?;
        Ok(())
    }
827

828
    /// Get all pins for a specific backup task
829
    pub async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
×
830
        let rows = sqlx::query(
831
            r#"
832
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
833
            FROM pins
834
            WHERE task_id = $1
835
            ORDER BY id
836
            "#,
837
        )
UNCOV
838
        .bind(task_id)
×
839
        .fetch_all(&self.pool)
×
UNCOV
840
        .await?;
×
841

UNCOV
842
        Ok(rows
×
843
            .into_iter()
×
UNCOV
844
            .map(|row| PinRow {
×
UNCOV
845
                id: row.get("id"),
×
UNCOV
846
                task_id: row.get("task_id"),
×
UNCOV
847
                provider_type: row.get("provider_type"),
×
UNCOV
848
                provider_url: row
×
UNCOV
849
                    .try_get::<Option<String>, _>("provider_url")
×
UNCOV
850
                    .ok()
×
UNCOV
851
                    .flatten(),
×
UNCOV
852
                cid: row.get("cid"),
×
UNCOV
853
                request_id: row.get("request_id"),
×
UNCOV
854
                pin_status: row.get("pin_status"),
×
UNCOV
855
                created_at: row.get("created_at"),
×
856
            })
UNCOV
857
            .collect())
×
858
    }
859

860
    /// Paginated pinned tokens grouped by (chain, contract_address, token_id)
UNCOV
861
    pub async fn get_pinned_tokens_by_requestor(
×
862
        &self,
863
        requestor: &str,
864
        limit: i64,
865
        offset: i64,
866
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
867
        // Total distinct tokens for this requestor
868
        let total_row = sqlx::query(
869
            r#"
870
            SELECT COUNT(*) as count
871
            FROM (
872
                SELECT DISTINCT pt.chain, pt.contract_address, pt.token_id
873
                FROM pinned_tokens pt
874
                JOIN pins p ON p.id = pt.pin_id
875
                JOIN backup_tasks bt ON bt.task_id = p.task_id
876
                WHERE bt.requestor = $1
877
            ) t
878
            "#,
879
        )
UNCOV
880
        .bind(requestor)
×
UNCOV
881
        .fetch_one(&self.pool)
×
UNCOV
882
        .await?;
×
883
        let total: u32 = (total_row.get::<i64, _>("count")).max(0) as u32;
×
884

885
        // Page of distinct tokens ordered by most recent pin time
886
        let rows = sqlx::query(
887
            r#"
888
            SELECT t.chain, t.contract_address, t.token_id
889
            FROM (
890
                SELECT pt.chain, pt.contract_address, pt.token_id, MAX(pt.created_at) AS last_created
891
                FROM pinned_tokens pt
892
                JOIN pins p ON p.id = pt.pin_id
893
                JOIN backup_tasks bt ON bt.task_id = p.task_id
894
                WHERE bt.requestor = $1
895
                GROUP BY pt.chain, pt.contract_address, pt.token_id
896
            ) t
897
            ORDER BY last_created DESC
898
            LIMIT $2 OFFSET $3
899
            "#,
900
        )
UNCOV
901
        .bind(requestor)
×
UNCOV
902
        .bind(limit)
×
UNCOV
903
        .bind(offset)
×
UNCOV
904
        .fetch_all(&self.pool)
×
UNCOV
905
        .await?;
×
906

907
        // For each token key, fetch pins (ordered by created_at desc)
908
        let mut result: Vec<TokenWithPins> = Vec::new();
×
909
        for r in rows {
×
910
            let token_rows = sqlx::query(
911
                r#"
912
                SELECT pt.chain, pt.contract_address, pt.token_id,
913
                       p.cid, p.provider_type, p.provider_url, p.pin_status, pt.created_at
914
                FROM pinned_tokens pt
915
                JOIN pins p ON p.id = pt.pin_id
916
                JOIN backup_tasks bt ON bt.task_id = p.task_id
917
                WHERE bt.requestor = $1
918
                  AND pt.chain = $2
919
                  AND pt.contract_address = $3
920
                  AND pt.token_id = $4
921
                ORDER BY pt.created_at DESC
922
                "#,
923
            )
UNCOV
924
            .bind(requestor)
×
UNCOV
925
            .bind(r.get::<String, _>("chain"))
×
UNCOV
926
            .bind(r.get::<String, _>("contract_address"))
×
UNCOV
927
            .bind(r.get::<String, _>("token_id"))
×
928
            .fetch_all(&self.pool)
×
929
            .await?;
×
930

931
            let mut pins: Vec<PinInfo> = Vec::new();
×
932
            let mut chain = String::new();
×
933
            let mut contract_address = String::new();
×
934
            let mut token_id = String::new();
×
935
            for row in token_rows {
×
UNCOV
936
                chain = row.get("chain");
×
UNCOV
937
                contract_address = row.get("contract_address");
×
938
                token_id = row.get("token_id");
×
939
                let cid: String = row.get("cid");
×
940
                let provider_type: String = row.get("provider_type");
×
941
                let provider_url: String = row
×
942
                    .try_get::<Option<String>, _>("provider_url")
943
                    .ok()
944
                    .flatten()
945
                    .unwrap_or_default();
946
                let status: String = row.get("pin_status");
×
UNCOV
947
                let created_at: DateTime<Utc> = row.get("created_at");
×
UNCOV
948
                pins.push(PinInfo {
×
UNCOV
949
                    cid,
×
950
                    provider_type,
×
UNCOV
951
                    provider_url,
×
UNCOV
952
                    status,
×
UNCOV
953
                    created_at,
×
954
                });
955
            }
UNCOV
956
            result.push(TokenWithPins {
×
957
                chain,
×
958
                contract_address,
×
959
                token_id,
×
960
                pins,
×
961
            });
962
        }
963

964
        Ok((result, total))
×
965
    }
966

967
    /// Get a specific pinned token for a requestor
968
    pub async fn get_pinned_token_by_requestor(
×
969
        &self,
970
        requestor: &str,
971
        chain: &str,
972
        contract_address: &str,
973
        token_id: &str,
974
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
975
        let query = r#"
×
976
            SELECT pt.chain, pt.contract_address, pt.token_id,
×
UNCOV
977
                   p.cid, p.provider_type, p.provider_url, p.pin_status, pt.created_at
×
978
            FROM pinned_tokens pt
×
979
            JOIN pins p ON p.id = pt.pin_id
×
UNCOV
980
            JOIN backup_tasks bt ON bt.task_id = p.task_id
×
UNCOV
981
            WHERE bt.requestor = $1
×
982
              AND pt.chain = $2
×
983
              AND pt.contract_address = $3
×
984
              AND pt.token_id = $4
×
985
            ORDER BY pt.created_at DESC
×
UNCOV
986
        "#;
×
987

988
        let rows = sqlx::query(query)
×
989
            .bind(requestor)
×
990
            .bind(chain)
×
991
            .bind(contract_address)
×
992
            .bind(token_id)
×
UNCOV
993
            .fetch_all(&self.pool)
×
994
            .await?;
×
995

UNCOV
996
        if rows.is_empty() {
×
UNCOV
997
            return Ok(None);
×
998
        }
999

1000
        let mut pins = Vec::new();
×
UNCOV
1001
        let mut token_chain = String::new();
×
1002
        let mut token_contract_address = String::new();
×
1003
        let mut token_token_id = String::new();
×
1004

1005
        for row in rows {
×
1006
            token_chain = row.get("chain");
×
1007
            token_contract_address = row.get("contract_address");
×
UNCOV
1008
            token_token_id = row.get("token_id");
×
UNCOV
1009
            let cid: String = row.get("cid");
×
UNCOV
1010
            let provider_type: String = row.get("provider_type");
×
1011
            // provider_url may be NULL for legacy rows; default to empty string for API stability
1012
            let provider_url: String = row
×
1013
                .try_get::<Option<String>, _>("provider_url")
1014
                .ok()
1015
                .flatten()
1016
                .unwrap_or_default();
UNCOV
1017
            let status: String = row.get("pin_status");
×
UNCOV
1018
            let created_at: DateTime<Utc> = row.get("created_at");
×
1019

UNCOV
1020
            pins.push(PinInfo {
×
1021
                cid,
×
UNCOV
1022
                provider_type,
×
UNCOV
1023
                provider_url,
×
UNCOV
1024
                status,
×
UNCOV
1025
                created_at,
×
1026
            });
1027
        }
1028

UNCOV
1029
        Ok(Some(TokenWithPins {
×
1030
            chain: token_chain,
×
1031
            contract_address: token_contract_address,
×
UNCOV
1032
            token_id: token_token_id,
×
1033
            pins,
×
1034
        }))
1035
    }
1036

1037
    /// Get all pins that are in 'queued' or 'pinning' status
1038
    /// This is used by the pin monitor to check for status updates
1039
    pub async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
×
1040
        let rows = sqlx::query(
1041
            r#"
1042
            SELECT id, task_id, provider_type, provider_url, cid, request_id, pin_status, created_at
1043
            FROM pins
1044
            WHERE pin_status IN ('queued', 'pinning')
1045
            ORDER BY id
1046
            "#,
1047
        )
1048
        .fetch_all(&self.pool)
×
UNCOV
1049
        .await?;
×
1050

UNCOV
1051
        Ok(rows
×
1052
            .into_iter()
×
UNCOV
1053
            .map(|row| PinRow {
×
UNCOV
1054
                id: row.get("id"),
×
UNCOV
1055
                task_id: row.get("task_id"),
×
UNCOV
1056
                provider_type: row.get("provider_type"),
×
UNCOV
1057
                provider_url: row
×
UNCOV
1058
                    .try_get::<Option<String>, _>("provider_url")
×
UNCOV
1059
                    .ok()
×
UNCOV
1060
                    .flatten(),
×
UNCOV
1061
                cid: row.get("cid"),
×
UNCOV
1062
                request_id: row.get("request_id"),
×
UNCOV
1063
                pin_status: row.get("pin_status"),
×
1064
                created_at: row.get("created_at"),
×
1065
            })
1066
            .collect())
×
1067
    }
1068

1069
    /// Set backup fatal error for relevant subresources in a single SQL statement.
    /// The update is based on the `storage_mode` value from the `backup_tasks` table for the given `task_id`:
    /// - If storage_mode is 'archive' or 'full': updates archive_requests.status and archive_requests.fatal_error
    /// - If storage_mode is 'ipfs' or 'full': updates pin_requests.status and pin_requests.fatal_error
    ///
    /// If no `backup_tasks` row exists for `task_id`, the EXISTS guards make
    /// both CTE updates match nothing and the call succeeds without changes.
    pub async fn set_backup_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        // Data-modifying CTEs: both updates run against the same task_mode
        // snapshot, so the whole operation is a single atomic statement.
        let sql = r#"
            WITH task_mode AS (
                SELECT storage_mode FROM backup_tasks WHERE task_id = $1
            ),
            upd_archive AS (
                UPDATE archive_requests ar
                SET status = 'error', fatal_error = $2
                WHERE ar.task_id = $1
                  AND EXISTS (
                      SELECT 1 FROM task_mode tm
                      WHERE tm.storage_mode IN ('archive', 'full')
                  )
                RETURNING 1
            ),
            upd_pins AS (
                UPDATE pin_requests pr
                SET status = 'error', fatal_error = $2
                WHERE pr.task_id = $1
                  AND EXISTS (
                      SELECT 1 FROM task_mode tm
                      WHERE tm.storage_mode IN ('ipfs', 'full')
                  )
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
        "#;
        // The per-table update counts exposed by the final SELECT are not
        // inspected here; execute() discards the result row.
        sqlx::query(sql)
            .bind(task_id)
            .bind(fatal_error)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
1112

1113
    /// Update backup subresource statuses for the task based on its storage mode
    /// - archive or full: updates archive_requests.status
    /// - ipfs or full: updates pin_requests.status
    ///
    /// `scope` ($4) selects which subresources to touch; a scope outside
    /// ('archive', 'ipfs', 'full') makes both updates match nothing.
    pub async fn update_backup_statuses(
        &self,
        task_id: &str,
        scope: &str,
        archive_status: &str,
        ipfs_status: &str,
    ) -> Result<(), sqlx::Error> {
        // Both updates run as data-modifying CTEs inside one atomic statement.
        // Note: unlike set_backup_error, the scope is passed in by the caller
        // rather than read from backup_tasks.storage_mode.
        let sql = r#"
            WITH upd_archive AS (
                UPDATE archive_requests ar
                SET status = $2
                WHERE ar.task_id = $1
                  AND ($4 IN ('archive', 'full'))
                RETURNING 1
            ),
            upd_pins AS (
                UPDATE pin_requests pr
                SET status = $3
                WHERE pr.task_id = $1
                  AND ($4 IN ('ipfs', 'full'))
                RETURNING 1
            )
            SELECT COALESCE((SELECT COUNT(*) FROM upd_archive), 0) AS archive_updates,
                   COALESCE((SELECT COUNT(*) FROM upd_pins), 0)     AS pin_updates
        "#;
        // Update counts from the final SELECT are discarded by execute().
        sqlx::query(sql)
            .bind(task_id)
            .bind(archive_status)
            .bind(ipfs_status)
            .bind(scope)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
1150

NEW
1151
    pub async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
×
UNCOV
1152
        if updates.is_empty() {
×
UNCOV
1153
            return Ok(());
×
1154
        }
1155

UNCOV
1156
        let mut tx = self.pool.begin().await?;
×
1157

UNCOV
1158
        for (id, status) in updates {
×
1159
            sqlx::query(
1160
                r#"
1161
                UPDATE pins
1162
                SET pin_status = $2
1163
                WHERE id = $1
1164
                "#,
1165
            )
UNCOV
1166
            .bind(id)
×
UNCOV
1167
            .bind(status)
×
UNCOV
1168
            .execute(&mut *tx)
×
UNCOV
1169
            .await?;
×
1170
        }
1171

UNCOV
1172
        tx.commit().await?;
×
UNCOV
1173
        Ok(())
×
1174
    }
1175

1176
    /// Complete archive deletion by updating storage mode to ipfs
NEW
1177
    pub async fn complete_archive_request_deletion(
×
1178
        &self,
1179
        task_id: &str,
1180
    ) -> Result<(), sqlx::Error> {
UNCOV
1181
        sqlx::query!(
×
1182
            r#"
1183
            UPDATE backup_tasks
1184
            SET storage_mode = 'ipfs', updated_at = NOW()
1185
            WHERE task_id = $1
1186
            "#,
1187
            task_id
1188
        )
UNCOV
1189
        .execute(&self.pool)
×
UNCOV
1190
        .await?;
×
1191
        Ok(())
×
1192
    }
1193

1194
    /// Complete IPFS pins deletion by updating storage mode to archive
NEW
1195
    pub async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
×
UNCOV
1196
        sqlx::query!(
×
1197
            r#"
1198
            UPDATE backup_tasks
1199
            SET storage_mode = 'archive', updated_at = NOW()
1200
            WHERE task_id = $1
1201
            "#,
1202
            task_id
1203
        )
UNCOV
1204
        .execute(&self.pool)
×
UNCOV
1205
        .await?;
×
UNCOV
1206
        Ok(())
×
1207
    }
1208
}
1209

1210
// Implement the unified Database trait for the real Db struct.
//
// Every method is a pure delegation to the matching inherent method on `Db`.
// The fully-qualified `Db::method(self, ...)` call syntax is deliberate: it
// disambiguates the inherent method from the trait method of the same name,
// avoiding infinite recursion back into this impl.
#[async_trait::async_trait]
impl Database for Db {
    // Backup task operations
    async fn insert_backup_task(
        &self,
        task_id: &str,
        requestor: &str,
        nft_count: i32,
        tokens: &serde_json::Value,
        storage_mode: &str,
        archive_format: Option<&str>,
        retention_days: Option<u64>,
    ) -> Result<(), sqlx::Error> {
        Db::insert_backup_task(
            self,
            task_id,
            requestor,
            nft_count,
            tokens,
            storage_mode,
            archive_format,
            retention_days,
        )
        .await
    }

    async fn get_backup_task(&self, task_id: &str) -> Result<Option<BackupTask>, sqlx::Error> {
        Db::get_backup_task(self, task_id).await
    }

    async fn delete_backup_task(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::delete_backup_task(self, task_id).await
    }

    async fn get_incomplete_backup_tasks(&self) -> Result<Vec<BackupTask>, sqlx::Error> {
        Db::get_incomplete_backup_tasks(self).await
    }

    async fn list_requestor_backup_tasks_paginated(
        &self,
        requestor: &str,
        include_tokens: bool,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<BackupTask>, u32), sqlx::Error> {
        Db::list_requestor_backup_tasks_paginated(self, requestor, include_tokens, limit, offset)
            .await
    }

    async fn list_unprocessed_expired_backups(&self) -> Result<Vec<ExpiredBackup>, sqlx::Error> {
        Db::list_unprocessed_expired_backups(self).await
    }

    // Backup task status and error operations
    async fn clear_backup_errors(&self, task_id: &str, scope: &str) -> Result<(), sqlx::Error> {
        Db::clear_backup_errors(self, task_id, scope).await
    }

    async fn set_backup_error(&self, task_id: &str, error: &str) -> Result<(), sqlx::Error> {
        Db::set_backup_error(self, task_id, error).await
    }

    async fn set_error_logs(
        &self,
        task_id: &str,
        archive_error_log: Option<&str>,
        ipfs_error_log: Option<&str>,
    ) -> Result<(), sqlx::Error> {
        Db::set_error_logs(self, task_id, archive_error_log, ipfs_error_log).await
    }

    async fn update_archive_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_error_log(self, task_id, error_log).await
    }

    async fn update_pin_request_error_log(
        &self,
        task_id: &str,
        error_log: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_error_log(self, task_id, error_log).await
    }

    async fn set_archive_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_archive_request_error(self, task_id, fatal_error).await
    }

    async fn set_pin_request_error(
        &self,
        task_id: &str,
        fatal_error: &str,
    ) -> Result<(), sqlx::Error> {
        Db::set_pin_request_error(self, task_id, fatal_error).await
    }

    // Status update operations
    async fn update_archive_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_status(self, task_id, status).await
    }

    async fn update_pin_request_status(
        &self,
        task_id: &str,
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_pin_request_status(self, task_id, status).await
    }

    async fn update_backup_statuses(
        &self,
        task_id: &str,
        scope: &str,
        archive_status: &str,
        ipfs_status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_backup_statuses(self, task_id, scope, archive_status, ipfs_status).await
    }

    async fn update_archive_request_statuses(
        &self,
        task_ids: &[String],
        status: &str,
    ) -> Result<(), sqlx::Error> {
        Db::update_archive_request_statuses(self, task_ids, status).await
    }

    // Deletion operations
    async fn start_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_deletion(self, task_id).await
    }

    async fn start_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_archive_request_deletion(self, task_id).await
    }

    async fn start_pin_request_deletions(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::start_pin_request_deletions(self, task_id).await
    }

    async fn complete_archive_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_archive_request_deletion(self, task_id).await
    }

    async fn complete_pin_request_deletion(&self, task_id: &str) -> Result<(), sqlx::Error> {
        Db::complete_pin_request_deletion(self, task_id).await
    }

    // Retry operations
    async fn retry_backup(&self, task_id: &str, retention_days: u64) -> Result<(), sqlx::Error> {
        Db::retry_backup(self, task_id, retention_days).await
    }

    // Pin operations
    async fn insert_pins_with_tokens(
        &self,
        task_id: &str,
        token_pin_mappings: &[crate::TokenPinMapping],
    ) -> Result<(), sqlx::Error> {
        Db::insert_pins_with_tokens(self, task_id, token_pin_mappings).await
    }

    async fn get_pins_by_task_id(&self, task_id: &str) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_pins_by_task_id(self, task_id).await
    }

    async fn get_active_pins(&self) -> Result<Vec<PinRow>, sqlx::Error> {
        Db::get_active_pins(self).await
    }

    async fn update_pin_statuses(&self, updates: &[(i64, String)]) -> Result<(), sqlx::Error> {
        Db::update_pin_statuses(self, updates).await
    }

    // Pinned tokens operations
    async fn get_pinned_tokens_by_requestor(
        &self,
        requestor: &str,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<TokenWithPins>, u32), sqlx::Error> {
        Db::get_pinned_tokens_by_requestor(self, requestor, limit, offset).await
    }

    async fn get_pinned_token_by_requestor(
        &self,
        requestor: &str,
        chain: &str,
        contract_address: &str,
        token_id: &str,
    ) -> Result<Option<TokenWithPins>, sqlx::Error> {
        Db::get_pinned_token_by_requestor(self, requestor, chain, contract_address, token_id).await
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc