• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 23491369257

24 Mar 2026 01:15PM UTC coverage: 84.165% (+0.04%) from 84.125%
23491369257

Pull #1376

github

web-flow
Merge cc8e38d8d into 3c5a384f5
Pull Request #1376: Guard concurrent sends with exclusive DB lock and URI/RK checks

63 of 69 new or added lines in 4 files covered. (91.3%)

5 existing lines in 1 file now uncovered.

10705 of 12719 relevant lines covered (84.17%)

415.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.55
/payjoin-cli/src/db/v2.rs
1
use std::sync::Arc;
2

3
use payjoin::persist::SessionPersister;
4
use payjoin::receive::v2::SessionEvent as ReceiverSessionEvent;
5
use payjoin::send::v2::SessionEvent as SenderSessionEvent;
6
use payjoin::HpkePublicKey;
7
use rusqlite::params;
8

9
use super::*;
10

11
/// Identifier of a payjoin session row in the CLI's SQLite database.
///
/// Thin newtype around the `i64` value read from a `session_id` column.
#[derive(Clone, Debug)]
pub(crate) struct SessionId(i64);

/// Read-only access to the wrapped integer, e.g. to bind `*id` as a SQL parameter.
impl core::ops::Deref for SessionId {
    type Target = i64;

    fn deref(&self) -> &i64 {
        let Self(raw) = self;
        raw
    }
}

/// Renders the identifier as its plain decimal value.
impl std::fmt::Display for SessionId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{}", self.0))
    }
}
22

23
#[derive(Clone, Debug)]
24
pub(crate) struct SenderPersister {
25
    db: Arc<Database>,
26
    session_id: SessionId,
27
}
28

29
impl SenderPersister {
30
    pub fn new(
7✔
31
        db: Arc<Database>,
7✔
32
        pj_uri: &str,
7✔
33
        receiver_pubkey: &HpkePublicKey,
7✔
34
    ) -> crate::db::Result<Self> {
7✔
35
        let conn = db.get_connection()?;
7✔
36
        let receiver_pubkey_bytes = receiver_pubkey.to_compressed_bytes();
7✔
37

38
        let (duplicate_uri, duplicate_rk): (bool, bool) = conn.query_row(
7✔
39
            "SELECT \
7✔
40
                EXISTS(SELECT 1 FROM send_sessions WHERE completed_at IS NULL AND pj_uri = ?1), \
7✔
41
                EXISTS(SELECT 1 FROM send_sessions WHERE completed_at IS NULL AND receiver_pubkey = ?2)",
7✔
42
            params![pj_uri, &receiver_pubkey_bytes],
7✔
43
            |row| Ok((row.get(0)?, row.get(1)?)),
7✔
NEW
44
        )?;
×
45

46
        if duplicate_uri {
7✔
47
            return Err(Error::DuplicateSendSession(DuplicateKind::Uri));
1✔
48
        }
6✔
49
        if duplicate_rk {
6✔
50
            return Err(Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey));
1✔
51
        }
5✔
52

53
        // Create a new session in send_sessions and get its ID
54
        let session_id: i64 = conn.query_row(
5✔
55
            "INSERT INTO send_sessions (pj_uri, receiver_pubkey) VALUES (?1, ?2) RETURNING session_id",
5✔
56
            params![pj_uri, &receiver_pubkey_bytes],
5✔
57
            |row| row.get(0),
5✔
58
        )?;
×
59

60
        Ok(Self { db, session_id: SessionId(session_id) })
5✔
61
    }
7✔
62

63
    pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } }
1✔
64
}
65
impl SessionPersister for SenderPersister {
66
    type SessionEvent = SenderSessionEvent;
67
    type InternalStorageError = crate::db::error::Error;
68

69
    fn save_event(
3✔
70
        &self,
3✔
71
        event: SenderSessionEvent,
3✔
72
    ) -> std::result::Result<(), Self::InternalStorageError> {
3✔
73
        let conn = self.db.get_connection()?;
3✔
74
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
3✔
75

76
        conn.execute(
3✔
77
            "INSERT INTO send_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
3✔
78
            params![*self.session_id, event_data, now()],
3✔
79
        )?;
3✔
80

81
        Ok(())
3✔
82
    }
3✔
83

84
    fn load(
1✔
85
        &self,
1✔
86
    ) -> std::result::Result<Box<dyn Iterator<Item = SenderSessionEvent>>, Self::InternalStorageError>
1✔
87
    {
88
        let conn = self.db.get_connection()?;
1✔
89
        let mut stmt = conn.prepare(
1✔
90
            "SELECT event_data FROM send_session_events WHERE session_id = ?1 ORDER BY id ASC",
1✔
91
        )?;
1✔
92

93
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
2✔
94
            let event_data: String = row.get(0)?;
2✔
95
            Ok(event_data)
2✔
96
        })?;
2✔
97

98
        let events: Vec<SenderSessionEvent> = event_rows
1✔
99
            .map(|row| {
2✔
100
                let event_data = row.expect("Failed to read event data from database");
2✔
101
                serde_json::from_str::<SenderSessionEvent>(&event_data)
2✔
102
                    .expect("Database corruption: failed to deserialize session event")
2✔
103
            })
2✔
104
            .collect();
1✔
105

106
        Ok(Box::new(events.into_iter()))
1✔
107
    }
1✔
108

109
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
2✔
110
        let conn = self.db.get_connection()?;
2✔
111

112
        conn.execute(
2✔
113
            "UPDATE send_sessions SET completed_at = ?1 WHERE session_id = ?2",
2✔
114
            params![now(), *self.session_id],
2✔
115
        )?;
2✔
116

117
        Ok(())
2✔
118
    }
2✔
119
}
120

121
#[derive(Clone)]
122
pub(crate) struct ReceiverPersister {
123
    db: Arc<Database>,
124
    session_id: SessionId,
125
}
126

127
impl ReceiverPersister {
128
    pub fn new(db: Arc<Database>) -> crate::db::Result<Self> {
1✔
129
        let conn = db.get_connection()?;
1✔
130

131
        // Create a new session in receive_sessions and get its ID
132
        let session_id: i64 = conn.query_row(
1✔
133
            "INSERT INTO receive_sessions (session_id) VALUES (NULL) RETURNING session_id",
1✔
134
            [],
1✔
135
            |row| row.get(0),
1✔
136
        )?;
×
137

138
        Ok(Self { db, session_id: SessionId(session_id) })
1✔
139
    }
1✔
140

141
    pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } }
2✔
142
}
143

144
impl SessionPersister for ReceiverPersister {
145
    type SessionEvent = ReceiverSessionEvent;
146
    type InternalStorageError = crate::db::error::Error;
147

148
    fn save_event(
12✔
149
        &self,
12✔
150
        event: ReceiverSessionEvent,
12✔
151
    ) -> std::result::Result<(), Self::InternalStorageError> {
12✔
152
        let conn = self.db.get_connection()?;
12✔
153
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
12✔
154

155
        conn.execute(
12✔
156
            "INSERT INTO receive_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
12✔
157
            params![*self.session_id, event_data, now()],
12✔
158
        )?;
12✔
159

160
        Ok(())
12✔
161
    }
12✔
162

163
    fn load(
2✔
164
        &self,
2✔
165
    ) -> std::result::Result<
2✔
166
        Box<dyn Iterator<Item = ReceiverSessionEvent>>,
2✔
167
        Self::InternalStorageError,
2✔
168
    > {
2✔
169
        let conn = self.db.get_connection()?;
2✔
170
        let mut stmt = conn.prepare(
2✔
171
            "SELECT event_data FROM receive_session_events WHERE session_id = ?1 ORDER BY id ASC",
2✔
172
        )?;
2✔
173

174
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
12✔
175
            let event_data: String = row.get(0)?;
12✔
176
            Ok(event_data)
12✔
177
        })?;
12✔
178

179
        let events: Vec<ReceiverSessionEvent> = event_rows
2✔
180
            .map(|row| {
12✔
181
                let event_data = row.expect("Failed to read event data from database");
12✔
182
                serde_json::from_str::<ReceiverSessionEvent>(&event_data)
12✔
183
                    .expect("Database corruption: failed to deserialize session event")
12✔
184
            })
12✔
185
            .collect();
2✔
186

187
        Ok(Box::new(events.into_iter()))
2✔
188
    }
2✔
189

190
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
1✔
191
        let conn = self.db.get_connection()?;
1✔
192

193
        conn.execute(
1✔
194
            "UPDATE receive_sessions SET completed_at = ?1 WHERE session_id = ?2",
1✔
195
            params![now(), *self.session_id],
1✔
196
        )?;
1✔
197

198
        Ok(())
1✔
199
    }
1✔
200
}
201

202
impl Database {
203
    pub(crate) fn get_recv_session_ids(&self) -> Result<Vec<SessionId>> {
4✔
204
        let conn = self.get_connection()?;
4✔
205
        let mut stmt =
4✔
206
            conn.prepare("SELECT session_id FROM receive_sessions WHERE completed_at IS NULL")?;
4✔
207

208
        let session_rows = stmt.query_map([], |row| {
4✔
209
            let session_id: i64 = row.get(0)?;
2✔
210
            Ok(SessionId(session_id))
2✔
211
        })?;
2✔
212

213
        let mut session_ids = Vec::new();
4✔
214
        for session_row in session_rows {
4✔
215
            let session_id = session_row?;
2✔
216
            session_ids.push(session_id);
2✔
217
        }
218

219
        Ok(session_ids)
4✔
220
    }
4✔
221

222
    pub(crate) fn get_send_session_ids(&self) -> Result<Vec<SessionId>> {
6✔
223
        let conn = self.get_connection()?;
6✔
224
        let mut stmt =
6✔
225
            conn.prepare("SELECT session_id FROM send_sessions WHERE completed_at IS NULL")?;
6✔
226

227
        let session_rows = stmt.query_map([], |row| {
6✔
228
            let session_id: i64 = row.get(0)?;
1✔
229
            Ok(SessionId(session_id))
1✔
230
        })?;
1✔
231

232
        let mut session_ids = Vec::new();
6✔
233
        for session_row in session_rows {
6✔
234
            let session_id = session_row?;
1✔
235
            session_ids.push(session_id);
1✔
236
        }
237

238
        Ok(session_ids)
6✔
239
    }
6✔
240

241
    pub(crate) fn get_send_session_receiver_pk(
1✔
242
        &self,
1✔
243
        session_id: &SessionId,
1✔
244
    ) -> Result<HpkePublicKey> {
1✔
245
        let conn = self.get_connection()?;
1✔
246
        let mut stmt =
1✔
247
            conn.prepare("SELECT receiver_pubkey FROM send_sessions WHERE session_id = ?1")?;
1✔
248
        let receiver_pubkey: Vec<u8> = stmt.query_row(params![session_id.0], |row| row.get(0))?;
1✔
249
        Ok(HpkePublicKey::from_compressed_bytes(&receiver_pubkey).expect("Valid receiver pubkey"))
1✔
250
    }
1✔
251

252
    pub(crate) fn get_inactive_send_session_ids(&self) -> Result<Vec<(SessionId, u64)>> {
×
253
        let conn = self.get_connection()?;
×
254
        let mut stmt = conn.prepare(
×
255
            "SELECT session_id, completed_at FROM send_sessions WHERE completed_at IS NOT NULL",
×
256
        )?;
×
257
        let session_rows = stmt.query_map([], |row| {
×
258
            let session_id: i64 = row.get(0)?;
×
259
            let completed_at: u64 = row.get(1)?;
×
260
            Ok((SessionId(session_id), completed_at))
×
261
        })?;
×
262

263
        let mut session_ids = Vec::new();
×
264
        for session_row in session_rows {
×
265
            let (session_id, completed_at) = session_row?;
×
266
            session_ids.push((session_id, completed_at));
×
267
        }
268
        Ok(session_ids)
×
269
    }
×
270

271
    pub(crate) fn get_inactive_recv_session_ids(&self) -> Result<Vec<(SessionId, u64)>> {
×
272
        let conn = self.get_connection()?;
×
273
        let mut stmt = conn.prepare(
×
274
            "SELECT session_id, completed_at FROM receive_sessions WHERE completed_at IS NOT NULL",
×
275
        )?;
×
276
        let session_rows = stmt.query_map([], |row| {
×
277
            let session_id: i64 = row.get(0)?;
×
278
            let completed_at: u64 = row.get(1)?;
×
279
            Ok((SessionId(session_id), completed_at))
×
280
        })?;
×
281

282
        let mut session_ids = Vec::new();
×
283
        for session_row in session_rows {
×
284
            let (session_id, completed_at) = session_row?;
×
285
            session_ids.push((session_id, completed_at));
×
286
        }
287
        Ok(session_ids)
×
288
    }
×
289
}
290

291
#[cfg(all(test, feature = "v2"))]
mod tests {
    use std::sync::Arc;

    use payjoin::HpkeKeyPair;

    use super::*;

    /// Builds a schema-initialized, in-memory database whose connections are opened with
    /// `PRAGMA locking_mode = EXCLUSIVE`.
    fn create_test_db() -> Arc<Database> {
        let manager = r2d2_sqlite::SqliteConnectionManager::memory()
            .with_init(|conn| conn.execute_batch("PRAGMA locking_mode = EXCLUSIVE;"));
        let pool = r2d2::Pool::new(manager).expect("pool creation should succeed");
        let schema_conn = pool.get().expect("connection should succeed");
        Database::init_schema(&schema_conn).expect("schema init should succeed");
        Arc::new(Database(pool))
    }

    /// Generates a fresh key pair and keeps only its public half.
    fn make_receiver_pubkey() -> payjoin::HpkePublicKey {
        let keypair = HpkeKeyPair::gen_keypair();
        keypair.1
    }

    /// A second open session for the same URI must be rejected with
    /// `DuplicateSendSession(Uri)`, even when the receiver key differs.
    #[test]
    fn test_duplicate_uri_returns_error() {
        let db = create_test_db();
        let first_rk = make_receiver_pubkey();
        let second_rk = make_receiver_pubkey();
        let uri = "bitcoin:addr1?pj=https://example.com/BBBBBBBB";

        SenderPersister::new(Arc::clone(&db), uri, &first_rk)
            .expect("first session should succeed");

        let err =
            SenderPersister::new(db, uri, &second_rk).expect_err("duplicate URI should fail");
        let is_uri_duplicate = matches!(err, Error::DuplicateSendSession(DuplicateKind::Uri));
        assert!(is_uri_duplicate, "expected DuplicateSendSession(Uri), got: {err:?}");
    }

    /// Reusing a receiver key under a different URI must be rejected with
    /// `DuplicateSendSession(ReceiverPubkey)`.
    #[test]
    fn test_duplicate_rk_returns_error() {
        let db = create_test_db();
        let shared_rk = make_receiver_pubkey();
        let uri1 = "bitcoin:addr1?pj=https://example.com/CCCCCCCC";
        let uri2 = "bitcoin:addr1?pj=https://example.com/DDDDDDDD";

        SenderPersister::new(Arc::clone(&db), uri1, &shared_rk)
            .expect("first session should succeed");

        let err =
            SenderPersister::new(db, uri2, &shared_rk).expect_err("duplicate RK should fail");
        let is_rk_duplicate =
            matches!(err, Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey));
        assert!(is_rk_duplicate, "expected DuplicateSendSession(ReceiverPubkey), got: {err:?}");
    }

    /// Once a session has been closed, its URI may be used for a new session.
    #[test]
    fn test_completed_session_allows_reuse() {
        let db = create_test_db();
        let first_rk = make_receiver_pubkey();
        let second_rk = make_receiver_pubkey();
        let uri = "bitcoin:addr1?pj=https://example.com/EEEEEEEE";

        let first_session = SenderPersister::new(Arc::clone(&db), uri, &first_rk)
            .expect("first session should succeed");

        // Closing stamps `completed_at`, which takes the row out of the duplicate check.
        use payjoin::persist::SessionPersister;
        first_session.close().expect("close should succeed");

        // Completed sessions do not block: the same URI is accepted again.
        assert!(
            SenderPersister::new(db, uri, &second_rk).is_ok(),
            "reuse after completion should succeed"
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc