• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 17985874962

24 Sep 2025 06:23PM UTC coverage: 84.316% (-0.2%) from 84.525%
17985874962

Pull #1060

github

web-flow
Merge fa845c8d1 into f7589fbc0
Pull Request #1060: Handle fatal errors in receiver state machine

144 of 193 new or added lines in 5 files covered. (74.61%)

3 existing lines in 1 file now uncovered.

8564 of 10157 relevant lines covered (84.32%)

481.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.57
/payjoin-cli/src/app/v2/mod.rs
1
use std::fmt;
2
use std::sync::{Arc, Mutex};
3

4
use anyhow::{anyhow, Context, Result};
5
use payjoin::bitcoin::consensus::encode::serialize_hex;
6
use payjoin::bitcoin::{Amount, FeeRate};
7
use payjoin::persist::OptionalTransitionOutcome;
8
use payjoin::receive::v2::{
9
    replay_event_log as replay_receiver_event_log, HasReplyableError, Initialized,
10
    MaybeInputsOwned, MaybeInputsSeen, OutputsUnknown, PayjoinProposal, ProvisionalProposal,
11
    ReceiveSession, Receiver, ReceiverBuilder, UncheckedOriginalPayload, WantsFeeRange,
12
    WantsInputs, WantsOutputs,
13
};
14
use payjoin::send::v2::{
15
    replay_event_log as replay_sender_event_log, PollingForProposal, SendSession, Sender,
16
    SenderBuilder, WithReplyKey,
17
};
18
use payjoin::{ImplementationError, PjParam, Uri};
19
use tokio::sync::watch;
20

21
use super::config::Config;
22
use super::wallet::BitcoindWallet;
23
use super::App as AppTrait;
24
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
25
use crate::app::{handle_interrupt, http_agent};
26
use crate::db::v2::{ReceiverPersister, SenderPersister, SessionId};
27
use crate::db::Database;
28

29
mod ohttp;
30

31
/// Width of the "Session ID" column in the session history table.
const W_ID: usize = 12;
/// Width of the "Sender/Receiver" column in the session history table.
const W_ROLE: usize = 25;
/// Width of the "Completed At" column in the session history table.
const W_DONE: usize = 15;
/// Width of the "Status" column in the session history table.
const W_STATUS: usize = 15;
35

36
#[derive(Clone)]
37
pub(crate) struct App {
38
    config: Config,
39
    db: Arc<Database>,
40
    wallet: BitcoindWallet,
41
    interrupt: watch::Receiver<()>,
42
    relay_manager: Arc<Mutex<RelayManager>>,
43
}
44

45
/// Provides the one-line label shown in the "Status" column of the session
/// history table.
trait StatusText {
    /// Returns a static, human-readable label for the current state.
    fn status_text(&self) -> &'static str;
}
48

49
impl StatusText for SendSession {
50
    fn status_text(&self) -> &'static str {
×
51
        match self {
×
52
            SendSession::WithReplyKey(_) | SendSession::PollingForProposal(_) =>
53
                "Waiting for proposal",
×
54
            SendSession::ProposalReceived(_) => "Proposal received",
×
55
            SendSession::TerminalFailure => "Session failure",
×
56
        }
57
    }
×
58
}
59

60
impl StatusText for ReceiveSession {
61
    fn status_text(&self) -> &'static str {
×
62
        match self {
×
63
            ReceiveSession::Initialized(_) => "Waiting for original proposal",
×
64
            ReceiveSession::UncheckedOriginalPayload(_)
65
            | ReceiveSession::MaybeInputsOwned(_)
66
            | ReceiveSession::MaybeInputsSeen(_)
67
            | ReceiveSession::OutputsUnknown(_)
68
            | ReceiveSession::WantsOutputs(_)
69
            | ReceiveSession::WantsInputs(_)
70
            | ReceiveSession::WantsFeeRange(_)
71
            | ReceiveSession::ProvisionalProposal(_) => "Processing original proposal",
×
72
            ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent",
×
NEW
73
            ReceiveSession::HasReplyableError(_) => "Session failure",
×
74
        }
75
    }
×
76
}
77

78
fn print_header() {
×
79
    println!(
×
80
        "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
×
81
        "Session ID", "Sender/Receiver", "Completed At", "Status"
82
    );
83
}
×
84

85
/// Which side of a payjoin a session history row belongs to.
enum Role {
    Sender,
    Receiver,
}

impl Role {
    /// Returns the label printed in the "Sender/Receiver" column.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Receiver => "Receiver",
            Self::Sender => "Sender",
        }
    }
}
97

98
struct SessionHistoryRow<Status> {
99
    session_id: SessionId,
100
    role: Role,
101
    status: Status,
102
    completed_at: Option<u64>,
103
    error_message: Option<String>,
104
}
105

106
impl<Status: StatusText> fmt::Display for SessionHistoryRow<Status> {
107
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
108
        write!(
×
109
            f,
×
110
            "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
×
111
            self.session_id.to_string(),
×
112
            self.role.as_str(),
×
113
            match self.completed_at {
×
114
                None => "Not Completed".to_string(),
×
115
                Some(secs) => {
×
116
                    // TODO: human readable time
117
                    secs.to_string()
×
118
                }
119
            },
120
            self.error_message.as_deref().unwrap_or(self.status.status_text())
×
121
        )
122
    }
×
123
}
124

125
#[async_trait::async_trait]
126
impl AppTrait for App {
127
    async fn new(config: Config) -> Result<Self> {
7✔
128
        let db = Arc::new(Database::create(&config.db_path)?);
129
        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
130
        let (interrupt_tx, interrupt_rx) = watch::channel(());
131
        tokio::spawn(handle_interrupt(interrupt_tx));
132
        let wallet = BitcoindWallet::new(&config.bitcoind).await?;
133
        let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager };
134
        app.wallet()
135
            .network()
136
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
137
        Ok(app)
138
    }
7✔
139

140
    /// Returns a clone of the wallet handle held by this app.
    fn wallet(&self) -> BitcoindWallet {
        self.wallet.clone()
    }
19✔
141

142
    #[allow(clippy::incompatible_msrv)]
143
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
3✔
144
        use payjoin::UriExt;
145
        let uri = Uri::try_from(bip21)
146
            .map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?
×
147
            .assume_checked()
148
            .check_pj_supported()
149
            .map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
150
        let address = uri.address;
151
        let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?;
×
152
        match uri.extras.pj_param() {
153
            #[cfg(feature = "v1")]
154
            PjParam::V1(pj_param) => {
155
                use std::str::FromStr;
156

157
                let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
158
                let (req, ctx) = payjoin::send::v1::SenderBuilder::from_parts(
159
                    psbt,
160
                    pj_param,
161
                    &address,
162
                    Some(amount),
163
                )
164
                .build_recommended(fee_rate)
165
                .with_context(|| "Failed to build payjoin request")?
166
                .create_v1_post_request();
167
                let http = http_agent(&self.config)?;
168
                let body = String::from_utf8(req.body.clone()).unwrap();
169
                println!("Sending fallback request to {}", &req.url);
170
                let response = http
171
                    .post(req.url)
172
                    .header("Content-Type", req.content_type)
173
                    .body(body.clone())
174
                    .send()
175
                    .await
176
                    .with_context(|| "HTTP request failed")?;
177
                let fallback_tx = payjoin::bitcoin::Psbt::from_str(&body)
178
                    .map_err(|e| anyhow!("Failed to load PSBT from base64: {}", e))?
×
179
                    .extract_tx()?;
180
                println!("Sent fallback transaction txid: {}", fallback_tx.compute_txid());
181
                println!(
182
                    "Sent fallback transaction hex: {:#}",
183
                    payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
184
                );
185
                let psbt = ctx.process_response(&response.bytes().await?).map_err(|e| {
×
186
                    tracing::debug!("Error processing response: {e:?}");
×
187
                    anyhow!("Failed to process response {e}")
×
188
                })?;
×
189

190
                self.process_pj_response(psbt)?;
191
                Ok(())
192
            }
193
            PjParam::V2(pj_param) => {
194
                let receiver_pubkey = pj_param.receiver_pubkey();
195
                let sender_state =
196
                    self.db.get_send_session_ids()?.into_iter().find_map(|session_id| {
1✔
197
                        let session_receiver_pubkey = self
1✔
198
                            .db
1✔
199
                            .get_send_session_receiver_pk(&session_id)
1✔
200
                            .expect("Receiver pubkey should exist if session id exists");
1✔
201
                        if session_receiver_pubkey == *receiver_pubkey {
1✔
202
                            let sender_persister =
1✔
203
                                SenderPersister::from_id(self.db.clone(), session_id);
1✔
204
                            let (send_session, _) = replay_sender_event_log(&sender_persister)
1✔
205
                                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))
1✔
206
                                .ok()?;
1✔
207

208
                            Some((send_session, sender_persister))
1✔
209
                        } else {
210
                            None
×
211
                        }
212
                    });
1✔
213

214
                let (sender_state, persister) = match sender_state {
215
                    Some((sender_state, persister)) => (sender_state, persister),
216
                    None => {
217
                        let persister =
218
                            SenderPersister::new(self.db.clone(), receiver_pubkey.clone())?;
219
                        let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
220
                        let sender =
221
                            SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
222
                                .build_recommended(fee_rate)?
223
                                .save(&persister)?;
224

225
                        (SendSession::WithReplyKey(sender), persister)
226
                    }
227
                };
228
                let mut interrupt = self.interrupt.clone();
229
                tokio::select! {
230
                    _ = self.process_sender_session(sender_state, &persister) => return Ok(()),
231
                    _ = interrupt.changed() => {
232
                        println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
233
                        return Err(anyhow!("Interrupted"))
234
                    }
235
                }
236
            }
237
            _ => unimplemented!("Unrecognized payjoin version"),
238
        }
239
    }
3✔
240

241
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
1✔
242
        let address = self.wallet().get_new_address()?;
243
        let ohttp_keys =
244
            unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone())
245
                .await?
246
                .ohttp_keys;
247
        let persister = ReceiverPersister::new(self.db.clone())?;
248
        let session =
249
            ReceiverBuilder::new(address, self.config.v2()?.pj_directory.as_str(), ohttp_keys)?
250
                .with_amount(amount)
251
                .with_max_fee_rate(self.config.max_fee_rate.unwrap_or(FeeRate::BROADCAST_MIN))
252
                .build()
253
                .save(&persister)?;
254

255
        println!("Receive session established");
256
        let pj_uri = session.pj_uri();
257
        println!("Request Payjoin by sharing this Payjoin Uri:");
258
        println!("{}", pj_uri);
259

260
        self.process_receiver_session(ReceiveSession::Initialized(session.clone()), &persister)
261
            .await?;
262
        Ok(())
263
    }
1✔
264

265
    #[allow(clippy::incompatible_msrv)]
266
    async fn resume_payjoins(&self) -> Result<()> {
3✔
267
        let recv_session_ids = self.db.get_recv_session_ids()?;
268
        let send_session_ids = self.db.get_send_session_ids()?;
269

270
        if recv_session_ids.is_empty() && send_session_ids.is_empty() {
271
            println!("No sessions to resume.");
272
            return Ok(());
273
        }
274

275
        let mut tasks = Vec::new();
276

277
        for session_id in recv_session_ids {
278
            let self_clone = self.clone();
279
            let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id);
280
            let receiver_state = replay_receiver_event_log(&recv_persister)
281
                .map_err(|e| anyhow!("Failed to replay receiver event log: {:?}", e))?
×
282
                .0;
283
            tasks.push(tokio::spawn(async move {
1✔
284
                self_clone.process_receiver_session(receiver_state, &recv_persister).await
1✔
285
            }));
1✔
286
        }
287

288
        for session_id in send_session_ids {
289
            let sender_persiter = SenderPersister::from_id(self.db.clone(), session_id);
290
            let sender_state = replay_sender_event_log(&sender_persiter)
291
                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))?
×
292
                .0;
293
            let self_clone = self.clone();
294
            tasks.push(tokio::spawn(async move {
×
295
                self_clone.process_sender_session(sender_state, &sender_persiter).await
×
296
            }));
×
297
        }
298

299
        let mut interrupt = self.interrupt.clone();
300
        tokio::select! {
301
            _ = async {
1✔
302
                for task in tasks {
2✔
303
                    let _ = task.await;
1✔
304
                }
305
            } => {
1✔
306
                println!("All resumed sessions completed.");
307
            }
308
            _ = interrupt.changed() => {
309
                println!("Resumed sessions were interrupted.");
310
            }
311
        }
312
        Ok(())
313
    }
3✔
314

315
    /// Prints the session history table: a header, then all sender rows,
    /// then all receiver rows.
    ///
    /// Active sessions are listed without a completion time; inactive ones
    /// carry their completion time and, when available, the terminal error
    /// taken from the replayed session history. Sessions whose event log
    /// cannot be replayed are left out.
    ///
    /// # Errors
    /// Fails if any of the session id lists cannot be read from the database.
    #[cfg(feature = "v2")]
    async fn history(&self) -> Result<()> {
        print_header();
        let mut send_rows = vec![];
        let mut recv_rows = vec![];

        // Active sender sessions.
        for session_id in self.db.get_send_session_ids()? {
            let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
            if let Ok((sender_state, _)) = replay_sender_event_log(&persister) {
                send_rows.push(SessionHistoryRow {
                    session_id,
                    role: Role::Sender,
                    status: sender_state,
                    completed_at: None,
                    error_message: None,
                });
            }
        }

        // Active receiver sessions.
        for session_id in self.db.get_recv_session_ids()? {
            let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
            if let Ok((receiver_state, _)) = replay_receiver_event_log(&persister) {
                recv_rows.push(SessionHistoryRow {
                    session_id,
                    role: Role::Receiver,
                    status: receiver_state,
                    completed_at: None,
                    error_message: None,
                });
            }
        }

        // Inactive sender sessions.
        for (session_id, completed_at) in self.db.get_inactive_send_session_ids()? {
            let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
            if let Ok((sender_state, session_history)) = replay_sender_event_log(&persister) {
                send_rows.push(SessionHistoryRow {
                    session_id,
                    role: Role::Sender,
                    status: sender_state,
                    completed_at: Some(completed_at),
                    error_message: session_history.terminal_error(),
                });
            }
        }

        // Inactive receiver sessions; the terminal error is rendered as JSON text.
        for (session_id, completed_at) in self.db.get_inactive_recv_session_ids()? {
            let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
            if let Ok((receiver_state, session_history)) = replay_receiver_event_log(&persister) {
                recv_rows.push(SessionHistoryRow {
                    session_id,
                    role: Role::Receiver,
                    status: receiver_state,
                    completed_at: Some(completed_at),
                    error_message: session_history
                        .terminal_error()
                        .map(|e| e.to_json().to_string()),
                });
            }
        }

        // Sender rows are printed first, receiver rows after them.
        for row in send_rows {
            println!("{}", row);
        }
        for row in recv_rows {
            println!("{}", row);
        }

        Ok(())
    }
×
397
}
398

399
impl App {
400
    async fn process_sender_session(
2✔
401
        &self,
2✔
402
        session: SendSession,
2✔
403
        persister: &SenderPersister,
2✔
404
    ) -> Result<()> {
2✔
405
        match session {
2✔
406
            SendSession::WithReplyKey(context) =>
1✔
407
                self.post_original_proposal(context, persister).await?,
1✔
408
            SendSession::PollingForProposal(context) =>
1✔
409
                self.get_proposed_payjoin_psbt(context, persister).await?,
1✔
410
            SendSession::ProposalReceived(proposal) => {
×
411
                self.process_pj_response(proposal)?;
×
412
                return Ok(());
×
413
            }
414
            _ => return Err(anyhow!("Unexpected sender state")),
×
415
        }
416
        Ok(())
1✔
417
    }
1✔
418

419
    async fn post_original_proposal(
1✔
420
        &self,
1✔
421
        sender: Sender<WithReplyKey>,
1✔
422
        persister: &SenderPersister,
1✔
423
    ) -> Result<()> {
1✔
424
        let (req, ctx) = sender.create_v2_post_request(
1✔
425
            self.unwrap_relay_or_else_fetch(Some(sender.endpoint().clone())).await?.as_str(),
1✔
426
        )?;
×
427
        let response = self.post_request(req).await?;
1✔
428
        println!("Posted original proposal...");
1✔
429
        let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?;
1✔
430
        self.get_proposed_payjoin_psbt(sender, persister).await
1✔
431
    }
×
432

433
    async fn get_proposed_payjoin_psbt(
2✔
434
        &self,
2✔
435
        sender: Sender<PollingForProposal>,
2✔
436
        persister: &SenderPersister,
2✔
437
    ) -> Result<()> {
2✔
438
        let mut session = sender.clone();
2✔
439
        // Long poll until we get a response
440
        loop {
441
            let (req, ctx) = session.create_poll_request(
3✔
442
                self.unwrap_relay_or_else_fetch(Some(session.endpoint().clone())).await?.as_str(),
3✔
443
            )?;
×
444
            let response = self.post_request(req).await?;
3✔
445
            let res = session.process_response(&response.bytes().await?, ctx).save(persister);
2✔
446
            match res {
2✔
447
                Ok(OptionalTransitionOutcome::Progress(psbt)) => {
1✔
448
                    println!("Proposal received. Processing...");
1✔
449
                    self.process_pj_response(psbt)?;
1✔
450
                    return Ok(());
1✔
451
                }
452
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
1✔
453
                    println!("No response yet.");
1✔
454
                    session = current_state;
1✔
455
                    continue;
1✔
456
                }
457
                Err(re) => {
×
458
                    println!("{re}");
×
459
                    tracing::debug!("{re:?}");
×
460
                    return Err(anyhow!("Response error").context(re));
×
461
                }
462
            }
463
        }
464
    }
1✔
465

466
    async fn long_poll_fallback(
2✔
467
        &self,
2✔
468
        session: Receiver<Initialized>,
2✔
469
        persister: &ReceiverPersister,
2✔
470
    ) -> Result<Receiver<UncheckedOriginalPayload>> {
2✔
471
        let ohttp_relay = self
2✔
472
            .unwrap_relay_or_else_fetch(Some(session.pj_uri().extras.endpoint().clone()))
2✔
473
            .await?;
2✔
474

475
        let mut session = session;
2✔
476
        loop {
477
            let (req, context) = session.create_poll_request(ohttp_relay.as_str())?;
2✔
478
            println!("Polling receive request...");
2✔
479
            let ohttp_response = self.post_request(req).await?;
2✔
480
            let state_transition = session
1✔
481
                .process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
482
                .save(persister);
1✔
483
            match state_transition {
1✔
484
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
1✔
485
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
486
                    return Ok(next_state);
1✔
487
                }
488
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
489
                    session = current_state;
×
490
                    continue;
×
491
                }
492
                Err(e) => return Err(e.into()),
×
493
            }
494
        }
495
    }
1✔
496

497
    async fn process_receiver_session(
2✔
498
        &self,
2✔
499
        session: ReceiveSession,
2✔
500
        persister: &ReceiverPersister,
2✔
501
    ) -> Result<()> {
2✔
502
        let res = {
2✔
503
            match session {
2✔
504
                ReceiveSession::Initialized(proposal) =>
2✔
505
                    self.read_from_directory(proposal, persister).await,
2✔
506
                ReceiveSession::UncheckedOriginalPayload(proposal) =>
×
507
                    self.check_proposal(proposal, persister).await,
×
508
                ReceiveSession::MaybeInputsOwned(proposal) =>
×
509
                    self.check_inputs_not_owned(proposal, persister).await,
×
510
                ReceiveSession::MaybeInputsSeen(proposal) =>
×
511
                    self.check_no_inputs_seen_before(proposal, persister).await,
×
512
                ReceiveSession::OutputsUnknown(proposal) =>
×
513
                    self.identify_receiver_outputs(proposal, persister).await,
×
514
                ReceiveSession::WantsOutputs(proposal) =>
×
515
                    self.commit_outputs(proposal, persister).await,
×
516
                ReceiveSession::WantsInputs(proposal) =>
×
517
                    self.contribute_inputs(proposal, persister).await,
×
518
                ReceiveSession::WantsFeeRange(proposal) =>
×
519
                    self.apply_fee_range(proposal, persister).await,
×
520
                ReceiveSession::ProvisionalProposal(proposal) =>
×
521
                    self.finalize_proposal(proposal, persister).await,
×
522
                ReceiveSession::PayjoinProposal(proposal) =>
×
523
                    self.send_payjoin_proposal(proposal, persister).await,
×
NEW
524
                ReceiveSession::HasReplyableError(error) =>
×
NEW
525
                    self.handle_error(error, persister).await,
×
526
            }
527
        };
528
        res
2✔
529
    }
2✔
530

531
    #[allow(clippy::incompatible_msrv)]
532
    async fn read_from_directory(
2✔
533
        &self,
2✔
534
        session: Receiver<Initialized>,
2✔
535
        persister: &ReceiverPersister,
2✔
536
    ) -> Result<()> {
2✔
537
        let mut interrupt = self.interrupt.clone();
2✔
538
        let receiver = tokio::select! {
2✔
539
            res = self.long_poll_fallback(session, persister) => res,
2✔
540
            _ = interrupt.changed() => {
2✔
541
                println!("Interrupted. Call the `resume` command to resume all sessions.");
1✔
542
                return Err(anyhow!("Interrupted"));
1✔
543
            }
544
        }?;
×
545
        self.check_proposal(receiver, persister).await
1✔
546
    }
2✔
547

548
    async fn check_proposal(
1✔
549
        &self,
1✔
550
        proposal: Receiver<UncheckedOriginalPayload>,
1✔
551
        persister: &ReceiverPersister,
1✔
552
    ) -> Result<()> {
1✔
553
        let wallet = self.wallet();
1✔
554
        let proposal = proposal
1✔
555
            .check_broadcast_suitability(None, |tx| {
1✔
556
                wallet
1✔
557
                    .can_broadcast(tx)
1✔
558
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
559
            })
1✔
560
            .save(persister)?;
1✔
561

562
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
563
        println!("{}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast()));
1✔
564
        self.check_inputs_not_owned(proposal, persister).await
1✔
565
    }
1✔
566

567
    async fn check_inputs_not_owned(
1✔
568
        &self,
1✔
569
        proposal: Receiver<MaybeInputsOwned>,
1✔
570
        persister: &ReceiverPersister,
1✔
571
    ) -> Result<()> {
1✔
572
        let wallet = self.wallet();
1✔
573
        let proposal = proposal
1✔
574
            .check_inputs_not_owned(&mut |input| {
1✔
575
                wallet
1✔
576
                    .is_mine(input)
1✔
577
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
578
            })
1✔
579
            .save(persister)?;
1✔
580
        self.check_no_inputs_seen_before(proposal, persister).await
1✔
581
    }
1✔
582

583
    async fn check_no_inputs_seen_before(
1✔
584
        &self,
1✔
585
        proposal: Receiver<MaybeInputsSeen>,
1✔
586
        persister: &ReceiverPersister,
1✔
587
    ) -> Result<()> {
1✔
588
        let proposal = proposal
1✔
589
            .check_no_inputs_seen_before(&mut |input| {
1✔
590
                Ok(self.db.insert_input_seen_before(*input)?)
1✔
591
            })
1✔
592
            .save(persister)?;
1✔
593
        self.identify_receiver_outputs(proposal, persister).await
1✔
594
    }
1✔
595

596
    async fn identify_receiver_outputs(
1✔
597
        &self,
1✔
598
        proposal: Receiver<OutputsUnknown>,
1✔
599
        persister: &ReceiverPersister,
1✔
600
    ) -> Result<()> {
1✔
601
        let wallet = self.wallet();
1✔
602
        let proposal = proposal
1✔
603
            .identify_receiver_outputs(&mut |output_script| {
2✔
604
                wallet
2✔
605
                    .is_mine(output_script)
2✔
606
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
2✔
607
            })
2✔
608
            .save(persister)?;
1✔
609
        self.commit_outputs(proposal, persister).await
1✔
610
    }
1✔
611

612
    async fn commit_outputs(
1✔
613
        &self,
1✔
614
        proposal: Receiver<WantsOutputs>,
1✔
615
        persister: &ReceiverPersister,
1✔
616
    ) -> Result<()> {
1✔
617
        let proposal = proposal.commit_outputs().save(persister)?;
1✔
618
        self.contribute_inputs(proposal, persister).await
1✔
619
    }
1✔
620

621
    async fn contribute_inputs(
1✔
622
        &self,
1✔
623
        proposal: Receiver<WantsInputs>,
1✔
624
        persister: &ReceiverPersister,
1✔
625
    ) -> Result<()> {
1✔
626
        let wallet = self.wallet();
1✔
627
        let candidate_inputs = wallet.list_unspent()?;
1✔
628

629
        let selected_input = proposal.try_preserving_privacy(candidate_inputs)?;
1✔
630
        let proposal =
1✔
631
            proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?;
1✔
632
        self.apply_fee_range(proposal, persister).await
1✔
633
    }
1✔
634

635
    async fn apply_fee_range(
1✔
636
        &self,
1✔
637
        proposal: Receiver<WantsFeeRange>,
1✔
638
        persister: &ReceiverPersister,
1✔
639
    ) -> Result<()> {
1✔
640
        let proposal = proposal.apply_fee_range(None, self.config.max_fee_rate).save(persister)?;
1✔
641
        self.finalize_proposal(proposal, persister).await
1✔
642
    }
1✔
643

644
    async fn finalize_proposal(
1✔
645
        &self,
1✔
646
        proposal: Receiver<ProvisionalProposal>,
1✔
647
        persister: &ReceiverPersister,
1✔
648
    ) -> Result<()> {
1✔
649
        let wallet = self.wallet();
1✔
650
        let proposal = proposal
1✔
651
            .finalize_proposal(|psbt| {
1✔
652
                wallet
1✔
653
                    .process_psbt(psbt)
1✔
654
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
655
            })
1✔
656
            .save(persister)?;
1✔
657
        self.send_payjoin_proposal(proposal, persister).await
1✔
658
    }
1✔
659

660
    async fn send_payjoin_proposal(
1✔
661
        &self,
1✔
662
        proposal: Receiver<PayjoinProposal>,
1✔
663
        persister: &ReceiverPersister,
1✔
664
    ) -> Result<()> {
1✔
665
        let (req, ohttp_ctx) = proposal
1✔
666
            .create_post_request(self.unwrap_relay_or_else_fetch(None).await?.as_str())
1✔
667
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
668
        let res = self.post_request(req).await?;
1✔
669
        let payjoin_psbt = proposal.psbt().clone();
1✔
670
        proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister)?;
1✔
671
        println!(
1✔
672
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
1✔
673
            payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
1✔
674
        );
675
        Ok(())
1✔
676
    }
1✔
677

678
    async fn unwrap_relay_or_else_fetch(
7✔
679
        &self,
7✔
680
        directory: Option<payjoin::Url>,
7✔
681
    ) -> Result<payjoin::Url> {
7✔
682
        let selected_relay =
7✔
683
            self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay();
7✔
684
        let ohttp_relay = match selected_relay {
7✔
685
            Some(relay) => relay,
3✔
686
            None =>
687
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone())
4✔
688
                    .await?
4✔
689
                    .relay_url,
690
        };
691
        Ok(ohttp_relay)
7✔
692
    }
7✔
693

694
    /// Handle error by attempting to send an error response over the directory
NEW
695
    async fn handle_error(
×
UNCOV
696
        &self,
×
NEW
697
        session: Receiver<HasReplyableError>,
×
NEW
698
        persister: &ReceiverPersister,
×
UNCOV
699
    ) -> Result<()> {
×
NEW
700
        let (err_req, err_ctx) =
×
NEW
701
            session.create_error_request(self.unwrap_relay_or_else_fetch(None).await?.as_str())?;
×
702

703
        let err_response = match self.post_request(err_req).await {
×
704
            Ok(response) => response,
×
705
            Err(e) => return Err(anyhow!("Failed to post error request: {}", e)),
×
706
        };
707

708
        let err_bytes = match err_response.bytes().await {
×
709
            Ok(bytes) => bytes,
×
710
            Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
×
711
        };
712

NEW
713
        if let Err(e) = session.process_error_response(&err_bytes, err_ctx).save(persister) {
×
714
            return Err(anyhow!("Failed to process error response: {}", e));
×
715
        }
×
716

NEW
717
        Ok(())
×
UNCOV
718
    }
×
719

720
    async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {
7✔
721
        let http = http_agent(&self.config)?;
7✔
722
        http.post(req.url)
7✔
723
            .header("Content-Type", req.content_type)
7✔
724
            .body(req.body)
7✔
725
            .send()
7✔
726
            .await
7✔
727
            .map_err(map_reqwest_err)
5✔
728
    }
5✔
729
}
730

731
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
732
    match e.status() {
×
733
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
734
        None => anyhow!("No HTTP response: {}", e),
×
735
    }
736
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc