• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 15767995657

19 Jun 2025 11:38PM UTC coverage: 86.087% (-0.05%) from 86.135%
15767995657

push

github

web-flow
Replace `Persister` with `SessionPersister` for `v2::Receiver` (#750)

Please take a look at individual commits for a more complete description
of the changes.
Commit messages prefixed with "squash" indicate that they are meant
to be squashed into the previous commit before this PR gets merged.

One open item before this is undrafted:
- [X] c7c31fcf4 introduces an in-memory
receiver session persister. It is accessible to the integration tests
but not to submodule unit tests (?). Need to move this persister impl to a
common location or figure out why it's not visible to the unit tests.

710 of 823 new or added lines in 8 files covered. (86.27%)

5 existing lines in 3 files now uncovered.

7617 of 8848 relevant lines covered (86.09%)

525.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.02
/payjoin-cli/src/app/v2/mod.rs
1
use std::sync::{Arc, Mutex};
2

3
use anyhow::{anyhow, Context, Result};
4
use payjoin::bitcoin::consensus::encode::serialize_hex;
5
use payjoin::bitcoin::psbt::Psbt;
6
use payjoin::bitcoin::{Amount, FeeRate};
7
use payjoin::persist::OptionalTransitionOutcome;
8
use payjoin::receive::v2::{
9
    process_err_res, replay_event_log as replay_receiver_event_log, MaybeInputsOwned,
10
    MaybeInputsSeen, OutputsUnknown, PayjoinProposal, ProvisionalProposal, Receiver,
11
    ReceiverTypeState, SessionHistory, UncheckedProposal, WantsInputs, WantsOutputs, WithContext,
12
};
13
use payjoin::send::v2::{Sender, SenderBuilder, WithReplyKey};
14
use payjoin::Uri;
15
use tokio::sync::watch;
16

17
use super::config::Config;
18
use super::wallet::BitcoindWallet;
19
use super::App as AppTrait;
20
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
21
use crate::app::{handle_interrupt, http_agent};
22
use crate::db::v2::{ReceiverPersister, SenderPersister};
23
use crate::db::Database;
24

25
mod ohttp;
26

27
/// Shared state of the v2 payjoin CLI application.
///
/// Derives `Clone`; `resume_payjoins` clones the whole value into each task it spawns.
#[derive(Clone)]
28
pub(crate) struct App {
29
    /// Configuration handed to `App::new`.
    config: Config,
30
    /// Shared database handle (send/receive sessions and seen inputs are read and written here).
    db: Arc<Database>,
31
    /// Wallet constructed from `config.bitcoind`.
    wallet: BitcoindWallet,
32
    /// Receiving half of the watch channel whose sending half is given to `handle_interrupt`.
    interrupt: watch::Receiver<()>,
33
    /// Relay manager behind a mutex; queried for the currently selected OHTTP relay.
    relay_manager: Arc<Mutex<RelayManager>>,
34
}
35

36
#[async_trait::async_trait]
37
impl AppTrait for App {
38
    /// Build the app from `config`.
    ///
    /// Opens the database at `config.db_path`, creates the relay manager and the
    /// interrupt channel, constructs the wallet, and finally calls `network()` on
    /// the wallet so a broken bitcoind RPC configuration is reported up front.
    ///
    /// # Errors
    /// Returns an error if the database or wallet cannot be created, or if the
    /// wallet's `network()` call fails.
    fn new(config: Config) -> Result<Self> {
4✔
39
        let db = Arc::new(Database::create(&config.db_path)?);
4✔
40
        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
4✔
41
        let (interrupt_tx, interrupt_rx) = watch::channel(());
4✔
42
        // The sending half goes to a background task; the receiving half is stored on `App`.
        tokio::spawn(handle_interrupt(interrupt_tx));
4✔
43
        let wallet = BitcoindWallet::new(&config.bitcoind)?;
4✔
44
        let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager };
4✔
45
        // Only the success/failure of `network()` matters here; its value is discarded.
        app.wallet()
4✔
46
            .network()
4✔
47
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
4✔
48
        Ok(app)
4✔
49
    }
4✔
50

51
    /// Return a clone of the wallet handle held by this app.
    fn wallet(&self) -> BitcoindWallet { self.wallet.clone() }
14✔
52

53
    /// Send a payjoin for the given BIP21 URI string at `fee_rate`.
    ///
    /// Parses `bip21`, requires it to support Payjoin, and then either reuses a
    /// send session already stored for the URI's endpoint or builds, persists and
    /// reloads a new sender. The resulting sender is driven by
    /// `spawn_payjoin_sender`.
    ///
    /// # Errors
    /// Returns an error if the URI cannot be parsed or does not support Payjoin,
    /// if the database lookup fails, or if building, persisting or loading the
    /// sender fails; errors from `spawn_payjoin_sender` are passed through.
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
2✔
54
        use payjoin::UriExt;
55
        let uri =
2✔
56
            Uri::try_from(bip21).map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?;
2✔
57
        // NOTE(review): `assume_checked` presumably skips address network validation — confirm.
        let uri = uri.assume_checked();
2✔
58
        let uri = uri.check_pj_supported().map_err(|_| anyhow!("URI does not support Payjoin"))?;
2✔
59
        let url = uri.extras.endpoint();
2✔
60
        // Match the BIP21 URI to a stored send session, looked up by its endpoint URL.
61
        let req_ctx = match self.db.get_send_session(url)? {
2✔
62
            Some(send_session) => send_session,
1✔
63
            None => {
64
                let psbt = self.create_original_psbt(&uri, fee_rate)?;
1✔
65
                let mut persister = SenderPersister::new(self.db.clone());
1✔
66
                let new_sender = SenderBuilder::new(psbt, uri.clone())
1✔
67
                    .build_recommended(fee_rate)
1✔
68
                    .with_context(|| "Failed to build payjoin request")?;
1✔
69
                // Persist first, then load back through the returned storage token.
                let storage_token = new_sender
1✔
70
                    .persist(&mut persister)
1✔
71
                    .map_err(|e| anyhow!("Failed to persist sender: {}", e))?;
1✔
72
                Sender::load(storage_token, &persister)
1✔
73
                    .map_err(|e| anyhow!("Failed to load sender: {}", e))?
1✔
74
            }
75
        };
76
        self.spawn_payjoin_sender(req_ctx).await
2✔
77
    }
4✔
78

79
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
1✔
80
        let address = self.wallet().get_new_address()?;
1✔
81
        let ohttp_keys =
1✔
82
            unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone())
1✔
83
                .await?
1✔
84
                .ohttp_keys;
85
        let persister = ReceiverPersister::new(self.db.clone())?;
1✔
86
        let session = Receiver::create_session(
1✔
87
            address,
1✔
88
            self.config.v2()?.pj_directory.clone(),
1✔
89
            ohttp_keys,
1✔
90
            None,
1✔
91
        )
1✔
92
        .save(&persister)?;
1✔
93
        println!("Receive session established");
1✔
94
        let mut pj_uri = session.pj_uri();
1✔
95
        pj_uri.amount = Some(amount);
1✔
96
        println!("Request Payjoin by sharing this Payjoin Uri:");
1✔
97
        println!("{}", pj_uri);
1✔
98

1✔
99
        self.process_receiver_session(ReceiverTypeState::WithContext(session.clone()), &persister)
1✔
100
            .await?;
1✔
NEW
101
        Ok(())
×
102
    }
2✔
103

104
    /// Resume every stored receive and send session.
    ///
    /// Each receive session has its state replayed from its event log and is then
    /// processed in its own spawned task; each send session is likewise driven in
    /// its own task. The method then waits until either all tasks have been
    /// awaited or the interrupt channel changes.
    ///
    /// # Errors
    /// Returns an error if the stored sessions cannot be listed, or if a receive
    /// persister cannot be created or its event log cannot be replayed.
    // NOTE(review): the reason for this `allow` is not stated in the file; presumably it is
    // triggered by the `tokio::select!` expansion — confirm.
    #[allow(clippy::incompatible_msrv)]
105
    async fn resume_payjoins(&self) -> Result<()> {
1✔
106
        let recv_session_ids = self.db.get_recv_session_ids()?;
1✔
107
        let send_sessions = self.db.get_send_sessions()?;
1✔
108

109
        if recv_session_ids.is_empty() && send_sessions.is_empty() {
1✔
110
            println!("No sessions to resume.");
×
111
            return Ok(());
×
112
        }
1✔
113

1✔
114
        let mut tasks = Vec::new();
1✔
115

116
        for session_id in recv_session_ids {
2✔
117
            let self_clone = self.clone();
1✔
118
            let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id)?;
1✔
119
            // Only the first element of the replay result (the receiver state) is needed here.
            let receiver_state = replay_receiver_event_log(&recv_persister)
1✔
120
                .map_err(|e| anyhow!("Failed to replay receiver event log: {:?}", e))?
1✔
121
                .0;
122
            tasks.push(tokio::spawn(async move {
1✔
123
                self_clone.process_receiver_session(receiver_state, &recv_persister).await
1✔
124
            }));
1✔
125
        }
1✔
126

127
        for session in send_sessions {
1✔
128
            let self_clone = self.clone();
×
129
            tasks.push(tokio::spawn(async move { self_clone.spawn_payjoin_sender(session).await }));
×
130
        }
×
131

132
        let mut interrupt = self.interrupt.clone();
1✔
133
        tokio::select! {
1✔
134
            _ = async {
1✔
135
                for task in tasks {
2✔
136
                    // NOTE(review): both the join result and the session's own `Result` are
                    // discarded, so a failed session is still reported as "completed" below.
                    let _ = task.await;
1✔
137
                }
138
            } => {
1✔
139
                println!("All resumed sessions completed.");
1✔
140
            }
1✔
141
            _ = interrupt.changed() => {
1✔
UNCOV
142
                println!("Resumed sessions were interrupted.");
×
UNCOV
143
            }
×
144
        }
145
        Ok(())
1✔
146
    }
2✔
147
}
148

149
impl App {
150
    /// Drive a send session until a payjoin response arrives or the app is interrupted.
    ///
    /// On a response, the proposal is passed to `process_pj_response` and the stored
    /// send session for this endpoint is cleared. On interrupt, a hint on how to
    /// resume is printed and `Ok(())` is returned.
    ///
    /// # Errors
    /// Returns an error if polling, processing the response, or clearing the stored
    /// session fails.
    #[allow(clippy::incompatible_msrv)]
151
    async fn spawn_payjoin_sender(&self, mut req_ctx: Sender<WithReplyKey>) -> Result<()> {
2✔
152
        let mut interrupt = self.interrupt.clone();
2✔
153
        tokio::select! {
2✔
154
            res = self.long_poll_post(&mut req_ctx) => {
2✔
155
                self.process_pj_response(res?)?;
1✔
156
                // The session is only cleared after the response was processed successfully.
                self.db.clear_send_session(req_ctx.endpoint())?;
1✔
157
            }
158
            _ = interrupt.changed() => {
2✔
159
                println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
1✔
160
            }
1✔
161
        }
162
        Ok(())
2✔
163
    }
2✔
164

165
    /// Post the original PSBT and wait for the receiver's payjoin proposal.
    ///
    /// If a v2 request can be extracted, the original PSBT is posted through the
    /// OHTTP relay and the method then loops, issuing one request per iteration,
    /// until a proposal PSBT is returned or a response fails to process. If v2
    /// extraction fails, a single v1 request is posted instead.
    ///
    /// # Errors
    /// Returns an error if no relay can be obtained, if a request fails, or if a
    /// response cannot be processed ("Response error" with the cause as context).
    async fn long_poll_post(&self, req_ctx: &mut Sender<WithReplyKey>) -> Result<Psbt> {
2✔
165
        let ohttp_relay = self.unwrap_relay_or_else_fetch(Some(req_ctx.endpoint().clone())).await?;
2✔
167

168
        match req_ctx.extract_v2(ohttp_relay.clone()) {
2✔
169
            Ok((req, ctx)) => {
2✔
170
                println!("Posting Original PSBT Payload request...");
2✔
171
                let response = post_request(req).await?;
2✔
172
                println!("Sent fallback transaction");
2✔
173
                let v2_ctx = Arc::new(ctx.process_response(&response.bytes().await?)?);
2✔
174
                // Poll until a proposal arrives; this loop itself adds no delay between requests.
                loop {
175
                    let (req, ohttp_ctx) = v2_ctx.extract_req(&ohttp_relay)?;
3✔
176
                    let response = post_request(req).await?;
3✔
177
                    match v2_ctx.process_response(&response.bytes().await?, ohttp_ctx) {
2✔
178
                        Ok(Some(psbt)) => return Ok(psbt),
1✔
179
                        Ok(None) => {
1✔
180
                            println!("No response yet.");
1✔
181
                        }
1✔
182
                        Err(re) => {
×
183
                            println!("{re}");
×
184
                            log::debug!("{re:?}");
×
185
                            return Err(anyhow!("Response error").context(re));
×
186
                        }
187
                    }
188
                }
189
            }
190
            // Any v2 extraction failure falls back to a single v1 request.
            Err(_) => {
191
                let (req, v1_ctx) = req_ctx.extract_v1();
×
192
                println!("Posting Original PSBT Payload request...");
×
193
                let response = post_request(req).await?;
×
194
                println!("Sent fallback transaction");
×
195
                match v1_ctx.process_response(&response.bytes().await?) {
×
196
                    Ok(psbt) => Ok(psbt),
×
197
                    Err(re) => {
×
198
                        println!("{re}");
×
199
                        log::debug!("{re:?}");
×
200
                        Err(anyhow!("Response error").context(re))
×
201
                    }
202
                }
203
            }
204
        }
205
    }
1✔
206

207
    async fn long_poll_fallback(
2✔
208
        &self,
2✔
209
        session: Receiver<WithContext>,
2✔
210
        persister: &ReceiverPersister,
2✔
211
    ) -> Result<Receiver<UncheckedProposal>> {
2✔
212
        let ohttp_relay = self
2✔
213
            .unwrap_relay_or_else_fetch(Some(session.pj_uri().extras.endpoint().clone()))
2✔
214
            .await?;
2✔
215

216
        let mut session = session;
2✔
217
        loop {
218
            let (req, context) = session.extract_req(&ohttp_relay)?;
2✔
219
            println!("Polling receive request...");
2✔
220
            let ohttp_response = post_request(req).await?;
2✔
221
            let state_transition = session
1✔
222
                .process_res(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
223
                .save(persister);
1✔
224
            match state_transition {
1✔
225
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
1✔
226
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
227
                    return Ok(next_state);
1✔
228
                }
NEW
229
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
NEW
230
                    session = current_state;
×
NEW
231
                    continue;
×
232
                }
NEW
233
                Err(e) => return Err(e.into()),
×
234
            }
235
        }
236
    }
1✔
237

238
    async fn process_receiver_session(
2✔
239
        &self,
2✔
240
        session: ReceiverTypeState,
2✔
241
        persister: &ReceiverPersister,
2✔
242
    ) -> Result<()> {
2✔
243
        let res = {
2✔
244
            match session {
2✔
245
                ReceiverTypeState::WithContext(proposal) =>
2✔
246
                    self.read_from_directory(proposal, persister).await,
2✔
NEW
247
                ReceiverTypeState::UncheckedProposal(proposal) =>
×
NEW
248
                    self.check_proposal(proposal, persister).await,
×
NEW
249
                ReceiverTypeState::MaybeInputsOwned(proposal) =>
×
NEW
250
                    self.check_inputs_not_owned(proposal, persister).await,
×
NEW
251
                ReceiverTypeState::MaybeInputsSeen(proposal) =>
×
NEW
252
                    self.check_no_inputs_seen_before(proposal, persister).await,
×
NEW
253
                ReceiverTypeState::OutputsUnknown(proposal) =>
×
NEW
254
                    self.identify_receiver_outputs(proposal, persister).await,
×
NEW
255
                ReceiverTypeState::WantsOutputs(proposal) =>
×
NEW
256
                    self.commit_outputs(proposal, persister).await,
×
NEW
257
                ReceiverTypeState::WantsInputs(proposal) =>
×
NEW
258
                    self.contribute_inputs(proposal, persister).await,
×
NEW
259
                ReceiverTypeState::ProvisionalProposal(proposal) =>
×
NEW
260
                    self.finalize_proposal(proposal, persister).await,
×
NEW
261
                ReceiverTypeState::PayjoinProposal(proposal) =>
×
NEW
262
                    self.send_payjoin_proposal(proposal, persister).await,
×
263
                ReceiverTypeState::Uninitialized(_) =>
NEW
264
                    return Err(anyhow!("Uninitialized receiver session")),
×
265
                ReceiverTypeState::TerminalState =>
NEW
266
                    return Err(anyhow!("Terminal receiver session")),
×
267
            }
268
        };
269

270
        match res {
2✔
271
            Ok(_) => Ok(()),
1✔
272
            Err(e) => {
1✔
273
                let (_, session_history) = replay_receiver_event_log(persister)?;
1✔
274
                let pj_uri = match session_history.pj_uri() {
1✔
275
                    Some(uri) => Some(uri.extras.endpoint().clone()),
1✔
NEW
276
                    None => None,
×
277
                };
278
                let ohttp_relay = self.unwrap_relay_or_else_fetch(pj_uri).await?;
1✔
279
                handle_recoverable_error(&ohttp_relay, &session_history).await?;
1✔
280

281
                Err(e)
1✔
282
            }
283
        }
284
    }
2✔
285

286
    /// Wait for the sender's proposal, print the fallback transaction, and continue
    /// with `check_proposal`.
    ///
    /// The wait races `long_poll_fallback` against the interrupt channel.
    ///
    /// # Errors
    /// Returns an "Interrupted" error if the interrupt channel changes first;
    /// otherwise passes through errors from polling or from the following steps.
    #[allow(clippy::incompatible_msrv)]
286
    async fn read_from_directory(
2✔
287
        &self,
2✔
288
        session: Receiver<WithContext>,
2✔
289
        persister: &ReceiverPersister,
2✔
290
    ) -> Result<()> {
2✔
291
        let mut interrupt = self.interrupt.clone();
2✔
292
        let receiver = tokio::select! {
2✔
293
            res = self.long_poll_fallback(session, persister) => res,
2✔
294
            _ = interrupt.changed() => {
2✔
295
                println!("Interrupted. Call the `resume` command to resume all sessions.");
1✔
296
                return Err(anyhow!("Interrupted"));
1✔
297
            }
NEW
298
        // The trailing `?` applies to the polling result selected above.
        }?;
×
299
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
300
        println!("{}", serialize_hex(&receiver.extract_tx_to_schedule_broadcast()));
1✔
301
        self.check_proposal(receiver, persister).await
1✔
302
    }
2✔
304

305
    /// Check that the proposal's transaction is suitable for broadcast, save the
    /// transition, and continue with `check_inputs_not_owned`.
    ///
    /// Suitability is decided by the wallet's `can_broadcast`.
    async fn check_proposal(
1✔
306
        &self,
1✔
307
        proposal: Receiver<UncheckedProposal>,
1✔
308
        persister: &ReceiverPersister,
1✔
309
    ) -> Result<()> {
1✔
310
        let wallet = self.wallet();
1✔
311
        let proposal = proposal
1✔
312
            .check_broadcast_suitability(None, |tx| Ok(wallet.can_broadcast(tx)?))
1✔
313
            .save(persister)?;
1✔
314

315
        self.check_inputs_not_owned(proposal, persister).await
1✔
316
    }
1✔
317

318
    /// Run the "inputs not owned" check using the wallet's `is_mine`, save the
    /// transition, and continue with `check_no_inputs_seen_before`.
    async fn check_inputs_not_owned(
1✔
319
        &self,
1✔
320
        proposal: Receiver<MaybeInputsOwned>,
1✔
321
        persister: &ReceiverPersister,
1✔
322
    ) -> Result<()> {
1✔
323
        let wallet = self.wallet();
1✔
324
        let proposal =
1✔
325
            proposal.check_inputs_not_owned(|input| Ok(wallet.is_mine(input)?)).save(persister)?;
1✔
326
        self.check_no_inputs_seen_before(proposal, persister).await
1✔
327
    }
1✔
328

329
    /// Run the "no inputs seen before" check, save the transition, and continue
    /// with `identify_receiver_outputs`.
    ///
    /// Each input is passed to the database's `insert_input_seen_before`, whose
    /// result answers the check.
    async fn check_no_inputs_seen_before(
1✔
330
        &self,
1✔
331
        proposal: Receiver<MaybeInputsSeen>,
1✔
332
        persister: &ReceiverPersister,
1✔
333
    ) -> Result<()> {
1✔
334
        let proposal = proposal
1✔
335
            .check_no_inputs_seen_before(|input| Ok(self.db.insert_input_seen_before(*input)?))
1✔
336
            .save(persister)?;
1✔
337
        self.identify_receiver_outputs(proposal, persister).await
1✔
338
    }
1✔
339

340
    /// Identify which outputs belong to the receiver using the wallet's `is_mine`,
    /// save the transition, and continue with `commit_outputs`.
    async fn identify_receiver_outputs(
1✔
341
        &self,
1✔
342
        proposal: Receiver<OutputsUnknown>,
1✔
343
        persister: &ReceiverPersister,
1✔
344
    ) -> Result<()> {
1✔
345
        let wallet = self.wallet();
1✔
346
        let proposal = proposal
1✔
347
            .identify_receiver_outputs(|output_script| Ok(wallet.is_mine(output_script)?))
2✔
348
            .save(persister)?;
1✔
349
        self.commit_outputs(proposal, persister).await
1✔
350
    }
1✔
351

352
    /// Commit the outputs without modifying them, save the transition, and
    /// continue with `contribute_inputs`.
    async fn commit_outputs(
1✔
353
        &self,
1✔
354
        proposal: Receiver<WantsOutputs>,
1✔
355
        persister: &ReceiverPersister,
1✔
356
    ) -> Result<()> {
1✔
357
        let proposal = proposal.commit_outputs().save(persister)?;
1✔
358
        self.contribute_inputs(proposal, persister).await
1✔
359
    }
1✔
360

361
    /// Contribute one wallet input to the proposal, save the transition, and
    /// continue with `finalize_proposal`.
    ///
    /// The input is chosen by `try_preserving_privacy` from the wallet's
    /// `list_unspent` result.
    ///
    /// # Errors
    /// Returns an error if listing unspent outputs, selecting an input,
    /// contributing it, or saving the transition fails.
    async fn contribute_inputs(
1✔
362
        &self,
1✔
363
        proposal: Receiver<WantsInputs>,
1✔
364
        persister: &ReceiverPersister,
1✔
365
    ) -> Result<()> {
1✔
366
        let wallet = self.wallet();
1✔
367
        let candidate_inputs = wallet.list_unspent()?;
1✔
368

369
        let selected_input = proposal.try_preserving_privacy(candidate_inputs)?;
1✔
370
        let proposal =
1✔
371
            proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?;
1✔
372
        self.finalize_proposal(proposal, persister).await
1✔
373
    }
1✔
374

375
    /// Finalize the proposal, save the transition, and continue with
    /// `send_payjoin_proposal`.
    ///
    /// The PSBT is processed by the wallet's `process_psbt`, and
    /// `config.max_fee_rate` is passed as the last argument.
    async fn finalize_proposal(
1✔
376
        &self,
1✔
377
        proposal: Receiver<ProvisionalProposal>,
1✔
378
        persister: &ReceiverPersister,
1✔
379
    ) -> Result<()> {
1✔
380
        let wallet = self.wallet();
1✔
381
        let proposal = proposal
1✔
382
            .finalize_proposal(
1✔
383
                |psbt| Ok(wallet.process_psbt(psbt)?),
1✔
384
                // NOTE(review): presumably an optional minimum fee rate, left unset — confirm.
                None,
1✔
385
                self.config.max_fee_rate,
1✔
386
            )
1✔
387
            .save(persister)?;
1✔
388
        self.send_payjoin_proposal(proposal, persister).await
1✔
389
    }
1✔
390

391
    async fn send_payjoin_proposal(
1✔
392
        &self,
1✔
393
        mut proposal: Receiver<PayjoinProposal>,
1✔
394
        persister: &ReceiverPersister,
1✔
395
    ) -> Result<()> {
1✔
396
        let (req, ohttp_ctx) = proposal
1✔
397
            .extract_req(&self.unwrap_relay_or_else_fetch(None).await?)
1✔
398
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
399
        let res = post_request(req).await?;
1✔
400
        let payjoin_psbt = proposal.psbt().clone();
1✔
401
        proposal.process_res(&res.bytes().await?, ohttp_ctx).save(persister)?;
1✔
402
        println!(
1✔
403
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
1✔
404
            payjoin_psbt.extract_tx_unchecked_fee_rate().clone().compute_txid()
1✔
405
        );
1✔
406
        Ok(())
1✔
407
    }
1✔
408

409
    /// Return the relay already selected by the relay manager, or fetch one.
    ///
    /// When no relay is selected yet, `unwrap_ohttp_keys_or_else_fetch` is called
    /// with `directory` and the `relay_url` of its result is returned.
    ///
    /// # Panics
    /// Panics if the relay manager mutex is poisoned.
    async fn unwrap_relay_or_else_fetch(
6✔
410
        &self,
6✔
411
        directory: Option<payjoin::Url>,
6✔
412
    ) -> Result<payjoin::Url> {
6✔
413
        // The lock is taken inside this single statement and is not held across the `.await` below.
        let selected_relay =
6✔
414
            self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay();
6✔
415
        let ohttp_relay = match selected_relay {
6✔
416
            Some(relay) => relay,
1✔
417
            None =>
418
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone())
5✔
419
                    .await?
5✔
420
                    .relay_url,
421
        };
422
        Ok(ohttp_relay)
6✔
423
    }
6✔
424
}
425

426
/// Handle request error by sending an error response over the directory
427
async fn handle_recoverable_error(
1✔
428
    ohttp_relay: &payjoin::Url,
1✔
429
    session_history: &SessionHistory,
1✔
430
) -> Result<()> {
1✔
431
    let e = match session_history.terminal_error() {
1✔
NEW
432
        Some((_, Some(e))) => e,
×
433
        _ => return Ok(()),
1✔
434
    };
NEW
435
    let (err_req, err_ctx) = session_history
×
NEW
436
        .extract_err_req(ohttp_relay)?
×
NEW
437
        .expect("If JsonReply is Some, then err_req and err_ctx should be Some");
×
NEW
438
    let to_return = anyhow!("Replied with error: {}", e.to_json().to_string());
×
439

440
    let err_response = match post_request(err_req).await {
×
441
        Ok(response) => response,
×
NEW
442
        Err(e) => return Err(anyhow!("Failed to post error request: {}", e)),
×
443
    };
444

445
    let err_bytes = match err_response.bytes().await {
×
446
        Ok(bytes) => bytes,
×
NEW
447
        Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
×
448
    };
449

NEW
450
    if let Err(e) = process_err_res(&err_bytes, err_ctx) {
×
NEW
451
        return Err(anyhow!("Failed to process error response: {}", e));
×
452
    }
×
453

×
NEW
454
    Err(to_return)
×
455
}
1✔
456

457
/// POST `req` (URL, content type and body) using the client from `http_agent`.
///
/// # Errors
/// Returns an error if the HTTP client cannot be created, or the `reqwest`
/// send error converted by `map_reqwest_err`.
async fn post_request(req: payjoin::Request) -> Result<reqwest::Response> {
8✔
458
    let http = http_agent()?;
8✔
459
    http.post(req.url)
8✔
460
        .header("Content-Type", req.content_type)
8✔
461
        .body(req.body)
8✔
462
        .send()
8✔
463
        .await
8✔
464
        .map_err(map_reqwest_err)
6✔
465
}
6✔
466

467
/// Convert a `reqwest::Error` into an `anyhow::Error`.
///
/// The message includes the HTTP status code when the error carries one, and
/// says that there was no HTTP response otherwise.
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
468
    match e.status() {
×
469
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
470
        None => anyhow!("No HTTP response: {}", e),
×
471
    }
472
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc