• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 17840270211

18 Sep 2025 08:14PM UTC coverage: 79.013% (-5.6%) from 84.625%
17840270211

Pull #1091

github

web-flow
Merge afe69a929 into 0859949d5
Pull Request #1091: Add pki-https feature to prevent local cert validation on all-features

3 of 4 new or added lines in 1 file covered. (75.0%)

539 existing lines in 9 files now uncovered.

7556 of 9563 relevant lines covered (79.01%)

486.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/payjoin-cli/src/db/v2.rs
1
use std::sync::Arc;
2

3
use payjoin::persist::SessionPersister;
4
use payjoin::receive::v2::SessionEvent as ReceiverSessionEvent;
5
use payjoin::send::v2::SessionEvent as SenderSessionEvent;
6
use payjoin::HpkePublicKey;
7
use rusqlite::params;
8

9
use super::*;
10

11
#[derive(Debug, Clone)]
12
pub(crate) struct SessionId(i64);
13

14
impl core::ops::Deref for SessionId {
15
    type Target = i64;
UNCOV
16
    fn deref(&self) -> &Self::Target { &self.0 }
×
17
}
18

19
impl std::fmt::Display for SessionId {
20
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) }
×
21
}
22

23
#[derive(Clone)]
24
pub(crate) struct SenderPersister {
25
    db: Arc<Database>,
26
    session_id: SessionId,
27
}
28

29
impl SenderPersister {
UNCOV
30
    pub fn new(db: Arc<Database>, receiver_pubkey: HpkePublicKey) -> crate::db::Result<Self> {
×
UNCOV
31
        let conn = db.get_connection()?;
×
32

33
        // Create a new session in send_sessions and get its ID
UNCOV
34
        let session_id: i64 = conn.query_row(
×
UNCOV
35
            "INSERT INTO send_sessions (session_id, receiver_pubkey) VALUES (NULL, ?1) RETURNING session_id",
×
UNCOV
36
            params![receiver_pubkey.to_compressed_bytes()],
×
UNCOV
37
            |row| row.get(0),
×
38
        )?;
×
39

UNCOV
40
        Ok(Self { db, session_id: SessionId(session_id) })
×
UNCOV
41
    }
×
42

UNCOV
43
    pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } }
×
44
}
45

46
impl SessionPersister for SenderPersister {
47
    type SessionEvent = SenderSessionEvent;
48
    type InternalStorageError = crate::db::error::Error;
49

UNCOV
50
    fn save_event(
×
UNCOV
51
        &self,
×
UNCOV
52
        event: SenderSessionEvent,
×
UNCOV
53
    ) -> std::result::Result<(), Self::InternalStorageError> {
×
UNCOV
54
        let conn = self.db.get_connection()?;
×
UNCOV
55
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
×
56

UNCOV
57
        conn.execute(
×
UNCOV
58
            "INSERT INTO send_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
×
UNCOV
59
            params![*self.session_id, event_data, now()],
×
UNCOV
60
        )?;
×
61

UNCOV
62
        Ok(())
×
UNCOV
63
    }
×
64

UNCOV
65
    fn load(
×
UNCOV
66
        &self,
×
UNCOV
67
    ) -> std::result::Result<Box<dyn Iterator<Item = SenderSessionEvent>>, Self::InternalStorageError>
×
68
    {
UNCOV
69
        let conn = self.db.get_connection()?;
×
UNCOV
70
        let mut stmt = conn.prepare(
×
UNCOV
71
            "SELECT event_data FROM send_session_events WHERE session_id = ?1 ORDER BY created_at ASC",
×
UNCOV
72
        )?;
×
73

UNCOV
74
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
×
UNCOV
75
            let event_data: String = row.get(0)?;
×
UNCOV
76
            Ok(event_data)
×
UNCOV
77
        })?;
×
78

UNCOV
79
        let events: Vec<SenderSessionEvent> = event_rows
×
UNCOV
80
            .map(|row| {
×
UNCOV
81
                let event_data = row.expect("Failed to read event data from database");
×
UNCOV
82
                serde_json::from_str::<SenderSessionEvent>(&event_data)
×
UNCOV
83
                    .expect("Database corruption: failed to deserialize session event")
×
UNCOV
84
            })
×
UNCOV
85
            .collect();
×
86

UNCOV
87
        Ok(Box::new(events.into_iter()))
×
UNCOV
88
    }
×
89

UNCOV
90
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
×
UNCOV
91
        let conn = self.db.get_connection()?;
×
92

UNCOV
93
        conn.execute(
×
UNCOV
94
            "UPDATE send_sessions SET completed_at = ?1 WHERE session_id = ?2",
×
UNCOV
95
            params![now(), *self.session_id],
×
UNCOV
96
        )?;
×
97

UNCOV
98
        Ok(())
×
UNCOV
99
    }
×
100
}
101

102
#[derive(Clone)]
103
pub(crate) struct ReceiverPersister {
104
    db: Arc<Database>,
105
    session_id: SessionId,
106
}
107

108
impl ReceiverPersister {
UNCOV
109
    pub fn new(db: Arc<Database>) -> crate::db::Result<Self> {
×
UNCOV
110
        let conn = db.get_connection()?;
×
111

112
        // Create a new session in receive_sessions and get its ID
UNCOV
113
        let session_id: i64 = conn.query_row(
×
UNCOV
114
            "INSERT INTO receive_sessions (session_id) VALUES (NULL) RETURNING session_id",
×
UNCOV
115
            [],
×
UNCOV
116
            |row| row.get(0),
×
117
        )?;
×
118

UNCOV
119
        Ok(Self { db, session_id: SessionId(session_id) })
×
UNCOV
120
    }
×
121

UNCOV
122
    pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } }
×
123
}
124

125
impl SessionPersister for ReceiverPersister {
126
    type SessionEvent = ReceiverSessionEvent;
127
    type InternalStorageError = crate::db::error::Error;
128

UNCOV
129
    fn save_event(
×
UNCOV
130
        &self,
×
UNCOV
131
        event: ReceiverSessionEvent,
×
UNCOV
132
    ) -> std::result::Result<(), Self::InternalStorageError> {
×
UNCOV
133
        let conn = self.db.get_connection()?;
×
UNCOV
134
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
×
135

UNCOV
136
        conn.execute(
×
UNCOV
137
            "INSERT INTO receive_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
×
UNCOV
138
            params![*self.session_id, event_data, now()],
×
UNCOV
139
        )?;
×
140

UNCOV
141
        Ok(())
×
UNCOV
142
    }
×
143

UNCOV
144
    fn load(
×
UNCOV
145
        &self,
×
UNCOV
146
    ) -> std::result::Result<
×
UNCOV
147
        Box<dyn Iterator<Item = ReceiverSessionEvent>>,
×
UNCOV
148
        Self::InternalStorageError,
×
UNCOV
149
    > {
×
UNCOV
150
        let conn = self.db.get_connection()?;
×
UNCOV
151
        let mut stmt = conn.prepare(
×
UNCOV
152
            "SELECT event_data FROM receive_session_events WHERE session_id = ?1 ORDER BY created_at ASC",
×
UNCOV
153
        )?;
×
154

UNCOV
155
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
×
UNCOV
156
            let event_data: String = row.get(0)?;
×
UNCOV
157
            Ok(event_data)
×
UNCOV
158
        })?;
×
159

UNCOV
160
        let events: Vec<ReceiverSessionEvent> = event_rows
×
UNCOV
161
            .map(|row| {
×
UNCOV
162
                let event_data = row.expect("Failed to read event data from database");
×
UNCOV
163
                serde_json::from_str::<ReceiverSessionEvent>(&event_data)
×
UNCOV
164
                    .expect("Database corruption: failed to deserialize session event")
×
UNCOV
165
            })
×
UNCOV
166
            .collect();
×
167

UNCOV
168
        Ok(Box::new(events.into_iter()))
×
UNCOV
169
    }
×
170

UNCOV
171
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
×
UNCOV
172
        let conn = self.db.get_connection()?;
×
173

UNCOV
174
        conn.execute(
×
UNCOV
175
            "UPDATE receive_sessions SET completed_at = ?1 WHERE session_id = ?2",
×
UNCOV
176
            params![now(), *self.session_id],
×
UNCOV
177
        )?;
×
178

UNCOV
179
        Ok(())
×
UNCOV
180
    }
×
181
}
182

183
impl Database {
UNCOV
184
    pub(crate) fn get_recv_session_ids(&self) -> Result<Vec<SessionId>> {
×
UNCOV
185
        let conn = self.get_connection()?;
×
UNCOV
186
        let mut stmt =
×
UNCOV
187
            conn.prepare("SELECT session_id FROM receive_sessions WHERE completed_at IS NULL")?;
×
188

UNCOV
189
        let session_rows = stmt.query_map([], |row| {
×
UNCOV
190
            let session_id: i64 = row.get(0)?;
×
UNCOV
191
            Ok(SessionId(session_id))
×
UNCOV
192
        })?;
×
193

UNCOV
194
        let mut session_ids = Vec::new();
×
UNCOV
195
        for session_row in session_rows {
×
UNCOV
196
            let session_id = session_row?;
×
UNCOV
197
            session_ids.push(session_id);
×
198
        }
199

UNCOV
200
        Ok(session_ids)
×
UNCOV
201
    }
×
202

UNCOV
203
    pub(crate) fn get_send_session_ids(&self) -> Result<Vec<SessionId>> {
×
UNCOV
204
        let conn = self.get_connection()?;
×
UNCOV
205
        let mut stmt =
×
UNCOV
206
            conn.prepare("SELECT session_id FROM send_sessions WHERE completed_at IS NULL")?;
×
207

UNCOV
208
        let session_rows = stmt.query_map([], |row| {
×
UNCOV
209
            let session_id: i64 = row.get(0)?;
×
UNCOV
210
            Ok(SessionId(session_id))
×
UNCOV
211
        })?;
×
212

UNCOV
213
        let mut session_ids = Vec::new();
×
UNCOV
214
        for session_row in session_rows {
×
UNCOV
215
            let session_id = session_row?;
×
UNCOV
216
            session_ids.push(session_id);
×
217
        }
218

UNCOV
219
        Ok(session_ids)
×
UNCOV
220
    }
×
221

UNCOV
222
    pub(crate) fn get_send_session_receiver_pk(
×
UNCOV
223
        &self,
×
UNCOV
224
        session_id: &SessionId,
×
UNCOV
225
    ) -> Result<HpkePublicKey> {
×
UNCOV
226
        let conn = self.get_connection()?;
×
UNCOV
227
        let mut stmt =
×
UNCOV
228
            conn.prepare("SELECT receiver_pubkey FROM send_sessions WHERE session_id = ?1")?;
×
UNCOV
229
        let receiver_pubkey: Vec<u8> = stmt.query_row(params![session_id.0], |row| row.get(0))?;
×
UNCOV
230
        Ok(HpkePublicKey::from_compressed_bytes(&receiver_pubkey).expect("Valid receiver pubkey"))
×
UNCOV
231
    }
×
232

233
    pub(crate) fn get_inactive_send_session_ids(&self) -> Result<Vec<(SessionId, u64)>> {
×
234
        let conn = self.get_connection()?;
×
235
        let mut stmt = conn.prepare(
×
236
            "SELECT session_id, completed_at FROM send_sessions WHERE completed_at IS NOT NULL",
×
237
        )?;
×
238
        let session_rows = stmt.query_map([], |row| {
×
239
            let session_id: i64 = row.get(0)?;
×
240
            let completed_at: u64 = row.get(1)?;
×
241
            Ok((SessionId(session_id), completed_at))
×
242
        })?;
×
243

244
        let mut session_ids = Vec::new();
×
245
        for session_row in session_rows {
×
246
            let (session_id, completed_at) = session_row?;
×
247
            session_ids.push((session_id, completed_at));
×
248
        }
249
        Ok(session_ids)
×
250
    }
×
251

252
    pub(crate) fn get_inactive_recv_session_ids(&self) -> Result<Vec<(SessionId, u64)>> {
×
253
        let conn = self.get_connection()?;
×
254
        let mut stmt = conn.prepare(
×
255
            "SELECT session_id, completed_at FROM receive_sessions WHERE completed_at IS NOT NULL",
×
256
        )?;
×
257
        let session_rows = stmt.query_map([], |row| {
×
258
            let session_id: i64 = row.get(0)?;
×
259
            let completed_at: u64 = row.get(1)?;
×
260
            Ok((SessionId(session_id), completed_at))
×
261
        })?;
×
262

263
        let mut session_ids = Vec::new();
×
264
        for session_row in session_rows {
×
265
            let (session_id, completed_at) = session_row?;
×
266
            session_ids.push((session_id, completed_at));
×
267
        }
268
        Ok(session_ids)
×
269
    }
×
270
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc