• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 19249965305

10 Nov 2025 11:52PM UTC coverage: 83.511% (-0.07%) from 83.578%
19249965305

Pull #1158

github

web-flow
Merge 735d4f5c5 into 7d9e7f556
Pull Request #1158: Add completed_event_id FK to prevent session replay

43 of 60 new or added lines in 3 files covered. (71.67%)

27 existing lines in 1 file now uncovered.

9020 of 10801 relevant lines covered (83.51%)

458.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.34
/payjoin-cli/src/db/v2.rs
1
use std::sync::Arc;
2

3
use payjoin::persist::SessionPersister;
4
use payjoin::receive::v2::SessionEvent as ReceiverSessionEvent;
5
use payjoin::send::v2::SessionEvent as SenderSessionEvent;
6
use payjoin::HpkePublicKey;
7
use rusqlite::params;
8

9
use super::*;
10

11
/// Newtype around the `i64` key that identifies a persisted session.
#[derive(Debug, Clone)]
pub(crate) struct SessionId(i64);

impl core::ops::Deref for SessionId {
    type Target = i64;

    /// Exposes the wrapped integer so callers can write `*session_id`.
    fn deref(&self) -> &i64 {
        let Self(raw) = self;
        raw
    }
}

impl std::fmt::Display for SessionId {
    /// Writes the wrapped integer using its default decimal formatting.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        write!(out, "{}", raw)
    }
}
22

23
#[derive(Clone)]
24
pub(crate) struct SenderPersister {
25
    db: Arc<Database>,
26
    session_id: SessionId,
27
}
28

29
impl SenderPersister {
30
    pub fn new(db: Arc<Database>, receiver_pubkey: HpkePublicKey) -> crate::db::Result<Self> {
1✔
31
        let conn = db.get_connection()?;
1✔
32

33
        // Create a new session in send_sessions and get its ID
34
        let session_id: i64 = conn.query_row(
1✔
35
            "INSERT INTO send_sessions (session_id, receiver_pubkey) VALUES (NULL, ?1) RETURNING session_id",
1✔
36
            params![receiver_pubkey.to_compressed_bytes()],
1✔
37
            |row| row.get(0),
1✔
38
        )?;
×
39

40
        Ok(Self { db, session_id: SessionId(session_id) })
1✔
41
    }
1✔
42

43
    pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } }
1✔
44
}
45

46
impl SessionPersister for SenderPersister {
47
    type SessionEvent = SenderSessionEvent;
48
    type InternalStorageError = crate::db::error::Error;
49

50
    fn save_event(
3✔
51
        &self,
3✔
52
        event: SenderSessionEvent,
3✔
53
    ) -> std::result::Result<(), Self::InternalStorageError> {
3✔
54
        let conn = self.db.get_connection()?;
3✔
55
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
3✔
56

57
        conn.execute(
3✔
58
            "INSERT INTO send_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
3✔
59
            params![*self.session_id, event_data, now()],
3✔
60
        )?;
3✔
61

62
        Ok(())
3✔
63
    }
3✔
64

65
    fn load(
1✔
66
        &self,
1✔
67
    ) -> std::result::Result<Box<dyn Iterator<Item = SenderSessionEvent>>, Self::InternalStorageError>
1✔
68
    {
69
        let conn = self.db.get_connection()?;
1✔
70
        let mut stmt = conn.prepare(
1✔
71
            "SELECT event_data FROM send_session_events WHERE session_id = ?1 ORDER BY id ASC",
1✔
72
        )?;
1✔
73

74
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
2✔
75
            let event_data: String = row.get(0)?;
2✔
76
            Ok(event_data)
2✔
77
        })?;
2✔
78

79
        let events: Vec<SenderSessionEvent> = event_rows
1✔
80
            .map(|row| {
2✔
81
                let event_data = row.expect("Failed to read event data from database");
2✔
82
                serde_json::from_str::<SenderSessionEvent>(&event_data)
2✔
83
                    .expect("Database corruption: failed to deserialize session event")
2✔
84
            })
2✔
85
            .collect();
1✔
86

87
        Ok(Box::new(events.into_iter()))
1✔
88
    }
1✔
89

90
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
1✔
91
        let conn = self.db.get_connection()?;
1✔
92

93
        let mut stmt = conn.prepare(
1✔
94
            "SELECT id, event_data FROM send_session_events 
1✔
95
             WHERE session_id = ?1 
1✔
96
             ORDER BY id DESC 
1✔
97
             LIMIT 1",
1✔
98
        )?;
1✔
99

100
        let (id, event_data) = match stmt.query_row(params![*self.session_id], |row| {
1✔
101
            let id: i64 = row.get(0)?;
1✔
102
            let event_data: String = row.get(1)?;
1✔
103
            Ok((id, event_data))
1✔
104
        }) {
1✔
105
            Ok(data) => data,
1✔
106
            Err(rusqlite::Error::QueryReturnedNoRows) =>
NEW
107
                return Err(Error::MissingCompletedEventId { session_id: *self.session_id }),
×
NEW
UNCOV
108
            Err(e) => return Err(Error::Rusqlite(e)),
×
109
        };
110

111
        let event: SenderSessionEvent = serde_json::from_str(&event_data)?;
1✔
112

113
        let completed_event_id = match event {
1✔
114
            SenderSessionEvent::Closed(_) => id,
1✔
NEW
UNCOV
115
            _ => return Err(Error::MissingCompletedEventId { session_id: *self.session_id }),
×
116
        };
117

118
        conn.execute(
1✔
119
            "UPDATE send_sessions SET completed_event_id = ?1 WHERE session_id = ?2",
1✔
120
            params![completed_event_id, *self.session_id],
1✔
121
        )?;
1✔
122

123
        Ok(())
1✔
124
    }
1✔
125
}
126

127
#[derive(Clone)]
128
pub(crate) struct ReceiverPersister {
129
    db: Arc<Database>,
130
    session_id: SessionId,
131
}
132

133
impl ReceiverPersister {
134
    pub fn new(db: Arc<Database>) -> crate::db::Result<Self> {
1✔
135
        let conn = db.get_connection()?;
1✔
136

137
        // Create a new session in receive_sessions and get its ID
138
        let session_id: i64 = conn.query_row(
1✔
139
            "INSERT INTO receive_sessions (session_id) VALUES (NULL) RETURNING session_id",
1✔
140
            [],
1✔
141
            |row| row.get(0),
1✔
UNCOV
142
        )?;
×
143

144
        Ok(Self { db, session_id: SessionId(session_id) })
1✔
145
    }
1✔
146

147
    pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } }
2✔
148
}
149

150
impl SessionPersister for ReceiverPersister {
151
    type SessionEvent = ReceiverSessionEvent;
152
    type InternalStorageError = crate::db::error::Error;
153

154
    fn save_event(
12✔
155
        &self,
12✔
156
        event: ReceiverSessionEvent,
12✔
157
    ) -> std::result::Result<(), Self::InternalStorageError> {
12✔
158
        let conn = self.db.get_connection()?;
12✔
159
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
12✔
160

161
        conn.execute(
12✔
162
            "INSERT INTO receive_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
12✔
163
            params![*self.session_id, event_data, now()],
12✔
164
        )?;
12✔
165

166
        Ok(())
12✔
167
    }
12✔
168

169
    fn load(
2✔
170
        &self,
2✔
171
    ) -> std::result::Result<
2✔
172
        Box<dyn Iterator<Item = ReceiverSessionEvent>>,
2✔
173
        Self::InternalStorageError,
2✔
174
    > {
2✔
175
        let conn = self.db.get_connection()?;
2✔
176
        let mut stmt = conn.prepare(
2✔
177
            "SELECT event_data FROM receive_session_events WHERE session_id = ?1 ORDER BY id ASC",
2✔
178
        )?;
2✔
179

180
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
12✔
181
            let event_data: String = row.get(0)?;
12✔
182
            Ok(event_data)
12✔
183
        })?;
12✔
184

185
        let events: Vec<ReceiverSessionEvent> = event_rows
2✔
186
            .map(|row| {
12✔
187
                let event_data = row.expect("Failed to read event data from database");
12✔
188
                serde_json::from_str::<ReceiverSessionEvent>(&event_data)
12✔
189
                    .expect("Database corruption: failed to deserialize session event")
12✔
190
            })
12✔
191
            .collect();
2✔
192

193
        Ok(Box::new(events.into_iter()))
2✔
194
    }
2✔
195

196
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
1✔
197
        let conn = self.db.get_connection()?;
1✔
198

199
        let mut stmt = conn.prepare(
1✔
200
            "SELECT id, event_data FROM receive_session_events 
1✔
201
             WHERE session_id = ?1 
1✔
202
             ORDER BY id DESC 
1✔
203
             LIMIT 1",
1✔
204
        )?;
1✔
205

206
        let (id, event_data) = match stmt.query_row(params![*self.session_id], |row| {
1✔
207
            let id: i64 = row.get(0)?;
1✔
208
            let event_data: String = row.get(1)?;
1✔
209
            Ok((id, event_data))
1✔
210
        }) {
1✔
211
            Ok(data) => data,
1✔
212
            Err(rusqlite::Error::QueryReturnedNoRows) =>
NEW
UNCOV
213
                return Err(Error::MissingCompletedEventId { session_id: *self.session_id }),
×
NEW
UNCOV
214
            Err(e) => return Err(Error::Rusqlite(e)),
×
215
        };
216

217
        let event: ReceiverSessionEvent = serde_json::from_str(&event_data)?;
1✔
218

219
        let completed_event_id = match event {
1✔
220
            ReceiverSessionEvent::Closed(_) => id,
1✔
NEW
UNCOV
221
            _ => return Err(Error::MissingCompletedEventId { session_id: *self.session_id }),
×
222
        };
223

224
        conn.execute(
1✔
225
            "UPDATE receive_sessions SET completed_event_id = ?1 WHERE session_id = ?2",
1✔
226
            params![completed_event_id, *self.session_id],
1✔
227
        )?;
1✔
228

229
        Ok(())
1✔
230
    }
1✔
231
}
232

233
impl Database {
234
    pub(crate) fn get_recv_session_ids(&self) -> Result<Vec<SessionId>> {
4✔
235
        let conn = self.get_connection()?;
4✔
236
        let mut stmt = conn
4✔
237
            .prepare("SELECT session_id FROM receive_sessions WHERE completed_event_id IS NULL")?;
4✔
238

239
        let session_rows = stmt.query_map([], |row| {
4✔
240
            let session_id: i64 = row.get(0)?;
2✔
241
            Ok(SessionId(session_id))
2✔
242
        })?;
2✔
243

244
        let mut session_ids = Vec::new();
4✔
245
        for session_row in session_rows {
6✔
246
            let session_id = session_row?;
2✔
247
            session_ids.push(session_id);
2✔
248
        }
249

250
        Ok(session_ids)
4✔
251
    }
4✔
252

253
    pub(crate) fn get_send_session_ids(&self) -> Result<Vec<SessionId>> {
6✔
254
        let conn = self.get_connection()?;
6✔
255
        let mut stmt =
6✔
256
            conn.prepare("SELECT session_id FROM send_sessions WHERE completed_event_id IS NULL")?;
6✔
257

258
        let session_rows = stmt.query_map([], |row| {
6✔
259
            let session_id: i64 = row.get(0)?;
1✔
260
            Ok(SessionId(session_id))
1✔
261
        })?;
1✔
262

263
        let mut session_ids = Vec::new();
6✔
264
        for session_row in session_rows {
7✔
265
            let session_id = session_row?;
1✔
266
            session_ids.push(session_id);
1✔
267
        }
268

269
        Ok(session_ids)
6✔
270
    }
6✔
271

272
    pub(crate) fn get_send_session_receiver_pk(
1✔
273
        &self,
1✔
274
        session_id: &SessionId,
1✔
275
    ) -> Result<HpkePublicKey> {
1✔
276
        let conn = self.get_connection()?;
1✔
277
        let mut stmt =
1✔
278
            conn.prepare("SELECT receiver_pubkey FROM send_sessions WHERE session_id = ?1")?;
1✔
279
        let receiver_pubkey: Vec<u8> = stmt.query_row(params![session_id.0], |row| row.get(0))?;
1✔
280
        Ok(HpkePublicKey::from_compressed_bytes(&receiver_pubkey).expect("Valid receiver pubkey"))
1✔
281
    }
1✔
282

283
    pub(crate) fn get_inactive_send_session_ids(&self) -> Result<Vec<(SessionId, u64)>> {
×
UNCOV
284
        let conn = self.get_connection()?;
×
285
        let mut stmt = conn.prepare(
×
NEW
286
            "SELECT s.session_id, e.created_at 
×
NEW
287
             FROM send_sessions s 
×
NEW
288
             JOIN send_session_events e ON s.completed_event_id = e.id 
×
NEW
289
             WHERE s.completed_event_id IS NOT NULL",
×
290
        )?;
×
291
        let session_rows = stmt.query_map([], |row| {
×
292
            let session_id: i64 = row.get(0)?;
×
293
            let completed_at: u64 = row.get(1)?;
×
294
            Ok((SessionId(session_id), completed_at))
×
295
        })?;
×
296

297
        let mut session_ids = Vec::new();
×
UNCOV
298
        for session_row in session_rows {
×
299
            let (session_id, completed_at) = session_row?;
×
300
            session_ids.push((session_id, completed_at));
×
301
        }
302
        Ok(session_ids)
×
UNCOV
303
    }
×
304

305
    pub(crate) fn get_inactive_recv_session_ids(&self) -> Result<Vec<(SessionId, u64)>> {
×
UNCOV
306
        let conn = self.get_connection()?;
×
UNCOV
307
        let mut stmt = conn.prepare(
×
NEW
UNCOV
308
            "SELECT r.session_id, e.created_at 
×
NEW
UNCOV
309
             FROM receive_sessions r 
×
NEW
UNCOV
310
             JOIN receive_session_events e ON r.completed_event_id = e.id 
×
NEW
UNCOV
311
             WHERE r.completed_event_id IS NOT NULL",
×
UNCOV
312
        )?;
×
UNCOV
313
        let session_rows = stmt.query_map([], |row| {
×
UNCOV
314
            let session_id: i64 = row.get(0)?;
×
UNCOV
315
            let completed_at: u64 = row.get(1)?;
×
UNCOV
316
            Ok((SessionId(session_id), completed_at))
×
UNCOV
317
        })?;
×
318

UNCOV
319
        let mut session_ids = Vec::new();
×
UNCOV
320
        for session_row in session_rows {
×
UNCOV
321
            let (session_id, completed_at) = session_row?;
×
UNCOV
322
            session_ids.push((session_id, completed_at));
×
323
        }
UNCOV
324
        Ok(session_ids)
×
UNCOV
325
    }
×
326
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc