• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 15568116171

10 Jun 2025 07:07PM UTC coverage: 85.714% (+0.2%) from 85.474%
15568116171

Pull #750

github

web-flow
Merge 06a46f9e7 into 89331880f
Pull Request #750: Replace `Persister` with `SessionPersister` for `v2::Receiver`

840 of 962 new or added lines in 10 files covered. (87.32%)

5 existing lines in 3 files now uncovered.

7596 of 8862 relevant lines covered (85.71%)

525.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.02
/payjoin-cli/src/app/v2/mod.rs
1
use std::sync::{Arc, Mutex};
2

3
use anyhow::{anyhow, Context, Result};
4
use payjoin::bitcoin::consensus::encode::serialize_hex;
5
use payjoin::bitcoin::psbt::Psbt;
6
use payjoin::bitcoin::{Amount, FeeRate};
7
use payjoin::persist::OptionalTransitionOutcome;
8
use payjoin::receive::v2::{
9
    process_err_res, replay_receiver_event_log, MaybeInputsOwned, MaybeInputsSeen, OutputsUnknown,
10
    PayjoinProposal, ProvisionalProposal, Receiver, ReceiverTypeState, SessionHistory,
11
    UncheckedProposal, WantsInputs, WantsOutputs, WithContext,
12
};
13
use payjoin::send::v2::{Sender, SenderBuilder, WithReplyKey};
14
use payjoin::Uri;
15
use tokio::sync::watch;
16

17
use super::config::Config;
18
use super::wallet::BitcoindWallet;
19
use super::App as AppTrait;
20
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
21
use crate::app::{handle_interrupt, http_agent};
22
use crate::db::v2::{ReceiverPersister, SenderPersister};
23
use crate::db::Database;
24

25
mod ohttp;
26

27
/// State shared by the payjoin v2 send, receive and resume commands.
///
/// Fields:
/// - `config`: the configuration the app was constructed with.
/// - `db`: reference-counted handle to the session database.
/// - `wallet`: wallet handle built from the `bitcoind` section of the config.
/// - `interrupt`: receiving half of the watch channel fed by the interrupt handler task.
/// - `relay_manager`: relay manager shared behind a mutex.
///
/// `Clone` is derived; `resume_payjoins` clones the app into each task it spawns.
#[derive(Clone)]
28
pub(crate) struct App {
29
    config: Config,
30
    db: Arc<Database>,
31
    wallet: BitcoindWallet,
32
    interrupt: watch::Receiver<()>,
33
    relay_manager: Arc<Mutex<RelayManager>>,
34
}
35

36
#[async_trait::async_trait]
37
impl AppTrait for App {
38
    fn new(config: Config) -> Result<Self> {
4✔
39
        let db = Arc::new(Database::create(&config.db_path)?);
4✔
40
        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
4✔
41
        let (interrupt_tx, interrupt_rx) = watch::channel(());
4✔
42
        tokio::spawn(handle_interrupt(interrupt_tx));
4✔
43
        let wallet = BitcoindWallet::new(&config.bitcoind)?;
4✔
44
        let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager };
4✔
45
        app.wallet()
4✔
46
            .network()
4✔
47
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
4✔
48
        Ok(app)
4✔
49
    }
4✔
50

51
    /// Returns a clone of the app's wallet handle.
    fn wallet(&self) -> BitcoindWallet {
        self.wallet.clone()
    }
14✔
52

53
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
2✔
54
        use payjoin::UriExt;
55
        let uri =
2✔
56
            Uri::try_from(bip21).map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?;
2✔
57
        let uri = uri.assume_checked();
2✔
58
        let uri = uri.check_pj_supported().map_err(|_| anyhow!("URI does not support Payjoin"))?;
2✔
59
        let url = uri.extras.endpoint();
2✔
60
        // match bip21 to send_session public_key
61
        let req_ctx = match self.db.get_send_session(url)? {
2✔
62
            Some(send_session) => send_session,
1✔
63
            None => {
64
                let psbt = self.create_original_psbt(&uri, fee_rate)?;
1✔
65
                let mut persister = SenderPersister::new(self.db.clone());
1✔
66
                let new_sender = SenderBuilder::new(psbt, uri.clone())
1✔
67
                    .build_recommended(fee_rate)
1✔
68
                    .with_context(|| "Failed to build payjoin request")?;
1✔
69
                let storage_token = new_sender
1✔
70
                    .persist(&mut persister)
1✔
71
                    .map_err(|e| anyhow!("Failed to persist sender: {}", e))?;
1✔
72
                Sender::load(storage_token, &persister)
1✔
73
                    .map_err(|e| anyhow!("Failed to load sender: {}", e))?
1✔
74
            }
75
        };
76
        self.spawn_payjoin_sender(req_ctx).await
2✔
77
    }
4✔
78

79
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
1✔
80
        let address = self.wallet().get_new_address()?;
1✔
81
        let ohttp_keys =
1✔
82
            unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone())
1✔
83
                .await?
1✔
84
                .ohttp_keys;
85
        let persister = ReceiverPersister::new(self.db.clone())?;
1✔
86
        let session = Receiver::create_session(
1✔
87
            address,
1✔
88
            self.config.v2()?.pj_directory.clone(),
1✔
89
            ohttp_keys,
1✔
90
            None,
1✔
91
        )
1✔
92
        .save(&persister)?;
1✔
93
        println!("Receive session established");
1✔
94
        let mut pj_uri = session.pj_uri();
1✔
95
        pj_uri.amount = Some(amount);
1✔
96
        println!("Request Payjoin by sharing this Payjoin Uri:");
1✔
97
        println!("{}", pj_uri);
1✔
98

1✔
99
        self.process_receiver_session(ReceiverTypeState::WithContext(session.clone()), &persister)
1✔
100
            .await?;
1✔
NEW
101
        Ok(())
×
102
    }
2✔
103

104
    #[allow(clippy::incompatible_msrv)]
105
    async fn resume_payjoins(&self) -> Result<()> {
1✔
106
        let recv_session_ids = self.db.get_recv_session_ids()?;
1✔
107
        let send_sessions = self.db.get_send_sessions()?;
1✔
108

109
        if recv_session_ids.is_empty() && send_sessions.is_empty() {
1✔
110
            println!("No sessions to resume.");
×
111
            return Ok(());
×
112
        }
1✔
113

1✔
114
        let mut tasks = Vec::new();
1✔
115

116
        for session_id in recv_session_ids {
2✔
117
            let self_clone = self.clone();
1✔
118
            let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id)?;
1✔
119
            let receiver_state = replay_receiver_event_log(&recv_persister)
1✔
120
                .map_err(|e| anyhow!("Failed to replay receiver event log: {:?}", e))?
1✔
121
                .0;
122
            tasks.push(tokio::spawn(async move {
1✔
123
                self_clone.process_receiver_session(receiver_state, &recv_persister).await
1✔
124
            }));
1✔
125
        }
1✔
126

127
        for session in send_sessions {
1✔
128
            let self_clone = self.clone();
×
129
            tasks.push(tokio::spawn(async move { self_clone.spawn_payjoin_sender(session).await }));
×
130
        }
×
131

132
        let mut interrupt = self.interrupt.clone();
1✔
133
        tokio::select! {
1✔
134
            _ = async {
1✔
135
                for task in tasks {
2✔
136
                    let _ = task.await;
1✔
137
                }
138
            } => {
1✔
139
                println!("All resumed sessions completed.");
1✔
140
            }
1✔
141
            _ = interrupt.changed() => {
1✔
UNCOV
142
                println!("Resumed sessions were interrupted.");
×
UNCOV
143
            }
×
144
        }
145
        Ok(())
1✔
146
    }
2✔
147
}
148

149
impl App {
150
    #[allow(clippy::incompatible_msrv)]
151
    async fn spawn_payjoin_sender(&self, mut req_ctx: Sender<WithReplyKey>) -> Result<()> {
2✔
152
        let mut interrupt = self.interrupt.clone();
2✔
153
        tokio::select! {
2✔
154
            res = self.long_poll_post(&mut req_ctx) => {
2✔
155
                self.process_pj_response(res?)?;
1✔
156
                self.db.clear_send_session(req_ctx.endpoint())?;
1✔
157
            }
158
            _ = interrupt.changed() => {
2✔
159
                println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
1✔
160
            }
1✔
161
        }
162
        Ok(())
2✔
163
    }
2✔
164

165
    async fn long_poll_post(&self, req_ctx: &mut Sender<WithReplyKey>) -> Result<Psbt> {
2✔
166
        let ohttp_relay = self.unwrap_relay_or_else_fetch(Some(req_ctx.endpoint().clone())).await?;
2✔
167

168
        match req_ctx.extract_v2(ohttp_relay.clone()) {
2✔
169
            Ok((req, ctx)) => {
2✔
170
                println!("Posting Original PSBT Payload request...");
2✔
171
                let response = post_request(req).await?;
2✔
172
                println!("Sent fallback transaction");
2✔
173
                let v2_ctx = Arc::new(ctx.process_response(&response.bytes().await?)?);
2✔
174
                loop {
175
                    let (req, ohttp_ctx) = v2_ctx.extract_req(&ohttp_relay)?;
3✔
176
                    let response = post_request(req).await?;
3✔
177
                    match v2_ctx.process_response(&response.bytes().await?, ohttp_ctx) {
2✔
178
                        Ok(Some(psbt)) => return Ok(psbt),
1✔
179
                        Ok(None) => {
1✔
180
                            println!("No response yet.");
1✔
181
                        }
1✔
182
                        Err(re) => {
×
183
                            println!("{re}");
×
184
                            log::debug!("{re:?}");
×
185
                            return Err(anyhow!("Response error").context(re));
×
186
                        }
187
                    }
188
                }
189
            }
190
            Err(_) => {
191
                let (req, v1_ctx) = req_ctx.extract_v1();
×
192
                println!("Posting Original PSBT Payload request...");
×
193
                let response = post_request(req).await?;
×
194
                println!("Sent fallback transaction");
×
195
                match v1_ctx.process_response(&response.bytes().await?) {
×
196
                    Ok(psbt) => Ok(psbt),
×
197
                    Err(re) => {
×
198
                        println!("{re}");
×
199
                        log::debug!("{re:?}");
×
200
                        Err(anyhow!("Response error").context(re))
×
201
                    }
202
                }
203
            }
204
        }
205
    }
1✔
206

207
    async fn long_poll_fallback(
2✔
208
        &self,
2✔
209
        session: Receiver<WithContext>,
2✔
210
        persister: &ReceiverPersister,
2✔
211
    ) -> Result<Receiver<UncheckedProposal>> {
2✔
212
        let ohttp_relay = self
2✔
213
            .unwrap_relay_or_else_fetch(Some(session.pj_uri().extras.endpoint().clone()))
2✔
214
            .await?;
2✔
215

216
        let mut session = session;
2✔
217
        loop {
218
            let (req, context) = session.extract_req(&ohttp_relay)?;
2✔
219
            println!("Polling receive request...");
2✔
220
            let ohttp_response = post_request(req).await?;
2✔
221
            let state_transition = session
1✔
222
                .process_res(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
223
                .save(persister);
1✔
224
            match state_transition {
1✔
225
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
1✔
226
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
227
                    return Ok(next_state);
1✔
228
                }
NEW
229
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
NEW
230
                    session = current_state;
×
NEW
231
                    continue;
×
232
                }
NEW
233
                Err(e) => return Err(e.into()),
×
234
            }
235
        }
236
    }
1✔
237

238
    async fn process_receiver_session(
2✔
239
        &self,
2✔
240
        session: ReceiverTypeState,
2✔
241
        persister: &ReceiverPersister,
2✔
242
    ) -> Result<()> {
2✔
243
        let res = {
2✔
244
            match session {
2✔
245
                ReceiverTypeState::WithContext(proposal) =>
2✔
246
                    self.read_from_directory(proposal, persister).await,
2✔
NEW
247
                ReceiverTypeState::UncheckedProposal(proposal) =>
×
NEW
248
                    self.check_proposal(proposal, persister).await,
×
NEW
249
                ReceiverTypeState::MaybeInputsOwned(proposal) =>
×
NEW
250
                    self.check_inputs_not_owned(proposal, persister).await,
×
NEW
251
                ReceiverTypeState::MaybeInputsSeen(proposal) =>
×
NEW
252
                    self.check_no_inputs_seen_before(proposal, persister).await,
×
NEW
253
                ReceiverTypeState::OutputsUnknown(proposal) =>
×
NEW
254
                    self.identify_receiver_outputs(proposal, persister).await,
×
NEW
255
                ReceiverTypeState::WantsOutputs(proposal) =>
×
NEW
256
                    self.commit_outputs(proposal, persister).await,
×
NEW
257
                ReceiverTypeState::WantsInputs(proposal) =>
×
NEW
258
                    self.contribute_inputs(proposal, persister).await,
×
NEW
259
                ReceiverTypeState::ProvisionalProposal(proposal) =>
×
NEW
260
                    self.finalize_proposal(proposal, persister).await,
×
NEW
261
                ReceiverTypeState::PayjoinProposal(proposal) =>
×
NEW
262
                    self.send_payjoin_proposal(proposal, persister).await,
×
263
                ReceiverTypeState::Uninitialized(_) =>
NEW
264
                    return Err(anyhow!("Uninitialized receiver session")),
×
265
                ReceiverTypeState::TerminalState =>
NEW
266
                    return Err(anyhow!("Terminal receiver session")),
×
267
            }
268
        };
269

270
        match res {
2✔
271
            Ok(_) => Ok(()),
1✔
272
            Err(e) => {
1✔
273
                let (_, session_history) = replay_receiver_event_log(persister)?;
1✔
274
                let pj_uri = match session_history.pj_uri() {
1✔
275
                    Some(uri) => Some(uri.extras.endpoint().clone()),
1✔
NEW
276
                    None => None,
×
277
                };
278
                let ohttp_relay = self.unwrap_relay_or_else_fetch(pj_uri).await?;
1✔
279
                handle_recoverable_error(&ohttp_relay, &session_history).await?;
1✔
280

281
                Err(e)
1✔
282
            }
283
        }
284
    }
2✔
285

286
    #[allow(clippy::incompatible_msrv)]
287
    async fn read_from_directory(
2✔
288
        &self,
2✔
289
        session: Receiver<WithContext>,
2✔
290
        persister: &ReceiverPersister,
2✔
291
    ) -> Result<()> {
2✔
292
        let mut interrupt = self.interrupt.clone();
2✔
293
        let receiver = tokio::select! {
2✔
294
            res = self.long_poll_fallback(session, persister) => res,
2✔
295
            _ = interrupt.changed() => {
2✔
296
                println!("Interrupted. Call the `resume` command to resume all sessions.");
1✔
297
                return Err(anyhow!("Interrupted"));
1✔
298
            }
NEW
299
        }?;
×
300
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
301
        println!("{}", serialize_hex(&receiver.extract_tx_to_schedule_broadcast()));
1✔
302
        self.check_proposal(receiver, persister).await
1✔
303
    }
2✔
304

305
    async fn check_proposal(
1✔
306
        &self,
1✔
307
        proposal: Receiver<UncheckedProposal>,
1✔
308
        persister: &ReceiverPersister,
1✔
309
    ) -> Result<()> {
1✔
310
        let wallet = self.wallet();
1✔
311
        let proposal = proposal
1✔
312
            .check_broadcast_suitability(None, |tx| Ok(wallet.can_broadcast(tx)?))
1✔
313
            .save(persister)?;
1✔
314

315
        self.check_inputs_not_owned(proposal, persister).await
1✔
316
    }
1✔
317

318
    async fn check_inputs_not_owned(
1✔
319
        &self,
1✔
320
        proposal: Receiver<MaybeInputsOwned>,
1✔
321
        persister: &ReceiverPersister,
1✔
322
    ) -> Result<()> {
1✔
323
        let wallet = self.wallet();
1✔
324
        let proposal =
1✔
325
            proposal.check_inputs_not_owned(|input| Ok(wallet.is_mine(input)?)).save(persister)?;
1✔
326
        self.check_no_inputs_seen_before(proposal, persister).await
1✔
327
    }
1✔
328

329
    async fn check_no_inputs_seen_before(
1✔
330
        &self,
1✔
331
        proposal: Receiver<MaybeInputsSeen>,
1✔
332
        persister: &ReceiverPersister,
1✔
333
    ) -> Result<()> {
1✔
334
        let proposal = proposal
1✔
335
            .check_no_inputs_seen_before(|input| Ok(self.db.insert_input_seen_before(*input)?))
1✔
336
            .save(persister)?;
1✔
337
        self.identify_receiver_outputs(proposal, persister).await
1✔
338
    }
1✔
339

340
    async fn identify_receiver_outputs(
1✔
341
        &self,
1✔
342
        proposal: Receiver<OutputsUnknown>,
1✔
343
        persister: &ReceiverPersister,
1✔
344
    ) -> Result<()> {
1✔
345
        let wallet = self.wallet();
1✔
346
        let proposal = proposal
1✔
347
            .identify_receiver_outputs(|output_script| Ok(wallet.is_mine(output_script)?))
2✔
348
            .save(persister)?;
1✔
349
        self.commit_outputs(proposal, persister).await
1✔
350
    }
1✔
351

352
    async fn commit_outputs(
1✔
353
        &self,
1✔
354
        proposal: Receiver<WantsOutputs>,
1✔
355
        persister: &ReceiverPersister,
1✔
356
    ) -> Result<()> {
1✔
357
        let proposal = proposal.commit_outputs().save(persister)?;
1✔
358
        self.contribute_inputs(proposal, persister).await
1✔
359
    }
1✔
360

361
    async fn contribute_inputs(
1✔
362
        &self,
1✔
363
        proposal: Receiver<WantsInputs>,
1✔
364
        persister: &ReceiverPersister,
1✔
365
    ) -> Result<()> {
1✔
366
        let wallet = self.wallet();
1✔
367
        let candidate_inputs = wallet.list_unspent()?;
1✔
368

369
        let selected_input = proposal.try_preserving_privacy(candidate_inputs)?;
1✔
370
        let proposal =
1✔
371
            proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?;
1✔
372
        self.finalize_proposal(proposal, persister).await
1✔
373
    }
1✔
374

375
    async fn finalize_proposal(
1✔
376
        &self,
1✔
377
        proposal: Receiver<ProvisionalProposal>,
1✔
378
        persister: &ReceiverPersister,
1✔
379
    ) -> Result<()> {
1✔
380
        let wallet = self.wallet();
1✔
381
        let proposal = proposal
1✔
382
            .finalize_proposal(
1✔
383
                |psbt| Ok(wallet.process_psbt(psbt)?),
1✔
384
                None,
1✔
385
                self.config.max_fee_rate,
1✔
386
            )
1✔
387
            .save(persister)?;
1✔
388
        self.send_payjoin_proposal(proposal, persister).await
1✔
389
    }
1✔
390

391
    async fn send_payjoin_proposal(
1✔
392
        &self,
1✔
393
        mut proposal: Receiver<PayjoinProposal>,
1✔
394
        persister: &ReceiverPersister,
1✔
395
    ) -> Result<()> {
1✔
396
        let (req, ohttp_ctx) = proposal
1✔
397
            .extract_req(&self.unwrap_relay_or_else_fetch(None).await?)
1✔
398
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
399
        let res = post_request(req).await?;
1✔
400
        let payjoin_psbt = proposal.psbt().clone();
1✔
401
        proposal.process_res(&res.bytes().await?, ohttp_ctx).save(persister)?;
1✔
402
        println!(
1✔
403
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
1✔
404
            payjoin_psbt.extract_tx_unchecked_fee_rate().clone().compute_txid()
1✔
405
        );
1✔
406
        Ok(())
1✔
407
    }
1✔
408

409
    async fn unwrap_relay_or_else_fetch(
6✔
410
        &self,
6✔
411
        directory: Option<payjoin::Url>,
6✔
412
    ) -> Result<payjoin::Url> {
6✔
413
        let selected_relay =
6✔
414
            self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay();
6✔
415
        let ohttp_relay = match selected_relay {
6✔
416
            Some(relay) => relay,
1✔
417
            None =>
418
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone())
5✔
419
                    .await?
5✔
420
                    .relay_url,
421
        };
422
        Ok(ohttp_relay)
6✔
423
    }
6✔
424
}
425

426
/// Handle request error by sending an error response over the directory
427
async fn handle_recoverable_error(
1✔
428
    ohttp_relay: &payjoin::Url,
1✔
429
    session_history: &SessionHistory,
1✔
430
) -> Result<()> {
1✔
431
    let e = match session_history.terminal_error() {
1✔
NEW
432
        Some((_, Some(e))) => e,
×
433
        _ => return Ok(()),
1✔
434
    };
NEW
435
    let (err_req, err_ctx) = session_history
×
NEW
436
        .extract_err_req(ohttp_relay)?
×
NEW
437
        .expect("If JsonReply is Some, then err_req and err_ctx should be Some");
×
NEW
438
    let to_return = anyhow!("Replied with error: {}", e.to_json().to_string());
×
439

440
    let err_response = match post_request(err_req).await {
×
441
        Ok(response) => response,
×
NEW
442
        Err(e) => return Err(anyhow!("Failed to post error request: {}", e)),
×
443
    };
444

445
    let err_bytes = match err_response.bytes().await {
×
446
        Ok(bytes) => bytes,
×
NEW
447
        Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
×
448
    };
449

NEW
450
    if let Err(e) = process_err_res(&err_bytes, err_ctx) {
×
NEW
451
        return Err(anyhow!("Failed to process error response: {}", e));
×
452
    }
×
453

×
NEW
454
    Err(to_return)
×
455
}
1✔
456

457
async fn post_request(req: payjoin::Request) -> Result<reqwest::Response> {
8✔
458
    let http = http_agent()?;
8✔
459
    http.post(req.url)
8✔
460
        .header("Content-Type", req.content_type)
8✔
461
        .body(req.body)
8✔
462
        .send()
8✔
463
        .await
8✔
464
        .map_err(map_reqwest_err)
6✔
465
}
6✔
466

467
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
468
    match e.status() {
×
469
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
470
        None => anyhow!("No HTTP response: {}", e),
×
471
    }
472
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc