• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 17305051001

28 Aug 2025 06:47PM UTC coverage: 85.711% (-0.06%) from 85.772%
17305051001

Pull #1013

github

web-flow
Merge 5e9039fc1 into bffb9b55f
Pull Request #1013: Look up RK instead of looping

8 of 8 new or added lines in 1 file covered. (100.0%)

5 existing lines in 1 file now uncovered.

8002 of 9336 relevant lines covered (85.71%)

500.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.27
/payjoin-cli/src/db/v2.rs
1
use std::sync::Arc;
2

3
use payjoin::persist::SessionPersister;
4
use payjoin::receive::v2::SessionEvent as ReceiverSessionEvent;
5
use payjoin::send::v2::SessionEvent as SenderSessionEvent;
6
use payjoin::HpkePublicKey;
7
use rusqlite::{params, OptionalExtension};
8

9
use super::*;
10

11
#[derive(Debug, Clone)]
12
pub(crate) struct SessionId(i64);
13

14
impl core::ops::Deref for SessionId {
15
    type Target = i64;
16
    fn deref(&self) -> &Self::Target { &self.0 }
18✔
17
}
18

19
#[derive(Clone)]
20
pub(crate) struct SenderPersister {
21
    db: Arc<Database>,
22
    session_id: SessionId,
23
}
24

25
impl SenderPersister {
26
    pub fn new(db: Arc<Database>, receiver_pubkey: HpkePublicKey) -> crate::db::Result<Self> {
1✔
27
        let conn = db.get_connection()?;
1✔
28

29
        // Create a new session in send_sessions and get its ID
30
        let session_id: i64 = conn.query_row(
1✔
31
            "INSERT INTO send_sessions (session_id, receiver_pubkey) VALUES (NULL, ?1) RETURNING session_id",
1✔
32
            params![receiver_pubkey.to_compressed_bytes()],
1✔
33
            |row| row.get(0),
1✔
34
        )?;
×
35

36
        Ok(Self { db, session_id: SessionId(session_id) })
1✔
37
    }
1✔
38

39
    pub fn from_id(db: Arc<Database>, id: SessionId) -> crate::db::Result<Self> {
1✔
40
        Ok(Self { db, session_id: id })
1✔
41
    }
1✔
42
}
43

44
impl SessionPersister for SenderPersister {
45
    type SessionEvent = SenderSessionEvent;
46
    type InternalStorageError = crate::db::error::Error;
47

48
    fn save_event(
3✔
49
        &self,
3✔
50
        event: SenderSessionEvent,
3✔
51
    ) -> std::result::Result<(), Self::InternalStorageError> {
3✔
52
        let conn = self.db.get_connection()?;
3✔
53
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
3✔
54

55
        conn.execute(
3✔
56
            "INSERT INTO send_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
3✔
57
            params![*self.session_id, event_data, now()],
3✔
58
        )?;
3✔
59

60
        Ok(())
3✔
61
    }
3✔
62

63
    fn load(
1✔
64
        &self,
1✔
65
    ) -> std::result::Result<Box<dyn Iterator<Item = SenderSessionEvent>>, Self::InternalStorageError>
1✔
66
    {
67
        let conn = self.db.get_connection()?;
1✔
68
        let mut stmt = conn.prepare(
1✔
69
            "SELECT event_data FROM send_session_events WHERE session_id = ?1 ORDER BY created_at ASC",
1✔
70
        )?;
1✔
71

72
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
2✔
73
            let event_data: String = row.get(0)?;
2✔
74
            Ok(event_data)
2✔
75
        })?;
2✔
76

77
        let events: Vec<SenderSessionEvent> = event_rows
1✔
78
            .map(|row| {
2✔
79
                let event_data = row.expect("Failed to read event data from database");
2✔
80
                serde_json::from_str::<SenderSessionEvent>(&event_data)
2✔
81
                    .expect("Database corruption: failed to deserialize session event")
2✔
82
            })
2✔
83
            .collect();
1✔
84

85
        Ok(Box::new(events.into_iter()))
1✔
86
    }
1✔
87

88
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
1✔
89
        let conn = self.db.get_connection()?;
1✔
90

91
        conn.execute(
1✔
92
            "UPDATE send_sessions SET completed_at = ?1 WHERE session_id = ?2",
1✔
93
            params![now(), *self.session_id],
1✔
94
        )?;
1✔
95

96
        Ok(())
1✔
97
    }
1✔
98
}
99

100
#[derive(Clone)]
101
pub(crate) struct ReceiverPersister {
102
    db: Arc<Database>,
103
    session_id: SessionId,
104
}
105

106
impl ReceiverPersister {
107
    pub fn new(db: Arc<Database>) -> crate::db::Result<Self> {
1✔
108
        let conn = db.get_connection()?;
1✔
109

110
        // Create a new session in receive_sessions and get its ID
111
        let session_id: i64 = conn.query_row(
1✔
112
            "INSERT INTO receive_sessions (session_id) VALUES (NULL) RETURNING session_id",
1✔
113
            [],
1✔
114
            |row| row.get(0),
1✔
115
        )?;
×
116

117
        Ok(Self { db, session_id: SessionId(session_id) })
1✔
118
    }
1✔
119

120
    pub fn from_id(db: Arc<Database>, id: SessionId) -> crate::db::Result<Self> {
1✔
121
        Ok(Self { db, session_id: id })
1✔
122
    }
1✔
123
}
124

125
impl SessionPersister for ReceiverPersister {
126
    type SessionEvent = ReceiverSessionEvent;
127
    type InternalStorageError = crate::db::error::Error;
128

129
    fn save_event(
10✔
130
        &self,
10✔
131
        event: ReceiverSessionEvent,
10✔
132
    ) -> std::result::Result<(), Self::InternalStorageError> {
10✔
133
        let conn = self.db.get_connection()?;
10✔
134
        let event_data = serde_json::to_string(&event).map_err(Error::Serialize)?;
10✔
135

136
        conn.execute(
10✔
137
            "INSERT INTO receive_session_events (session_id, event_data, created_at) VALUES (?1, ?2, ?3)",
10✔
138
            params![*self.session_id, event_data, now()],
10✔
139
        )?;
10✔
140

141
        Ok(())
10✔
142
    }
10✔
143

144
    fn load(
2✔
145
        &self,
2✔
146
    ) -> std::result::Result<
2✔
147
        Box<dyn Iterator<Item = ReceiverSessionEvent>>,
2✔
148
        Self::InternalStorageError,
2✔
149
    > {
2✔
150
        let conn = self.db.get_connection()?;
2✔
151
        let mut stmt = conn.prepare(
2✔
152
            "SELECT event_data FROM receive_session_events WHERE session_id = ?1 ORDER BY created_at ASC",
2✔
153
        )?;
2✔
154

155
        let event_rows = stmt.query_map(params![*self.session_id], |row| {
2✔
156
            let event_data: String = row.get(0)?;
2✔
157
            Ok(event_data)
2✔
158
        })?;
2✔
159

160
        let events: Vec<ReceiverSessionEvent> = event_rows
2✔
161
            .map(|row| {
2✔
162
                let event_data = row.expect("Failed to read event data from database");
2✔
163
                serde_json::from_str::<ReceiverSessionEvent>(&event_data)
2✔
164
                    .expect("Database corruption: failed to deserialize session event")
2✔
165
            })
2✔
166
            .collect();
2✔
167

168
        Ok(Box::new(events.into_iter()))
2✔
169
    }
2✔
170

171
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
1✔
172
        let conn = self.db.get_connection()?;
1✔
173

174
        conn.execute(
1✔
175
            "UPDATE receive_sessions SET completed_at = ?1 WHERE session_id = ?2",
1✔
176
            params![now(), *self.session_id],
1✔
177
        )?;
1✔
178

179
        Ok(())
1✔
180
    }
1✔
181
}
182

183
impl Database {
184
    pub(crate) fn get_recv_session_ids(&self) -> Result<Vec<SessionId>> {
3✔
185
        let conn = self.get_connection()?;
3✔
186
        let mut stmt =
3✔
187
            conn.prepare("SELECT session_id FROM receive_sessions WHERE completed_at IS NULL")?;
3✔
188

189
        let session_rows = stmt.query_map([], |row| {
3✔
190
            let session_id: i64 = row.get(0)?;
1✔
191
            Ok(SessionId(session_id))
1✔
192
        })?;
1✔
193

194
        let mut session_ids = Vec::new();
3✔
195
        for session_row in session_rows {
4✔
196
            let session_id = session_row?;
1✔
197
            session_ids.push(session_id);
1✔
198
        }
199

200
        Ok(session_ids)
3✔
201
    }
3✔
202

203
    pub(crate) fn get_send_session_ids(&self) -> Result<Vec<SessionId>> {
3✔
204
        let conn = self.get_connection()?;
3✔
205
        let mut stmt =
3✔
206
            conn.prepare("SELECT session_id FROM send_sessions WHERE completed_at IS NULL")?;
3✔
207

208
        let session_rows = stmt.query_map([], |row| {
3✔
UNCOV
209
            let session_id: i64 = row.get(0)?;
×
UNCOV
210
            Ok(SessionId(session_id))
×
UNCOV
211
        })?;
×
212

213
        let mut session_ids = Vec::new();
3✔
214
        for session_row in session_rows {
3✔
UNCOV
215
            let session_id = session_row?;
×
UNCOV
216
            session_ids.push(session_id);
×
217
        }
218

219
        Ok(session_ids)
3✔
220
    }
3✔
221

222
    pub(crate) fn get_send_session_id_with_receiver_pk(
2✔
223
        &self,
2✔
224
        receiver_pk: &HpkePublicKey,
2✔
225
    ) -> Result<Option<SessionId>> {
2✔
226
        let conn = self.get_connection()?;
2✔
227
        let mut stmt =
2✔
228
            conn.prepare("SELECT session_id FROM send_sessions WHERE receiver_pubkey = ?1")?;
2✔
229
        let session_id = stmt
2✔
230
            .query_row(params![receiver_pk.to_compressed_bytes()], |row| row.get(0))
2✔
231
            .optional()?;
2✔
232
        Ok(session_id.map(SessionId))
2✔
233
    }
2✔
234
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc