• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 23953420148

03 Apr 2026 04:25PM UTC coverage: 84.381% (+0.04%) from 84.34%
23953420148

Pull #1376

github

web-flow
Merge 4d37434a9 into 9ed621f78
Pull Request #1376: Guard concurrent sends with exclusive DB lock and URI/RK checks

65 of 71 new or added lines in 4 files covered. (91.55%)

7 existing lines in 1 file now uncovered.

10859 of 12869 relevant lines covered (84.38%)

410.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.68
/payjoin-cli/src/db/v2.rs
1
use std::sync::Arc;
2

3
use payjoin::persist::SessionPersister;
4
use payjoin::receive::v2::SessionEvent as ReceiverSessionEvent;
5
use payjoin::send::v2::SessionEvent as SenderSessionEvent;
6
use payjoin::HpkePublicKey;
7
use rusqlite::params;
8

9
use super::*;
10

11
/// Newtype around the `i64` value stored in the `session_id` column of the
/// `send_sessions` / `receive_sessions` tables.
#[derive(Debug, Clone)]
12
pub(crate) struct SessionId(i64);
13

14
/// Dereferences to the wrapped `i64`, so `*session_id` can be bound directly as a SQL parameter.
impl core::ops::Deref for SessionId {
15
    type Target = i64;
16
    fn deref(&self) -> &Self::Target { &self.0 }
21✔
17
}
18

19
/// Formats the session id as the plain decimal value of the wrapped `i64`.
impl std::fmt::Display for SessionId {
20
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) }
×
21
}
22

23
/// Persists the events of one sender session, identified by `session_id`, through the shared
/// `Database` handle.
#[derive(Clone, Debug)]
24
pub(crate) struct SenderPersister {
25
    // Shared database handle; a connection is obtained from it for every operation.
    db: Arc<Database>,
26
    // Value of `send_sessions.session_id` for the session this persister works on.
    session_id: SessionId,
27
}
28

29
impl SenderPersister {
30
    /// Registers a new sender session for `pj_uri` / `receiver_pubkey` and returns a persister
    /// bound to the freshly inserted `send_sessions` row.
    ///
    /// # Errors
    ///
    /// * `Error::DuplicateSendSession(DuplicateKind::Uri)` if any `send_sessions` row already
    ///   has this `pj_uri`. The lookup does not filter on `completed_at`, so completed sessions
    ///   count as duplicates too.
    /// * `Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey)` if any row already stores
    ///   the same compressed receiver key. The URI check is evaluated first.
    /// * Any error from obtaining the connection or running the queries, propagated with `?`.
    pub fn new(
7✔
31
        db: Arc<Database>,
7✔
32
        pj_uri: &str,
7✔
33
        receiver_pubkey: &HpkePublicKey,
7✔
34
    ) -> crate::db::Result<Self> {
7✔
35
        let conn = db.get_connection()?;
7✔
36
        let receiver_pubkey_bytes = receiver_pubkey.to_compressed_bytes();
7✔
37

38
        // One round trip answers both questions: is the URI known, is the receiver key known.
        // NOTE(review): this check and the INSERT below are separate statements with no explicit
        // transaction in this function; whether a concurrent caller can slip in between depends
        // on how `Database` configures its connections (not visible here) - confirm.
        let (duplicate_uri, duplicate_rk): (bool, bool) = conn.query_row(
7✔
39
            "SELECT \
7✔
40
                EXISTS(SELECT 1 FROM send_sessions WHERE pj_uri = ?1), \
7✔
41
                EXISTS(SELECT 1 FROM send_sessions WHERE receiver_pubkey = ?2)",
7✔
42
            params![pj_uri, &receiver_pubkey_bytes],
7✔
43
            |row| Ok((row.get(0)?, row.get(1)?)),
7✔
NEW
44
        )?;
×
45

46
        if duplicate_uri {
7✔
47
            return Err(Error::DuplicateSendSession(DuplicateKind::Uri));
2✔
48
        }
5✔
49
        if duplicate_rk {
5✔
50
            return Err(Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey));
1✔
51
        }
4✔
52

53
        // Create a new session in send_sessions and get its ID
54
        let session_id: i64 = conn.query_row(
4✔
55
            "INSERT INTO send_sessions (pj_uri, receiver_pubkey) VALUES (?1, ?2) RETURNING session_id",
4✔
56
            params![pj_uri, &receiver_pubkey_bytes],
4✔
57
            |row| row.get(0),
4✔
58
        )?;
×
59

60
        Ok(Self { db, session_id: SessionId(session_id) })
4✔
61
    }
7✔
62

63
    /// Wraps an already known session id without touching the database; the id is not checked
    /// against `send_sessions`.
    pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } }
1✔
64
    
65
}
66
/// Stores sender session events as JSON rows in `send_session_events`.
impl SessionPersister for SenderPersister {
67
    type SessionEvent = SenderSessionEvent;
68
    type InternalStorageError = crate::db::error::Error;
69

70
    /// Serializes `event` to JSON and appends it to `send_session_events` for this session,
    /// stamping the row with the value returned by `now()`.
    ///
    /// Returns `Error::Serialize` if JSON encoding fails; connection and insert errors are
    /// propagated with `?`.
    fn save_event(
3✔
71
        &self,
3✔
72
        event: SenderSessionEvent,
3✔
73
    ) -> std::result::Result<(), Self::InternalStorageError> {
3✔
74
        let conn = self.db.get_connection()?;
3✔
75
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
3✔
76

77
        conn.execute(
3✔
78
            "INSERT INTO send_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
3✔
79
            params![*self.session_id, event_data, now()],
3✔
80
        )?;
3✔
81

82
        Ok(())
3✔
83
    }
3✔
84

85
    /// Loads every stored event of this session in ascending `id` order.
    ///
    /// All rows are read and decoded up front into a `Vec`; the returned iterator walks that
    /// vector.
    ///
    /// # Panics
    ///
    /// Panics if a row cannot be read or if its `event_data` is not a valid
    /// `SenderSessionEvent` in JSON form.
    fn load(
1✔
86
        &self,
1✔
87
    ) -> std::result::Result<Box<dyn Iterator<Item = SenderSessionEvent>>, Self::InternalStorageError>
1✔
88
    {
89
        let conn = self.db.get_connection()?;
1✔
90
        let mut stmt = conn.prepare(
1✔
91
            "SELECT event_data FROM send_session_events WHERE session_id = ?1 ORDER BY id ASC",
1✔
92
        )?;
1✔
93

94
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
2✔
95
            let event_data: String = row.get(0)?;
2✔
96
            Ok(event_data)
2✔
97
        })?;
2✔
98

99
        // NOTE(review): both `expect` calls below turn a storage problem into a panic even
        // though this function already returns `Result<_, InternalStorageError>`; consider
        // collecting into a `Result` and propagating instead.
        let events: Vec<SenderSessionEvent> = event_rows
1✔
100
            .map(|row| {
2✔
101
                let event_data = row.expect("Failed to read event data from database");
2✔
102
                serde_json::from_str::<SenderSessionEvent>(&event_data)
2✔
103
                    .expect("Database corruption: failed to deserialize session event")
2✔
104
            })
2✔
105
            .collect();
1✔
106

107
        Ok(Box::new(events.into_iter()))
1✔
108
    }
1✔
109

110
    /// Marks this session as completed by writing the value of `now()` into
    /// `send_sessions.completed_at`.
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
2✔
111
        let conn = self.db.get_connection()?;
2✔
112

113
        // NOTE(review): the affected-row count returned by `execute` is discarded, so closing
        // an id that matches no row still returns `Ok(())`.
        conn.execute(
2✔
114
            "UPDATE send_sessions SET completed_at = ?1 WHERE session_id = ?2",
2✔
115
            params![now(), *self.session_id],
2✔
116
        )?;
2✔
117

118
        Ok(())
2✔
119
    }
2✔
120
}
121

122
/// Persists the events of one receiver session, identified by `session_id`, through the shared
/// `Database` handle.
// NOTE(review): `SenderPersister` derives `Clone, Debug` while this type derives only `Clone`;
// consider deriving `Debug` here as well for consistency.
#[derive(Clone)]
123
pub(crate) struct ReceiverPersister {
124
    // Shared database handle; a connection is obtained from it for every operation.
    db: Arc<Database>,
125
    // Value of `receive_sessions.session_id` for the session this persister works on.
    session_id: SessionId,
126
}
127

128
impl ReceiverPersister {
129
    /// Inserts a new row into `receive_sessions` and returns a persister bound to the
    /// `session_id` reported back by the `RETURNING` clause.
    ///
    /// Connection and query errors are propagated with `?`.
    pub fn new(db: Arc<Database>) -> crate::db::Result<Self> {
1✔
130
        let conn = db.get_connection()?;
1✔
131

132
        // Create a new session in receive_sessions and get its ID
        // NOTE(review): inserting NULL presumably lets SQLite assign the id (integer primary
        // key); the schema is not visible here - confirm.
133
        let session_id: i64 = conn.query_row(
1✔
134
            "INSERT INTO receive_sessions (session_id) VALUES (NULL) RETURNING session_id",
1✔
135
            [],
1✔
136
            |row| row.get(0),
1✔
UNCOV
137
        )?;
×
138

139
        Ok(Self { db, session_id: SessionId(session_id) })
1✔
140
    }
1✔
141

142
    /// Wraps an already known session id without touching the database; the id is not checked
    /// against `receive_sessions`.
    pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } }
2✔
143
}
144

145
/// Stores receiver session events as JSON rows in `receive_session_events`.
impl SessionPersister for ReceiverPersister {
146
    type SessionEvent = ReceiverSessionEvent;
147
    type InternalStorageError = crate::db::error::Error;
148

149
    /// Serializes `event` to JSON and appends it to `receive_session_events` for this session,
    /// stamping the row with the value returned by `now()`.
    ///
    /// Returns `Error::Serialize` if JSON encoding fails; connection and insert errors are
    /// propagated with `?`.
    fn save_event(
12✔
150
        &self,
12✔
151
        event: ReceiverSessionEvent,
12✔
152
    ) -> std::result::Result<(), Self::InternalStorageError> {
12✔
153
        let conn = self.db.get_connection()?;
12✔
154
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
12✔
155

156
        conn.execute(
12✔
157
            "INSERT INTO receive_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
12✔
158
            params![*self.session_id, event_data, now()],
12✔
159
        )?;
12✔
160

161
        Ok(())
12✔
162
    }
12✔
163

164
    /// Loads every stored event of this session in ascending `id` order.
    ///
    /// All rows are read and decoded up front into a `Vec`; the returned iterator walks that
    /// vector.
    ///
    /// # Panics
    ///
    /// Panics if a row cannot be read or if its `event_data` is not a valid
    /// `ReceiverSessionEvent` in JSON form.
    fn load(
2✔
165
        &self,
2✔
166
    ) -> std::result::Result<
2✔
167
        Box<dyn Iterator<Item = ReceiverSessionEvent>>,
2✔
168
        Self::InternalStorageError,
2✔
169
    > {
2✔
170
        let conn = self.db.get_connection()?;
2✔
171
        let mut stmt = conn.prepare(
2✔
172
            "SELECT event_data FROM receive_session_events WHERE session_id = ?1 ORDER BY id ASC",
2✔
173
        )?;
2✔
174

175
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
12✔
176
            let event_data: String = row.get(0)?;
12✔
177
            Ok(event_data)
12✔
178
        })?;
12✔
179

180
        // NOTE(review): both `expect` calls below turn a storage problem into a panic even
        // though this function already returns `Result<_, InternalStorageError>`; consider
        // collecting into a `Result` and propagating instead.
        let events: Vec<ReceiverSessionEvent> = event_rows
2✔
181
            .map(|row| {
12✔
182
                let event_data = row.expect("Failed to read event data from database");
12✔
183
                serde_json::from_str::<ReceiverSessionEvent>(&event_data)
12✔
184
                    .expect("Database corruption: failed to deserialize session event")
12✔
185
            })
12✔
186
            .collect();
2✔
187

188
        Ok(Box::new(events.into_iter()))
2✔
189
    }
2✔
190

191
    /// Marks this session as completed by writing the value of `now()` into
    /// `receive_sessions.completed_at`.
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
1✔
192
        let conn = self.db.get_connection()?;
1✔
193

194
        // NOTE(review): the affected-row count returned by `execute` is discarded, so closing
        // an id that matches no row still returns `Ok(())`.
        conn.execute(
1✔
195
            "UPDATE receive_sessions SET completed_at = ?1 WHERE session_id = ?2",
1✔
196
            params![now(), *self.session_id],
1✔
197
        )?;
1✔
198

199
        Ok(())
1✔
200
    }
1✔
201
}
202

203
// Session lookup queries over `send_sessions` and `receive_sessions`.
// NOTE(review): the four id-listing methods below differ only in table name and in the
// `completed_at` predicate/columns; a shared private helper would remove the duplication.
impl Database {
204
    /// Returns the ids of all receive sessions whose `completed_at` is still NULL.
    pub(crate) fn get_recv_session_ids(&self) -> Result<Vec<SessionId>> {
4✔
205
        let conn = self.get_connection()?;
4✔
206
        let mut stmt =
4✔
207
            conn.prepare("SELECT session_id FROM receive_sessions WHERE completed_at IS NULL")?;
4✔
208

209
        let session_rows = stmt.query_map([], |row| {
4✔
210
            let session_id: i64 = row.get(0)?;
2✔
211
            Ok(SessionId(session_id))
2✔
212
        })?;
2✔
213

214
        // Each row is itself a `Result`; the first failing row aborts the whole listing.
        let mut session_ids = Vec::new();
4✔
215
        for session_row in session_rows {
4✔
216
            let session_id = session_row?;
2✔
217
            session_ids.push(session_id);
2✔
218
        }
219

220
        Ok(session_ids)
4✔
221
    }
4✔
222

223
    /// Returns the ids of all send sessions whose `completed_at` is still NULL.
    pub(crate) fn get_send_session_ids(&self) -> Result<Vec<SessionId>> {
6✔
224
        let conn = self.get_connection()?;
6✔
225
        let mut stmt =
6✔
226
            conn.prepare("SELECT session_id FROM send_sessions WHERE completed_at IS NULL")?;
6✔
227

228
        let session_rows = stmt.query_map([], |row| {
6✔
229
            let session_id: i64 = row.get(0)?;
1✔
230
            Ok(SessionId(session_id))
1✔
231
        })?;
1✔
232

233
        let mut session_ids = Vec::new();
6✔
234
        for session_row in session_rows {
6✔
235
            let session_id = session_row?;
1✔
236
            session_ids.push(session_id);
1✔
237
        }
238

239
        Ok(session_ids)
6✔
240
    }
6✔
241

242
    /// Reads the `receiver_pubkey` bytes stored for `session_id` in `send_sessions` and decodes
    /// them into an `HpkePublicKey`.
    ///
    /// Query errors (including, per rusqlite's `query_row` contract, the case where no row
    /// matches) are propagated with `?`.
    ///
    /// # Panics
    ///
    /// Panics if the stored bytes are not accepted by `HpkePublicKey::from_compressed_bytes`.
    pub(crate) fn get_send_session_receiver_pk(
1✔
243
        &self,
1✔
244
        session_id: &SessionId,
1✔
245
    ) -> Result<HpkePublicKey> {
1✔
246
        let conn = self.get_connection()?;
1✔
247
        let mut stmt =
1✔
248
            conn.prepare("SELECT receiver_pubkey FROM send_sessions WHERE session_id = ?1")?;
1✔
249
        let receiver_pubkey: Vec<u8> = stmt.query_row(params![session_id.0], |row| row.get(0))?;
1✔
250
        // NOTE(review): a corrupted or foreign blob in this column panics here rather than
        // returning an error through `Result`.
        Ok(HpkePublicKey::from_compressed_bytes(&receiver_pubkey).expect("Valid receiver pubkey"))
1✔
251
    }
1✔
252

253
    /// Returns `(session_id, completed_at)` for every send session whose `completed_at` is set.
    pub(crate) fn get_inactive_send_session_ids(&self) -> Result<Vec<(SessionId, u64)>> {
×
254
        let conn = self.get_connection()?;
×
255
        let mut stmt = conn.prepare(
×
256
            "SELECT session_id, completed_at FROM send_sessions WHERE completed_at IS NOT NULL",
×
257
        )?;
×
258
        let session_rows = stmt.query_map([], |row| {
×
259
            let session_id: i64 = row.get(0)?;
×
260
            let completed_at: u64 = row.get(1)?;
×
261
            Ok((SessionId(session_id), completed_at))
×
UNCOV
262
        })?;
×
263

264
        let mut session_ids = Vec::new();
×
265
        for session_row in session_rows {
×
266
            let (session_id, completed_at) = session_row?;
×
UNCOV
267
            session_ids.push((session_id, completed_at));
×
268
        }
269
        Ok(session_ids)
×
UNCOV
270
    }
×
271

272
    /// Returns `(session_id, completed_at)` for every receive session whose `completed_at` is
    /// set.
    pub(crate) fn get_inactive_recv_session_ids(&self) -> Result<Vec<(SessionId, u64)>> {
×
273
        let conn = self.get_connection()?;
×
274
        let mut stmt = conn.prepare(
×
275
            "SELECT session_id, completed_at FROM receive_sessions WHERE completed_at IS NOT NULL",
×
276
        )?;
×
277
        let session_rows = stmt.query_map([], |row| {
×
278
            let session_id: i64 = row.get(0)?;
×
279
            let completed_at: u64 = row.get(1)?;
×
280
            Ok((SessionId(session_id), completed_at))
×
UNCOV
281
        })?;
×
282

283
        let mut session_ids = Vec::new();
×
284
        for session_row in session_rows {
×
285
            let (session_id, completed_at) = session_row?;
×
UNCOV
286
            session_ids.push((session_id, completed_at));
×
287
        }
288
        Ok(session_ids)
×
UNCOV
289
    }
×
290
}
291

292
// Tests for the duplicate-session guard in `SenderPersister::new`; compiled only for test
// builds with the `v2` feature enabled.
#[cfg(all(test, feature = "v2"))]
293
mod tests {
294
    use std::sync::Arc;
295

296
    use payjoin::HpkeKeyPair;
297

298
    use super::*;
299

300
    // Builds a pooled in-memory database with the schema initialised. The `with_init` hook
    // issues `PRAGMA locking_mode = EXCLUSIVE` on the connections the manager opens.
    fn create_test_db() -> Arc<Database> {
3✔
301
        // Use an in-memory database for tests
302
        let manager = r2d2_sqlite::SqliteConnectionManager::memory()
3✔
303
            .with_init(|conn| conn.execute_batch("PRAGMA locking_mode = EXCLUSIVE;"));
30✔
304
        let pool = r2d2::Pool::new(manager).expect("pool creation should succeed");
3✔
305
        let conn = pool.get().expect("connection should succeed");
3✔
306
        Database::init_schema(&conn).expect("schema init should succeed");
3✔
307
        Arc::new(Database(pool))
3✔
308
    }
3✔
309

310
    // Generates a fresh HPKE key pair and returns its second element as the receiver key.
    fn make_receiver_pubkey() -> payjoin::HpkePublicKey { HpkeKeyPair::gen_keypair().1 }
5✔
311

312
    // Second call with the same URI (same active session) should return DuplicateSendSession(Uri).
313
    #[test]
314
    fn test_duplicate_uri_returns_error() {
1✔
315
        let db = create_test_db();
1✔
316
        let rk1 = make_receiver_pubkey();
1✔
317
        let rk2 = make_receiver_pubkey();
1✔
318
        let uri = "bitcoin:addr1?pj=https://example.com/BBBBBBBB";
1✔
319

320
        SenderPersister::new(db.clone(), uri, &rk1).expect("first session should succeed");
1✔
321

322
        let err = SenderPersister::new(db, uri, &rk2).expect_err("duplicate URI should fail");
1✔
323
        assert!(
1✔
324
            matches!(err, Error::DuplicateSendSession(DuplicateKind::Uri)),
1✔
325
            "expected DuplicateSendSession(Uri), got: {err:?}"
326
        );
327
    }
1✔
328

329
    // Same receiver pubkey under a different URI should return DuplicateSendSession(ReceiverPubkey).
330
    #[test]
331
    fn test_duplicate_rk_returns_error() {
1✔
332
        let db = create_test_db();
1✔
333
        let rk = make_receiver_pubkey();
1✔
334
        let uri1 = "bitcoin:addr1?pj=https://example.com/CCCCCCCC";
1✔
335
        let uri2 = "bitcoin:addr1?pj=https://example.com/DDDDDDDD";
1✔
336

337
        SenderPersister::new(db.clone(), uri1, &rk).expect("first session should succeed");
1✔
338

339
        let err = SenderPersister::new(db, uri2, &rk).expect_err("duplicate RK should fail");
1✔
340
        assert!(
1✔
341
            matches!(err, Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey)),
1✔
342
            "expected DuplicateSendSession(ReceiverPubkey), got: {err:?}"
343
        );
344
    }
1✔
345

346
    // After a session is marked completed, a new session with the same URI must still be rejected
347
    // to prevent address reuse and HPKE receiver-key reuse.
348
    #[test]
349
    fn test_completed_session_blocks_reuse() {
1✔
350
        let db = create_test_db();
1✔
351
        let rk1 = make_receiver_pubkey();
1✔
352
        let rk2 = make_receiver_pubkey();
1✔
353
        let uri = "bitcoin:addr1?pj=https://example.com/EEEEEEEE";
1✔
354

355
        let persister =
1✔
356
            SenderPersister::new(db.clone(), uri, &rk1).expect("first session should succeed");
1✔
357

358
        // Mark the session as completed
359
        use payjoin::persist::SessionPersister;
360
        persister.close().expect("close should succeed");
1✔
361

362
        // A new session with the same URI must be rejected even after completion
363
        let err = SenderPersister::new(db, uri, &rk2)
1✔
364
            .expect_err("reuse of a completed session URI must be rejected");
1✔
365
        assert!(
1✔
366
            matches!(err, Error::DuplicateSendSession(DuplicateKind::Uri)),
1✔
367
            "expected DuplicateSendSession(Uri), got: {err:?}"
368
        );
369
    }
1✔
370
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc