• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 15767995657

19 Jun 2025 11:38PM UTC coverage: 86.087% (-0.05%) from 86.135%
15767995657

push

github

web-flow
Replace `Persister` with `SessionPersister` for `v2::Receiver` (#750)

Please take a look at individual commits for a more complete description
of the changes.
Commit messages prefixed with "squash" indicate that they are meant
to be squashed with the previous commit before this PR gets merged.

One open item before this is undrafted:
- [X] c7c31fcf4 introduces an in-memory
receiver session persister. It is accessible to the integration tests
but not to submodule unit tests (?). Need to move this persister impl to a
common location or figure out why it's not visible to the unit tests.

710 of 823 new or added lines in 8 files covered. (86.27%)

5 existing lines in 3 files now uncovered.

7617 of 8848 relevant lines covered (86.09%)

525.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.24
/payjoin-cli/src/db/v2.rs
1
use std::sync::Arc;
2
use std::time::SystemTime;
3

4
use bitcoincore_rpc::jsonrpc::serde_json;
5
use payjoin::bitcoin::hex::DisplayHex;
6
use payjoin::persist::{Persister, SessionPersister, Value};
7
use payjoin::receive::v2::SessionEvent;
8
use payjoin::send::v2::{Sender, SenderToken, WithReplyKey};
9
use serde::{Deserialize, Serialize};
10
use sled::Tree;
11
use url::Url;
12

13
use super::*;
14

15
/// Serializable record of a session: the events recorded for it plus an optional completion
/// timestamp.
///
/// In this file it is instantiated as `SessionWrapper<SessionEvent>` and stored as JSON in the
/// `recv_sessions` tree.
#[derive(Debug, Clone, Serialize, Deserialize)]
16
pub(crate) struct SessionWrapper<V> {
17
    /// `None` for a freshly created session; set to the current system time when the
    /// receiver persister's `close` is called.
    pub(crate) completed_at: Option<SystemTime>,
18
    /// Events in the order they were appended by `save_event`.
    pub(crate) events: Vec<V>,
19
}
20

21
/// Identifier of a receiver session, held as the eight big-endian bytes of a `u64`.
#[derive(Debug, Clone)]
pub struct SessionId([u8; 8]);

impl SessionId {
    /// Builds a session id from `id` using its big-endian byte representation.
    pub fn new(id: u64) -> Self {
        let be_bytes = u64::to_be_bytes(id);
        SessionId(be_bytes)
    }
}

impl AsRef<[u8]> for SessionId {
    /// Borrows the eight underlying bytes as a slice.
    fn as_ref(&self) -> &[u8] {
        let SessionId(bytes) = self;
        &bytes[..]
    }
}
31

32
pub(crate) struct SenderPersister(Arc<Database>);
33
impl SenderPersister {
34
    pub fn new(db: Arc<Database>) -> Self { Self(db) }
1✔
35
}
36

37
impl Persister<Sender<WithReplyKey>> for SenderPersister {
38
    type Token = SenderToken;
39
    type Error = crate::db::error::Error;
40
    fn save(
1✔
41
        &mut self,
1✔
42
        value: Sender<WithReplyKey>,
1✔
43
    ) -> std::result::Result<SenderToken, Self::Error> {
1✔
44
        let send_tree = self.0 .0.open_tree("send_sessions")?;
1✔
45
        let key = value.key();
1✔
46
        let value = serde_json::to_vec(&value).map_err(Error::Serialize)?;
1✔
47
        send_tree.insert(key.clone(), value.as_slice())?;
1✔
48
        send_tree.flush()?;
1✔
49
        Ok(key)
1✔
50
    }
1✔
51

52
    fn load(&self, key: SenderToken) -> std::result::Result<Sender<WithReplyKey>, Self::Error> {
1✔
53
        let send_tree = self.0 .0.open_tree("send_sessions")?;
1✔
54
        let value = send_tree.get(key.as_ref())?.ok_or(Error::NotFound(key.to_string()))?;
1✔
55
        serde_json::from_slice(&value).map_err(Error::Deserialize)
1✔
56
    }
1✔
57
}
58

59
#[derive(Clone)]
60
pub(crate) struct ReceiverPersister {
61
    db: Arc<Database>,
62
    session_id: SessionId,
63
}
64
impl ReceiverPersister {
65
    pub fn new(db: Arc<Database>) -> crate::db::Result<Self> {
1✔
66
        let id = SessionId::new(db.0.generate_id()?);
1✔
67
        let recv_tree = db.0.open_tree("recv_sessions")?;
1✔
68
        let empty_session: SessionWrapper<SessionEvent> =
1✔
69
            SessionWrapper { completed_at: None, events: vec![] };
1✔
70
        let value = serde_json::to_vec(&empty_session).map_err(Error::Serialize)?;
1✔
71
        recv_tree.insert(id.as_ref(), value.as_slice())?;
1✔
72
        recv_tree.flush()?;
1✔
73

74
        Ok(Self { db: db.clone(), session_id: id })
1✔
75
    }
1✔
76

77
    pub fn from_id(db: Arc<Database>, id: SessionId) -> crate::db::Result<Self> {
1✔
78
        Ok(Self { db: db.clone(), session_id: id })
1✔
79
    }
1✔
80
}
81

82
impl SessionPersister for ReceiverPersister {
83
    type SessionEvent = SessionEvent;
84
    type InternalStorageError = crate::db::error::Error;
85

86
    fn save_event(
9✔
87
        &self,
9✔
88
        event: &SessionEvent,
9✔
89
    ) -> std::result::Result<(), Self::InternalStorageError> {
9✔
90
        let recv_tree = self.db.0.open_tree("recv_sessions")?;
9✔
91
        let key = self.session_id.as_ref();
9✔
92
        let session =
9✔
93
            recv_tree.get(key)?.ok_or(Error::NotFound(key.to_vec().to_lower_hex_string()))?;
9✔
94
        let mut session_wrapper: SessionWrapper<SessionEvent> =
9✔
95
            serde_json::from_slice(&session).map_err(Error::Deserialize)?;
9✔
96
        session_wrapper.events.push(event.clone());
9✔
97
        let value = serde_json::to_vec(&session_wrapper).map_err(Error::Serialize)?;
9✔
98
        recv_tree.insert(key, value.as_slice())?;
9✔
99
        recv_tree.flush()?;
9✔
100
        Ok(())
9✔
101
    }
9✔
102

103
    fn load(
2✔
104
        &self,
2✔
105
    ) -> std::result::Result<Box<dyn Iterator<Item = SessionEvent>>, Self::InternalStorageError>
2✔
106
    {
2✔
107
        let recv_tree = self.db.0.open_tree("recv_sessions")?;
2✔
108
        let session_wrapper = recv_tree.get(self.session_id.as_ref())?;
2✔
109
        let value = session_wrapper.expect("key should exist");
2✔
110
        let wrapper: SessionWrapper<SessionEvent> =
2✔
111
            serde_json::from_slice(&value).map_err(Error::Deserialize)?;
2✔
112
        Ok(Box::new(wrapper.events.into_iter()))
2✔
113
    }
2✔
114

115
    fn close(&self) -> std::result::Result<(), Self::InternalStorageError> {
1✔
116
        let recv_tree = self.db.0.open_tree("recv_sessions")?;
1✔
117
        let key = self.session_id.as_ref();
1✔
118
        if let Some(existing) = recv_tree.get(key)? {
1✔
119
            let mut wrapper: SessionWrapper<SessionEvent> =
1✔
120
                serde_json::from_slice(&existing).map_err(Error::Deserialize)?;
1✔
121
            wrapper.completed_at = Some(SystemTime::now());
1✔
122
            let value = serde_json::to_vec(&wrapper).map_err(Error::Serialize)?;
1✔
123
            recv_tree.insert(key, value.as_slice())?;
1✔
NEW
124
        }
×
125
        recv_tree.flush()?;
1✔
126
        Ok(())
1✔
127
    }
1✔
128
}
129

130
impl Database {
131
    pub(crate) fn get_recv_session_ids(&self) -> Result<Vec<SessionId>> {
1✔
132
        let recv_tree = self.0.open_tree("recv_sessions")?;
1✔
133
        let mut session_ids = Vec::new();
1✔
134
        for item in recv_tree.iter() {
1✔
135
            let (key, _) = item?;
1✔
136
            session_ids.push(SessionId::new(u64::from_be_bytes(
1✔
137
                key.as_ref().try_into().map_err(Error::TryFromSlice)?,
1✔
138
            )));
139
        }
140
        Ok(session_ids)
1✔
141
    }
1✔
142

143
    pub(crate) fn get_send_sessions(&self) -> Result<Vec<Sender<WithReplyKey>>> {
1✔
144
        let send_tree: Tree = self.0.open_tree("send_sessions")?;
1✔
145
        let mut sessions = Vec::new();
1✔
146
        for item in send_tree.iter() {
1✔
147
            let (_, value) = item?;
×
148
            let session: Sender<WithReplyKey> =
×
149
                serde_json::from_slice(&value).map_err(Error::Deserialize)?;
×
150
            sessions.push(session);
×
151
        }
152
        Ok(sessions)
1✔
153
    }
1✔
154

155
    pub(crate) fn get_send_session(&self, pj_url: &Url) -> Result<Option<Sender<WithReplyKey>>> {
2✔
156
        let send_tree = self.0.open_tree("send_sessions")?;
2✔
157
        if let Some(val) = send_tree.get(pj_url.as_str())? {
2✔
158
            let session: Sender<WithReplyKey> =
1✔
159
                serde_json::from_slice(&val).map_err(Error::Deserialize)?;
1✔
160
            Ok(Some(session))
1✔
161
        } else {
162
            Ok(None)
1✔
163
        }
164
    }
2✔
165

166
    pub(crate) fn clear_send_session(&self, pj_url: &Url) -> Result<()> {
1✔
167
        let send_tree: Tree = self.0.open_tree("send_sessions")?;
1✔
168
        send_tree.remove(pj_url.as_str())?;
1✔
169
        send_tree.flush()?;
1✔
170
        Ok(())
1✔
171
    }
1✔
172
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc