• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 17444435859

03 Sep 2025 07:49PM UTC coverage: 84.73% (-1.2%) from 85.946%
17444435859

Pull #1039

github

web-flow
Merge e78b4ff91 into 31258eb86
Pull Request #1039: WIP - display session history

1 of 138 new or added lines in 6 files covered. (0.72%)

29 existing lines in 1 file now uncovered.

8201 of 9679 relevant lines covered (84.73%)

482.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.45
/payjoin-cli/src/db/v2.rs
1
use std::sync::Arc;
2

3
use payjoin::persist::SessionPersister;
4
use payjoin::receive::v2::SessionEvent as ReceiverSessionEvent;
5
use payjoin::send::v2::SessionEvent as SenderSessionEvent;
6
use payjoin::HpkePublicKey;
7
use rusqlite::params;
8

9
use super::*;
10

11
#[derive(Debug, Clone)]
12
pub(crate) struct SessionId(i64);
13

14
impl core::ops::Deref for SessionId {
15
    type Target = i64;
16
    fn deref(&self) -> &Self::Target { &self.0 }
18✔
17
}
18

19
impl std::fmt::Display for SessionId {
NEW
20
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) }
×
21
}
22

23
#[derive(Clone)]
24
pub(crate) struct SenderPersister {
25
    db: Arc<Database>,
26
    session_id: SessionId,
27
}
28

29
impl SenderPersister {
30
    pub fn new(db: Arc<Database>, receiver_pubkey: HpkePublicKey) -> crate::db::Result<Self> {
1✔
31
        let conn = db.get_connection()?;
1✔
32

33
        // Create a new session in send_sessions and get its ID
34
        let session_id: i64 = conn.query_row(
1✔
35
            "INSERT INTO send_sessions (session_id, receiver_pubkey) VALUES (NULL, ?1) RETURNING session_id",
1✔
36
            params![receiver_pubkey.to_compressed_bytes()],
1✔
37
            |row| row.get(0),
1✔
38
        )?;
×
39

40
        Ok(Self { db, session_id: SessionId(session_id) })
1✔
41
    }
1✔
42

43
    pub fn from_id(db: Arc<Database>, id: SessionId) -> crate::db::Result<Self> {
1✔
44
        Ok(Self { db, session_id: id })
1✔
45
    }
1✔
46
}
47

48
impl SessionPersister for SenderPersister {
49
    type SessionEvent = SenderSessionEvent;
50
    type InternalStorageError = crate::db::error::Error;
51

52
    fn save_event(
3✔
53
        &self,
3✔
54
        event: SenderSessionEvent,
3✔
55
    ) -> std::result::Result<(), Self::InternalStorageError> {
3✔
56
        let conn = self.db.get_connection()?;
3✔
57
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
3✔
58

59
        conn.execute(
3✔
60
            "INSERT INTO send_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
3✔
61
            params![*self.session_id, event_data, now()],
3✔
62
        )?;
3✔
63

64
        Ok(())
3✔
65
    }
3✔
66

67
    fn load(
1✔
68
        &self,
1✔
69
    ) -> std::result::Result<Box<dyn Iterator<Item = SenderSessionEvent>>, Self::InternalStorageError>
1✔
70
    {
71
        let conn = self.db.get_connection()?;
1✔
72
        let mut stmt = conn.prepare(
1✔
73
            "SELECT event_data FROM send_session_events WHERE session_id = ?1 ORDER BY created_at ASC",
1✔
74
        )?;
1✔
75

76
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
2✔
77
            let event_data: String = row.get(0)?;
2✔
78
            Ok(event_data)
2✔
79
        })?;
2✔
80

81
        let events: Vec<SenderSessionEvent> = event_rows
1✔
82
            .map(|row| {
2✔
83
                let event_data = row.expect("Failed to read event data from database");
2✔
84
                serde_json::from_str::<SenderSessionEvent>(&event_data)
2✔
85
                    .expect("Database corruption: failed to deserialize session event")
2✔
86
            })
2✔
87
            .collect();
1✔
88

89
        Ok(Box::new(events.into_iter()))
1✔
90
    }
1✔
91

92
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
1✔
93
        let conn = self.db.get_connection()?;
1✔
94

95
        conn.execute(
1✔
96
            "UPDATE send_sessions SET completed_at = ?1 WHERE session_id = ?2",
1✔
97
            params![now(), *self.session_id],
1✔
98
        )?;
1✔
99

100
        Ok(())
1✔
101
    }
1✔
102
}
103

104
#[derive(Clone)]
105
pub(crate) struct ReceiverPersister {
106
    db: Arc<Database>,
107
    session_id: SessionId,
108
}
109

110
impl ReceiverPersister {
111
    pub fn new(db: Arc<Database>) -> crate::db::Result<Self> {
1✔
112
        let conn = db.get_connection()?;
1✔
113

114
        // Create a new session in receive_sessions and get its ID
115
        let session_id: i64 = conn.query_row(
1✔
116
            "INSERT INTO receive_sessions (session_id) VALUES (NULL) RETURNING session_id",
1✔
117
            [],
1✔
118
            |row| row.get(0),
1✔
119
        )?;
×
120

121
        Ok(Self { db, session_id: SessionId(session_id) })
1✔
122
    }
1✔
123

124
    pub fn from_id(db: Arc<Database>, id: SessionId) -> crate::db::Result<Self> {
1✔
125
        Ok(Self { db, session_id: id })
1✔
126
    }
1✔
127
}
128

129
impl SessionPersister for ReceiverPersister {
130
    type SessionEvent = ReceiverSessionEvent;
131
    type InternalStorageError = crate::db::error::Error;
132

133
    fn save_event(
10✔
134
        &self,
10✔
135
        event: ReceiverSessionEvent,
10✔
136
    ) -> std::result::Result<(), Self::InternalStorageError> {
10✔
137
        let conn = self.db.get_connection()?;
10✔
138
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
10✔
139

140
        conn.execute(
10✔
141
            "INSERT INTO receive_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
10✔
142
            params![*self.session_id, event_data, now()],
10✔
143
        )?;
10✔
144

145
        Ok(())
10✔
146
    }
10✔
147

148
    fn load(
2✔
149
        &self,
2✔
150
    ) -> std::result::Result<
2✔
151
        Box<dyn Iterator<Item = ReceiverSessionEvent>>,
2✔
152
        Self::InternalStorageError,
2✔
153
    > {
2✔
154
        let conn = self.db.get_connection()?;
2✔
155
        let mut stmt = conn.prepare(
2✔
156
            "SELECT event_data FROM receive_session_events WHERE session_id = ?1 ORDER BY created_at ASC",
2✔
157
        )?;
2✔
158

159
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
2✔
160
            let event_data: String = row.get(0)?;
2✔
161
            Ok(event_data)
2✔
162
        })?;
2✔
163

164
        let events: Vec<ReceiverSessionEvent> = event_rows
2✔
165
            .map(|row| {
2✔
166
                let event_data = row.expect("Failed to read event data from database");
2✔
167
                serde_json::from_str::<ReceiverSessionEvent>(&event_data)
2✔
168
                    .expect("Database corruption: failed to deserialize session event")
2✔
169
            })
2✔
170
            .collect();
2✔
171

172
        Ok(Box::new(events.into_iter()))
2✔
173
    }
2✔
174

175
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
1✔
176
        let conn = self.db.get_connection()?;
1✔
177

178
        conn.execute(
1✔
179
            "UPDATE receive_sessions SET completed_at = ?1 WHERE session_id = ?2",
1✔
180
            params![now(), *self.session_id],
1✔
181
        )?;
1✔
182

183
        Ok(())
1✔
184
    }
1✔
185
}
186

187
impl Database {
188
    pub(crate) fn get_recv_session_ids(&self) -> Result<Vec<SessionId>> {
3✔
189
        let conn = self.get_connection()?;
3✔
190
        let mut stmt =
3✔
191
            conn.prepare("SELECT session_id FROM receive_sessions WHERE completed_at IS NULL")?;
3✔
192

193
        let session_rows = stmt.query_map([], |row| {
3✔
194
            let session_id: i64 = row.get(0)?;
1✔
195
            Ok(SessionId(session_id))
1✔
196
        })?;
1✔
197

198
        let mut session_ids = Vec::new();
3✔
199
        for session_row in session_rows {
4✔
200
            let session_id = session_row?;
1✔
201
            session_ids.push(session_id);
1✔
202
        }
203

204
        Ok(session_ids)
3✔
205
    }
3✔
206

207
    pub(crate) fn get_send_session_ids(&self) -> Result<Vec<SessionId>> {
5✔
208
        let conn = self.get_connection()?;
5✔
209
        let mut stmt =
5✔
210
            conn.prepare("SELECT session_id FROM send_sessions WHERE completed_at IS NULL")?;
5✔
211

212
        let session_rows = stmt.query_map([], |row| {
5✔
213
            let session_id: i64 = row.get(0)?;
1✔
214
            Ok(SessionId(session_id))
1✔
215
        })?;
1✔
216

217
        let mut session_ids = Vec::new();
5✔
218
        for session_row in session_rows {
6✔
219
            let session_id = session_row?;
1✔
220
            session_ids.push(session_id);
1✔
221
        }
222

223
        Ok(session_ids)
5✔
224
    }
5✔
225

226
    pub(crate) fn get_send_session_receiver_pk(
1✔
227
        &self,
1✔
228
        session_id: &SessionId,
1✔
229
    ) -> Result<HpkePublicKey> {
1✔
230
        let conn = self.get_connection()?;
1✔
231
        let mut stmt =
1✔
232
            conn.prepare("SELECT receiver_pubkey FROM send_sessions WHERE session_id = ?1")?;
1✔
233
        let receiver_pubkey: Vec<u8> = stmt.query_row(params![session_id.0], |row| row.get(0))?;
1✔
234
        Ok(HpkePublicKey::from_compressed_bytes(&receiver_pubkey).expect("Valid receiver pubkey"))
1✔
235
    }
1✔
236

NEW
237
    pub(crate) fn get_inactive_send_session_ids(&self) -> Result<Vec<(SessionId, u64)>> {
×
NEW
238
        let conn = self.get_connection()?;
×
NEW
239
        let mut stmt = conn.prepare(
×
NEW
240
            "SELECT session_id, completed_at FROM send_sessions WHERE completed_at IS NOT NULL",
×
NEW
241
        )?;
×
NEW
242
        let session_rows = stmt.query_map([], |row| {
×
NEW
243
            let session_id: i64 = row.get(0)?;
×
NEW
244
            let completed_at: u64 = row.get(1)?;
×
NEW
245
            Ok((SessionId(session_id), completed_at))
×
NEW
246
        })?;
×
247

NEW
248
        let mut session_ids = Vec::new();
×
NEW
249
        for session_row in session_rows {
×
NEW
250
            let (session_id, completed_at) = session_row?;
×
NEW
251
            session_ids.push((session_id, completed_at));
×
252
        }
NEW
253
        Ok(session_ids)
×
NEW
254
    }
×
255

NEW
256
    pub(crate) fn get_inactive_recv_session_ids(&self) -> Result<Vec<(SessionId, u64)>> {
×
NEW
257
        let conn = self.get_connection()?;
×
NEW
258
        let mut stmt = conn.prepare(
×
NEW
259
            "SELECT session_id, completed_at FROM receive_sessions WHERE completed_at IS NOT NULL",
×
NEW
260
        )?;
×
NEW
261
        let session_rows = stmt.query_map([], |row| {
×
NEW
262
            let session_id: i64 = row.get(0)?;
×
NEW
263
            let completed_at: u64 = row.get(1)?;
×
NEW
264
            Ok((SessionId(session_id), completed_at))
×
NEW
265
        })?;
×
266

NEW
267
        let mut session_ids = Vec::new();
×
NEW
268
        for session_row in session_rows {
×
NEW
269
            let (session_id, completed_at) = session_row?;
×
NEW
270
            session_ids.push((session_id, completed_at));
×
271
        }
NEW
272
        Ok(session_ids)
×
NEW
273
    }
×
274
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc