• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 26202708607

21 May 2026 02:56AM UTC coverage: 85.179% (+0.04%) from 85.14%
26202708607

Pull #1558

github

web-flow
Merge 21722b1ce into b58af3fdc
Pull Request #1558: Receiver fallback typestate

691 of 801 new or added lines in 6 files covered. (86.27%)

21 existing lines in 3 files now uncovered.

12247 of 14378 relevant lines covered (85.18%)

377.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.81
/payjoin-cli/src/db/v2.rs
1
use std::sync::Arc;
2

3
use payjoin::persist::SessionPersister;
4
use payjoin::receive::v2::SessionEvent as ReceiverSessionEvent;
5
use payjoin::send::v2::SessionEvent as SenderSessionEvent;
6
use payjoin::HpkePublicKey;
7
use rusqlite::params;
8

9
use super::*;
10

11
/// Identifier of a persisted session: a thin wrapper around the `i64` value of a
/// `session_id` column.
#[derive(Clone, Debug)]
pub(crate) struct SessionId(pub(crate) i64);
13

14
impl core::ops::Deref for SessionId {
15
    type Target = i64;
16
    fn deref(&self) -> &Self::Target { &self.0 }
32✔
17
}
18

19
impl std::fmt::Display for SessionId {
20
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) }
5✔
21
}
22

23
#[derive(Clone, Debug)]
24
pub(crate) struct SenderPersister {
25
    db: Arc<Database>,
26
    session_id: SessionId,
27
}
28

29
impl SenderPersister {
30
    pub fn new(
8✔
31
        db: Arc<Database>,
8✔
32
        pj_uri: &str,
8✔
33
        receiver_pubkey: &HpkePublicKey,
8✔
34
    ) -> crate::db::Result<Self> {
8✔
35
        let conn = db.get_connection()?;
8✔
36
        let receiver_pubkey_bytes = receiver_pubkey.to_compressed_bytes();
8✔
37

38
        let (duplicate_uri, duplicate_rk): (bool, bool) = conn.query_row(
8✔
39
            "SELECT \
8✔
40
                EXISTS(SELECT 1 FROM send_sessions WHERE pj_uri = ?1), \
8✔
41
                EXISTS(SELECT 1 FROM send_sessions WHERE receiver_pubkey = ?2)",
8✔
42
            params![pj_uri, &receiver_pubkey_bytes],
8✔
43
            |row| Ok((row.get(0)?, row.get(1)?)),
8✔
44
        )?;
×
45

46
        if duplicate_uri {
8✔
47
            return Err(Error::DuplicateSendSession(DuplicateKind::Uri));
2✔
48
        }
6✔
49
        if duplicate_rk {
6✔
50
            return Err(Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey));
1✔
51
        }
5✔
52

53
        // Create a new session in send_sessions and get its ID
54
        let session_id: i64 = conn.query_row(
5✔
55
            "INSERT INTO send_sessions (pj_uri, receiver_pubkey) VALUES (?1, ?2) RETURNING session_id",
5✔
56
            params![pj_uri, &receiver_pubkey_bytes],
5✔
57
            |row| row.get(0),
5✔
58
        )?;
×
59

60
        Ok(Self { db, session_id: SessionId(session_id) })
5✔
61
    }
8✔
62

63
    pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } }
2✔
64

65
    pub fn session_id(&self) -> SessionId { self.session_id.clone() }
2✔
66
}
67
impl SessionPersister for SenderPersister {
68
    type SessionEvent = SenderSessionEvent;
69
    type InternalStorageError = crate::db::error::Error;
70

71
    fn save_event(
7✔
72
        &self,
7✔
73
        event: SenderSessionEvent,
7✔
74
    ) -> std::result::Result<(), Self::InternalStorageError> {
7✔
75
        let conn = self.db.get_connection()?;
7✔
76
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
7✔
77

78
        conn.execute(
7✔
79
            "INSERT INTO send_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
7✔
80
            params![*self.session_id, event_data, now()],
7✔
81
        )?;
7✔
82

83
        Ok(())
7✔
84
    }
7✔
85

86
    fn load(
2✔
87
        &self,
2✔
88
    ) -> std::result::Result<Box<dyn Iterator<Item = SenderSessionEvent>>, Self::InternalStorageError>
2✔
89
    {
90
        let conn = self.db.get_connection()?;
2✔
91
        let mut stmt = conn.prepare(
2✔
92
            "SELECT event_data FROM send_session_events WHERE session_id = ?1 ORDER BY id ASC",
2✔
93
        )?;
2✔
94

95
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
4✔
96
            let event_data: String = row.get(0)?;
4✔
97
            Ok(event_data)
4✔
98
        })?;
4✔
99

100
        let events: Vec<SenderSessionEvent> = event_rows
2✔
101
            .map(|row| {
4✔
102
                let event_data = row.expect("Failed to read event data from database");
4✔
103
                serde_json::from_str::<SenderSessionEvent>(&event_data)
4✔
104
                    .expect("Database corruption: failed to deserialize session event")
4✔
105
            })
4✔
106
            .collect();
2✔
107

108
        Ok(Box::new(events.into_iter()))
2✔
109
    }
2✔
110

111
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
3✔
112
        let conn = self.db.get_connection()?;
3✔
113

114
        conn.execute(
3✔
115
            "UPDATE send_sessions SET completed_at = ?1 WHERE session_id = ?2",
3✔
116
            params![now(), *self.session_id],
3✔
117
        )?;
3✔
118

119
        Ok(())
3✔
120
    }
3✔
121
}
122

123
#[derive(Clone)]
124
pub(crate) struct ReceiverPersister {
125
    db: Arc<Database>,
126
    session_id: SessionId,
127
}
128

129
impl ReceiverPersister {
130
    pub fn new(db: Arc<Database>) -> crate::db::Result<Self> {
3✔
131
        let conn = db.get_connection()?;
3✔
132

133
        // Create a new session in receive_sessions and get its ID
134
        let session_id: i64 = conn.query_row(
3✔
135
            "INSERT INTO receive_sessions (session_id) VALUES (NULL) RETURNING session_id",
3✔
136
            [],
3✔
137
            |row| row.get(0),
3✔
138
        )?;
×
139

140
        Ok(Self { db, session_id: SessionId(session_id) })
3✔
141
    }
3✔
142

143
    pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } }
3✔
144

NEW
145
    pub fn session_id(&self) -> SessionId { self.session_id.clone() }
×
146
}
147

148
impl SessionPersister for ReceiverPersister {
149
    type SessionEvent = ReceiverSessionEvent;
150
    type InternalStorageError = crate::db::error::Error;
151

152
    fn save_event(
15✔
153
        &self,
15✔
154
        event: ReceiverSessionEvent,
15✔
155
    ) -> std::result::Result<(), Self::InternalStorageError> {
15✔
156
        let conn = self.db.get_connection()?;
15✔
157
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
15✔
158

159
        conn.execute(
15✔
160
            "INSERT INTO receive_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
15✔
161
            params![*self.session_id, event_data, now()],
15✔
162
        )?;
15✔
163

164
        Ok(())
15✔
165
    }
15✔
166

167
    fn load(
3✔
168
        &self,
3✔
169
    ) -> std::result::Result<
3✔
170
        Box<dyn Iterator<Item = ReceiverSessionEvent>>,
3✔
171
        Self::InternalStorageError,
3✔
172
    > {
3✔
173
        let conn = self.db.get_connection()?;
3✔
174
        let mut stmt = conn.prepare(
3✔
175
            "SELECT event_data FROM receive_session_events WHERE session_id = ?1 ORDER BY id ASC",
3✔
176
        )?;
3✔
177

178
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
13✔
179
            let event_data: String = row.get(0)?;
13✔
180
            Ok(event_data)
13✔
181
        })?;
13✔
182

183
        let events: Vec<ReceiverSessionEvent> = event_rows
3✔
184
            .map(|row| {
13✔
185
                let event_data = row.expect("Failed to read event data from database");
13✔
186
                serde_json::from_str::<ReceiverSessionEvent>(&event_data)
13✔
187
                    .expect("Database corruption: failed to deserialize session event")
13✔
188
            })
13✔
189
            .collect();
3✔
190

191
        Ok(Box::new(events.into_iter()))
3✔
192
    }
3✔
193

194
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
2✔
195
        let conn = self.db.get_connection()?;
2✔
196

197
        conn.execute(
2✔
198
            "UPDATE receive_sessions SET completed_at = ?1 WHERE session_id = ?2",
2✔
199
            params![now(), *self.session_id],
2✔
200
        )?;
2✔
201

202
        Ok(())
2✔
203
    }
2✔
204
}
205

206
impl Database {
207
    pub(crate) fn get_recv_session_ids(&self) -> Result<Vec<SessionId>> {
5✔
208
        let conn = self.get_connection()?;
5✔
209
        let mut stmt =
5✔
210
            conn.prepare("SELECT session_id FROM receive_sessions WHERE completed_at IS NULL")?;
5✔
211

212
        let session_rows = stmt.query_map([], |row| {
5✔
213
            let session_id: i64 = row.get(0)?;
2✔
214
            Ok(SessionId(session_id))
2✔
215
        })?;
2✔
216

217
        let mut session_ids = Vec::new();
5✔
218
        for session_row in session_rows {
5✔
219
            let session_id = session_row?;
2✔
220
            session_ids.push(session_id);
2✔
221
        }
222

223
        Ok(session_ids)
5✔
224
    }
5✔
225

226
    pub(crate) fn get_send_session_ids(&self) -> Result<Vec<SessionId>> {
8✔
227
        let conn = self.get_connection()?;
8✔
228
        let mut stmt =
8✔
229
            conn.prepare("SELECT session_id FROM send_sessions WHERE completed_at IS NULL")?;
8✔
230

231
        let session_rows = stmt.query_map([], |row| {
8✔
232
            let session_id: i64 = row.get(0)?;
2✔
233
            Ok(SessionId(session_id))
2✔
234
        })?;
2✔
235

236
        let mut session_ids = Vec::new();
8✔
237
        for session_row in session_rows {
8✔
238
            let session_id = session_row?;
2✔
239
            session_ids.push(session_id);
2✔
240
        }
241

242
        Ok(session_ids)
8✔
243
    }
8✔
244

245
    pub(crate) fn get_send_session_receiver_pk(
1✔
246
        &self,
1✔
247
        session_id: &SessionId,
1✔
248
    ) -> Result<HpkePublicKey> {
1✔
249
        let conn = self.get_connection()?;
1✔
250
        let mut stmt =
1✔
251
            conn.prepare("SELECT receiver_pubkey FROM send_sessions WHERE session_id = ?1")?;
1✔
252
        let receiver_pubkey: Vec<u8> = stmt.query_row(params![session_id.0], |row| row.get(0))?;
1✔
253
        Ok(HpkePublicKey::from_compressed_bytes(&receiver_pubkey).expect("Valid receiver pubkey"))
1✔
254
    }
1✔
255

256
    pub(crate) fn get_inactive_send_session_ids(&self) -> Result<Vec<(SessionId, u64)>> {
×
257
        let conn = self.get_connection()?;
×
258
        let mut stmt = conn.prepare(
×
259
            "SELECT session_id, completed_at FROM send_sessions WHERE completed_at IS NOT NULL",
×
260
        )?;
×
261
        let session_rows = stmt.query_map([], |row| {
×
262
            let session_id: i64 = row.get(0)?;
×
263
            let completed_at: u64 = row.get(1)?;
×
264
            Ok((SessionId(session_id), completed_at))
×
265
        })?;
×
266

267
        let mut session_ids = Vec::new();
×
268
        for session_row in session_rows {
×
269
            let (session_id, completed_at) = session_row?;
×
270
            session_ids.push((session_id, completed_at));
×
271
        }
272
        Ok(session_ids)
×
273
    }
×
274

275
    pub(crate) fn get_inactive_recv_session_ids(&self) -> Result<Vec<(SessionId, u64)>> {
×
276
        let conn = self.get_connection()?;
×
277
        let mut stmt = conn.prepare(
×
278
            "SELECT session_id, completed_at FROM receive_sessions WHERE completed_at IS NOT NULL",
×
279
        )?;
×
280
        let session_rows = stmt.query_map([], |row| {
×
281
            let session_id: i64 = row.get(0)?;
×
282
            let completed_at: u64 = row.get(1)?;
×
283
            Ok((SessionId(session_id), completed_at))
×
284
        })?;
×
285

286
        let mut session_ids = Vec::new();
×
287
        for session_row in session_rows {
×
288
            let (session_id, completed_at) = session_row?;
×
289
            session_ids.push((session_id, completed_at));
×
290
        }
291
        Ok(session_ids)
×
292
    }
×
293
}
294

295
#[cfg(all(test, feature = "v2"))]
mod tests {
    use std::sync::Arc;

    use payjoin::HpkeKeyPair;

    use super::*;

    /// Builds a `Database` on top of an in-memory SQLite pool with the schema initialized.
    fn create_test_db() -> Arc<Database> {
        let init_pragma = |conn: &mut rusqlite::Connection| {
            conn.execute_batch("PRAGMA locking_mode = EXCLUSIVE;")
        };
        let manager = r2d2_sqlite::SqliteConnectionManager::memory().with_init(init_pragma);
        let pool = r2d2::Pool::new(manager).expect("pool creation should succeed");
        let conn = pool.get().expect("connection should succeed");
        Database::init_schema(&conn).expect("schema init should succeed");
        Arc::new(Database(pool))
    }

    /// Generates a fresh key pair and returns its public half.
    fn make_receiver_pubkey() -> payjoin::HpkePublicKey {
        let keypair = HpkeKeyPair::gen_keypair();
        keypair.1
    }

    /// Creating a second session for an already stored URI fails with
    /// `DuplicateSendSession(Uri)`, even when the receiver key differs.
    #[test]
    fn test_duplicate_uri_returns_error() {
        let db = create_test_db();
        let first_key = make_receiver_pubkey();
        let second_key = make_receiver_pubkey();
        let uri = "bitcoin:addr1?pj=https://example.com/BBBBBBBB";

        SenderPersister::new(db.clone(), uri, &first_key).expect("first session should succeed");

        let err =
            SenderPersister::new(db, uri, &second_key).expect_err("duplicate URI should fail");
        assert!(
            matches!(err, Error::DuplicateSendSession(DuplicateKind::Uri)),
            "expected DuplicateSendSession(Uri), got: {err:?}"
        );
    }

    /// Reusing a receiver pubkey under a different URI fails with
    /// `DuplicateSendSession(ReceiverPubkey)`.
    #[test]
    fn test_duplicate_rk_returns_error() {
        let db = create_test_db();
        let shared_key = make_receiver_pubkey();
        let first_uri = "bitcoin:addr1?pj=https://example.com/CCCCCCCC";
        let second_uri = "bitcoin:addr1?pj=https://example.com/DDDDDDDD";

        SenderPersister::new(db.clone(), first_uri, &shared_key)
            .expect("first session should succeed");

        let err = SenderPersister::new(db, second_uri, &shared_key)
            .expect_err("duplicate RK should fail");
        assert!(
            matches!(err, Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey)),
            "expected DuplicateSendSession(ReceiverPubkey), got: {err:?}"
        );
    }

    /// Closing a session does not free its URI: a later session with the same URI
    /// must still be rejected, to prevent address reuse and HPKE receiver-key reuse.
    #[test]
    fn test_completed_session_blocks_reuse() {
        use payjoin::persist::SessionPersister;

        let db = create_test_db();
        let first_key = make_receiver_pubkey();
        let second_key = make_receiver_pubkey();
        let uri = "bitcoin:addr1?pj=https://example.com/EEEEEEEE";

        let persister = SenderPersister::new(db.clone(), uri, &first_key)
            .expect("first session should succeed");

        // Mark the first session as completed.
        persister.close().expect("close should succeed");

        // The URI stays reserved even though its session is completed.
        let err = SenderPersister::new(db, uri, &second_key)
            .expect_err("reuse of a completed session URI must be rejected");
        assert!(
            matches!(err, Error::DuplicateSendSession(DuplicateKind::Uri)),
            "expected DuplicateSendSession(Uri), got: {err:?}"
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc