• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 26187598484

20 May 2026 08:19PM UTC coverage: 85.14% (-0.1%) from 85.284%
26187598484

Pull #1557

github

web-flow
Merge 3bb462567 into 17a3b6889
Pull Request #1557: Sender Pending Fallback state

23 of 51 new or added lines in 6 files covered. (45.1%)

1 existing line in 1 file now uncovered.

11665 of 13701 relevant lines covered (85.14%)

394.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.72
/payjoin-cli/src/app/v2/mod.rs
1
use std::fmt;
2
use std::sync::{Arc, Mutex};
3

4
use anyhow::{anyhow, Context, Result};
5
use payjoin::bitcoin::consensus::encode::serialize_hex;
6
use payjoin::bitcoin::{Amount, FeeRate};
7
use payjoin::persist::{OptionalTransitionOutcome, SessionPersister};
8
use payjoin::receive::v2::{
9
    replay_event_log as replay_receiver_event_log, HasReplyableError, Initialized,
10
    MaybeInputsOwned, MaybeInputsSeen, Monitor, OutputsUnknown, PayjoinProposal,
11
    ProvisionalProposal, ReceiveSession, Receiver, ReceiverBuilder,
12
    SessionOutcome as ReceiverSessionOutcome, UncheckedOriginalPayload, WantsFeeRange, WantsInputs,
13
    WantsOutputs,
14
};
15
use payjoin::send::v2::{
16
    replay_event_log as replay_sender_event_log, PendingFallback, PollingForProposal, SendSession,
17
    Sender, SenderBuilder, SessionOutcome as SenderSessionOutcome, WithReplyKey,
18
};
19
use payjoin::{ImplementationError, PjParam, Uri};
20
use tokio::sync::watch;
21

22
use super::config::Config;
23
use super::wallet::BitcoindWallet;
24
use super::App as AppTrait;
25
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
26
use crate::app::{handle_interrupt, http_agent};
27
use crate::db::v2::{ReceiverPersister, SenderPersister, SessionId};
28
use crate::db::Database;
29

30
mod ohttp;
31

32
/// Width of the "Session ID" column in the session history table.
const W_ID: usize = 12;
33
/// Width of the "Sender/Receiver" column in the session history table.
const W_ROLE: usize = 25;
34
/// Width of the "Completed At" column in the session history table.
const W_DONE: usize = 15;
35
/// Width of the "Status" column in the session history table.
const W_STATUS: usize = 15;
36

37
/// State shared by the payjoin-cli v2 commands.
///
/// The struct is `Clone`; `resume_payjoins` clones it once per spawned session task.
#[derive(Clone)]
38
pub(crate) struct App {
39
    // Configuration the app was constructed from.
    config: Config,
40
    // Session database, created from `config.db_path`.
    db: Arc<Database>,
41
    // Wallet handle built from `config.bitcoind`.
    wallet: BitcoindWallet,
42
    // Receiving half of the watch channel whose sender is handed to `handle_interrupt`.
    interrupt: watch::Receiver<()>,
43
    // Relay manager shared with the OHTTP key fetching helper.
    relay_manager: Arc<Mutex<RelayManager>>,
44
}
45

46
/// Maps a session state to the fixed label shown in the "Status" column of the history table.
trait StatusText {
47
    /// Returns the static, human-readable description of the current state.
    fn status_text(&self) -> &'static str;
48
}
49

50
impl StatusText for SendSession {
51
    fn status_text(&self) -> &'static str {
×
52
        match self {
×
53
            SendSession::WithReplyKey(_) | SendSession::PollingForProposal(_) =>
54
                "Waiting for proposal",
×
55
            SendSession::Closed(session_outcome) => match session_outcome {
×
56
                SenderSessionOutcome::Failure => "Session failure",
×
57
                SenderSessionOutcome::Success(_) => "Session success",
×
58
                SenderSessionOutcome::Cancel => "Session cancelled",
×
59
            },
NEW
60
            SendSession::PendingFallback(_) => "Session awaiting fallback",
×
61
        }
62
    }
×
63
}
64

65
impl StatusText for ReceiveSession {
66
    fn status_text(&self) -> &'static str {
×
67
        match self {
×
68
            ReceiveSession::Initialized(_) => "Waiting for original proposal",
×
69
            ReceiveSession::UncheckedOriginalPayload(_)
70
            | ReceiveSession::MaybeInputsOwned(_)
71
            | ReceiveSession::MaybeInputsSeen(_)
72
            | ReceiveSession::OutputsUnknown(_)
73
            | ReceiveSession::WantsOutputs(_)
74
            | ReceiveSession::WantsInputs(_)
75
            | ReceiveSession::WantsFeeRange(_)
76
            | ReceiveSession::ProvisionalProposal(_) => "Processing original proposal",
×
77
            ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent",
×
78
            ReceiveSession::HasReplyableError(_) =>
79
                "Session failure, waiting to post error response",
×
80
            ReceiveSession::Monitor(_) => "Monitoring payjoin proposal",
×
81
            ReceiveSession::Closed(session_outcome) => match session_outcome {
×
82
                ReceiverSessionOutcome::Failure => "Session failure",
×
83
                ReceiverSessionOutcome::Success(_) => "Session success, Payjoin proposal was broadcasted",
×
84
                ReceiverSessionOutcome::Cancel => "Session cancelled",
×
85
                ReceiverSessionOutcome::FallbackBroadcasted => "Fallback broadcasted",
×
86
                ReceiverSessionOutcome::PayjoinProposalSent =>
87
                    "Payjoin proposal sent, skipping monitoring as the sender is spending non-SegWit inputs",
×
88
            },
89
        }
90
    }
×
91
}
92

93
fn print_header() {
×
94
    println!(
×
95
        "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
96
        "Session ID", "Sender/Receiver", "Completed At", "Status"
97
    );
98
}
×
99

100
/// Which side of a payjoin a history row belongs to.
enum Role {
    Sender,
    Receiver,
}

impl Role {
    /// Text shown in the "Sender/Receiver" column of the history table.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Receiver => "Receiver",
            Self::Sender => "Sender",
        }
    }
}
112

113
/// One line of the session history table; rendered through its `Display` impl.
struct SessionHistoryRow<Status> {
114
    // Identifier printed in the first column.
    session_id: SessionId,
115
    // Whether the row describes a sender or a receiver session.
    role: Role,
116
    // Session state; its `status_text()` is shown unless `error_message` is set.
    status: Status,
117
    // `None` is printed as "Not Completed"; otherwise the raw number is printed.
    // NOTE(review): presumably a timestamp in seconds -- confirm against the database layer.
    completed_at: Option<u64>,
118
    // When present, replaces the status text in the last column.
    error_message: Option<String>,
119
}
120

121
impl<Status: StatusText> fmt::Display for SessionHistoryRow<Status> {
122
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
123
        write!(
×
124
            f,
×
125
            "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
126
            self.session_id.to_string(),
×
127
            self.role.as_str(),
×
128
            match self.completed_at {
×
129
                None => "Not Completed".to_string(),
×
130
                Some(secs) => {
×
131
                    // TODO: human readable time
132
                    secs.to_string()
×
133
                }
134
            },
135
            self.error_message.as_deref().unwrap_or(self.status.status_text())
×
136
        )
137
    }
×
138
}
139

140
#[async_trait::async_trait]
141
impl AppTrait for App {
142
    async fn new(config: Config) -> Result<Self> {
11✔
143
        let db = Arc::new(Database::create(&config.db_path)?);
144
        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
145
        let (interrupt_tx, interrupt_rx) = watch::channel(());
146
        tokio::spawn(handle_interrupt(interrupt_tx));
147
        let wallet = BitcoindWallet::new(&config.bitcoind).await?;
148
        let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager };
149
        app.wallet()
150
            .network()
151
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
152
        Ok(app)
153
    }
11✔
154

155
    /// Returns a clone of the wallet handle owned by the app.
    fn wallet(&self) -> BitcoindWallet {
        Clone::clone(&self.wallet)
    }
30✔
156

157
    #[allow(clippy::incompatible_msrv)]
158
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
4✔
159
        use payjoin::UriExt;
160
        let uri = Uri::try_from(bip21)
161
            .map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?
×
162
            .assume_checked()
163
            .check_pj_supported()
164
            .map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
165
        let address = uri.address;
166
        let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?;
×
167
        match uri.extras.pj_param() {
168
            #[cfg(feature = "v1")]
169
            PjParam::V1(pj_param) => {
170
                let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
171
                let fallback_tx = psbt.clone().extract_tx()?;
172
                let (req, ctx) = payjoin::send::v1::SenderBuilder::from_parts(
173
                    psbt,
174
                    pj_param,
175
                    &address,
176
                    Some(amount),
177
                )
178
                .build_recommended(fee_rate)
179
                .with_context(|| "Failed to build payjoin request")?
180
                .create_v1_post_request();
181
                let http = http_agent(&self.config)?;
182
                let body = String::from_utf8(req.body.clone()).unwrap();
183
                println!("Sending Original PSBT to {}", req.url);
184
                let response = match http
185
                    .post(req.url)
186
                    .header("Content-Type", req.content_type)
187
                    .body(body.clone())
188
                    .send()
189
                    .await
190
                {
191
                    Ok(response) => response,
192
                    Err(e) => {
193
                        tracing::error!("HTTP request failed: {e}");
194
                        println!("Payjoin failed. To broadcast the fallback transaction, run:");
195
                        println!(
196
                            "  bitcoin-cli -rpcwallet=<wallet> sendrawtransaction {:#}",
197
                            payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
198
                        );
199
                        return Err(anyhow!("HTTP request failed: {e}"));
200
                    }
201
                };
202
                let psbt = match ctx.process_response(&response.bytes().await?) {
203
                    Ok(psbt) => psbt,
204
                    Err(e) => {
205
                        tracing::error!("Error processing response: {e:?}");
206
                        println!("Payjoin failed. To broadcast the fallback transaction, run:");
207
                        println!(
208
                            "  bitcoin-cli -rpcwallet=<wallet> sendrawtransaction {:#}",
209
                            payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
210
                        );
211
                        return Err(anyhow!("Failed to process response {e}"));
212
                    }
213
                };
214

215
                self.process_pj_response(psbt)?;
216
                Ok(())
217
            }
218
            PjParam::V2(pj_param) => {
219
                let receiver_pubkey = pj_param.receiver_pubkey();
220
                let sender_state =
221
                    self.db.get_send_session_ids()?.into_iter().find_map(|session_id| {
1✔
222
                        let session_receiver_pubkey = self
1✔
223
                            .db
1✔
224
                            .get_send_session_receiver_pk(&session_id)
1✔
225
                            .expect("Receiver pubkey should exist if session id exists");
1✔
226
                        if session_receiver_pubkey == *receiver_pubkey {
1✔
227
                            let sender_persister =
1✔
228
                                SenderPersister::from_id(self.db.clone(), session_id);
1✔
229
                            let (send_session, _) = replay_sender_event_log(&sender_persister)
1✔
230
                                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))
1✔
231
                                .ok()?;
1✔
232

233
                            Some((send_session, sender_persister))
1✔
234
                        } else {
235
                            None
×
236
                        }
237
                    });
1✔
238

239
                let (sender_state, persister) = match sender_state {
240
                    Some((sender_state, persister)) => (sender_state, persister),
241
                    None => {
242
                        let persister =
243
                            SenderPersister::new(self.db.clone(), bip21, receiver_pubkey)?;
244
                        let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
245
                        let sender =
246
                            SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
247
                                .build_recommended(fee_rate)?
248
                                .save(&persister)?;
249

250
                        (SendSession::WithReplyKey(sender), persister)
251
                    }
252
                };
253
                let mut interrupt = self.interrupt.clone();
254
                tokio::select! {
255
                    res = self.process_sender_session(sender_state, &persister) => {
256
                        match res {
257
                            Ok(()) => return Ok(()),
258
                            Err(err) => {
259
                                let id = persister.session_id();
260
                                println!("Session {id} failed. Run `payjoin-cli cancel {id}` to cancel and broadcast the original transaction.");
261
                                return Err(err);
262
                            }
263
                        }
264
                    },
265
                    _ = interrupt.changed() => {
266
                        let id = persister.session_id();
267
                        println!(
268
                            "Session {id} interrupted. Call `send` again to resume, `resume` to resume all sessions, or `payjoin-cli cancel {id}` to cancel and broadcast the original transaction."
269
                        );
270
                        return Err(anyhow!("Interrupted"))
271
                    }
272
                }
273
            }
274
            _ => unimplemented!("Unrecognized payjoin version"),
275
        }
276
    }
4✔
277

278
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
2✔
279
        let address = self.wallet().get_new_address()?;
280
        let ohttp_keys =
281
            unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone())
282
                .await?
283
                .ohttp_keys;
284
        let persister = ReceiverPersister::new(self.db.clone())?;
285
        let session =
286
            ReceiverBuilder::new(address, self.config.v2()?.pj_directory.as_str(), ohttp_keys)?
287
                .with_amount(amount)
288
                .with_max_fee_rate(self.config.max_fee_rate.unwrap_or(FeeRate::BROADCAST_MIN))
289
                .build()
290
                .save(&persister)?;
291

292
        println!("Receive session established");
293
        let pj_uri = session.pj_uri();
294
        println!("Request Payjoin by sharing this Payjoin Uri:");
295
        println!("{pj_uri}");
296

297
        self.process_receiver_session(ReceiveSession::Initialized(session.clone()), &persister)
298
            .await?;
299
        Ok(())
300
    }
2✔
301

302
    #[allow(clippy::incompatible_msrv)]
303
    async fn resume_payjoins(&self) -> Result<()> {
4✔
304
        let recv_session_ids = self.db.get_recv_session_ids()?;
305
        let send_session_ids = self.db.get_send_session_ids()?;
306

307
        if recv_session_ids.is_empty() && send_session_ids.is_empty() {
308
            println!("No sessions to resume.");
309
            return Ok(());
310
        }
311

312
        let mut tasks = Vec::new();
313

314
        // Process receiver sessions
315
        for session_id in recv_session_ids {
316
            let self_clone = self.clone();
317
            let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
318
            match replay_receiver_event_log(&recv_persister) {
319
                Ok((receiver_state, _)) => {
320
                    tasks.push(tokio::spawn(async move {
2✔
321
                        self_clone.process_receiver_session(receiver_state, &recv_persister).await
2✔
322
                    }));
1✔
323
                }
324
                Err(e) => {
325
                    tracing::error!("An error {:?} occurred while replaying receiver session", e);
326
                    Self::close_failed_session(&recv_persister, &session_id, "receiver");
327
                }
328
            }
329
        }
330

331
        // Process sender sessions
332
        for session_id in send_session_ids {
333
            let sender_persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
334
            match replay_sender_event_log(&sender_persister) {
335
                Ok((sender_state, _)) => {
336
                    let self_clone = self.clone();
337
                    tasks.push(tokio::spawn(async move {
×
338
                        self_clone.process_sender_session(sender_state, &sender_persister).await
×
339
                    }));
×
340
                }
341
                Err(e) => {
342
                    tracing::error!("An error {:?} occurred while replaying Sender session", e);
343
                    Self::close_failed_session(&sender_persister, &session_id, "sender");
344
                }
345
            }
346
        }
347

348
        let mut interrupt = self.interrupt.clone();
349
        tokio::select! {
350
            _ = async {
2✔
351
                for task in tasks {
2✔
352
                    let _ = task.await;
2✔
353
                }
354
            } => {
1✔
355
                println!("All resumed sessions completed.");
356
            }
357
            _ = interrupt.changed() => {
358
                println!("Resumed sessions were interrupted.");
359
            }
360
        }
361
        Ok(())
362
    }
4✔
363

364
    /// Prints the session history table: a header, then all sender rows (active
    /// first, then inactive), then all receiver rows in the same order.
    ///
    /// A session whose event log cannot be replayed is still listed; it is shown
    /// as a failed session with the replay error in the status column.
    ///
    /// # Errors
    /// Fails if any of the session id lists cannot be read from the database.
    #[cfg(feature = "v2")]
    async fn history(&self) -> Result<()> {
        /// Builds the row for one sender session by replaying its event log.
        fn sender_row(
            db: &Arc<Database>,
            session_id: SessionId,
            completed_at: Option<u64>,
        ) -> SessionHistoryRow<SendSession> {
            let persister = SenderPersister::from_id(db.clone(), session_id.clone());
            let (status, error_message) = match replay_sender_event_log(&persister) {
                Ok((sender_state, _)) => (sender_state, None),
                Err(e) =>
                    (SendSession::Closed(SenderSessionOutcome::Failure), Some(e.to_string())),
            };
            SessionHistoryRow { session_id, role: Role::Sender, status, completed_at, error_message }
        }

        /// Builds the row for one receiver session by replaying its event log.
        fn receiver_row(
            db: &Arc<Database>,
            session_id: SessionId,
            completed_at: Option<u64>,
        ) -> SessionHistoryRow<ReceiveSession> {
            let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
            let (status, error_message) = match replay_receiver_event_log(&persister) {
                Ok((receiver_state, _)) => (receiver_state, None),
                Err(e) =>
                    (ReceiveSession::Closed(ReceiverSessionOutcome::Failure), Some(e.to_string())),
            };
            SessionHistoryRow {
                session_id,
                role: Role::Receiver,
                status,
                completed_at,
                error_message,
            }
        }

        print_header();

        // Active sessions have no completion time.
        let mut send_rows: Vec<_> = self
            .db
            .get_send_session_ids()?
            .into_iter()
            .map(|session_id| sender_row(&self.db, session_id, None))
            .collect();
        let mut recv_rows: Vec<_> = self
            .db
            .get_recv_session_ids()?
            .into_iter()
            .map(|session_id| receiver_row(&self.db, session_id, None))
            .collect();

        // Inactive sessions carry the completion value stored by the database.
        send_rows.extend(
            self.db
                .get_inactive_send_session_ids()?
                .into_iter()
                .map(|(session_id, completed_at)| {
                    sender_row(&self.db, session_id, Some(completed_at))
                }),
        );
        recv_rows.extend(
            self.db
                .get_inactive_recv_session_ids()?
                .into_iter()
                .map(|(session_id, completed_at)| {
                    receiver_row(&self.db, session_id, Some(completed_at))
                }),
        );

        // Sender rows are printed first, followed by receiver rows.
        for row in send_rows {
            println!("{row}");
        }
        for row in recv_rows {
            println!("{row}");
        }

        Ok(())
    }
×
487

488
    async fn cancel_sender(&self, session_id: SessionId, no_broadcast: bool) -> Result<()> {
1✔
489
        let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
490
        let (session, _history) = replay_sender_event_log(&persister)?;
491

492
        let pending: Sender<PendingFallback> = match session {
493
            SendSession::WithReplyKey(sender) => sender.cancel().save(&persister)?,
494
            SendSession::PollingForProposal(sender) => sender.cancel().save(&persister)?,
495
            SendSession::PendingFallback(sender) => sender,
496
            SendSession::Closed(SenderSessionOutcome::Success(proposal)) => {
497
                let txid = proposal.extract_tx_unchecked_fee_rate().compute_txid();
498
                println!(
499
                    "Session {session_id} already produced payjoin transaction {txid}. \
500
                     Cannot cancel a completed session."
501
                );
502
                return Ok(());
503
            }
504
            SendSession::Closed(_) => {
505
                println!("Session {session_id} is already closed. Nothing left to do.");
506
                return Ok(());
507
            }
508
        };
509

510
        if no_broadcast {
511
            println!(
512
                "Session {session_id} cancelled. Broadcast the original transaction manually:\n{}",
513
                serialize_hex(pending.fallback_tx())
514
            );
515
        } else {
516
            self.wallet().broadcast_tx(pending.fallback_tx())?;
517
            println!(
518
                "Broadcasted fallback transaction txid: {}",
519
                pending.fallback_tx().compute_txid()
520
            );
521
        }
522
        pending.close().save(&persister)?;
523
        Ok(())
524
    }
1✔
525
}
526

527
impl App {
528
    fn close_failed_session<P>(persister: &P, session_id: &SessionId, role: &str)
×
529
    where
×
530
        P: SessionPersister,
×
531
    {
532
        if let Err(close_err) = SessionPersister::close(persister) {
×
533
            tracing::error!("Failed to close {} session {}: {:?}", role, session_id, close_err);
×
534
        } else {
535
            tracing::error!("Closed failed {} session: {}", role, session_id);
×
536
        }
537
    }
×
538

539
    async fn process_sender_session(
3✔
540
        &self,
3✔
541
        session: SendSession,
3✔
542
        persister: &SenderPersister,
3✔
543
    ) -> Result<()> {
3✔
544
        match session {
×
545
            SendSession::WithReplyKey(context) =>
2✔
546
                self.post_original_proposal(context, persister).await?,
2✔
547
            SendSession::PollingForProposal(context) =>
1✔
548
                self.get_proposed_payjoin_psbt(context, persister).await?,
1✔
549
            SendSession::Closed(SenderSessionOutcome::Success(proposal)) => {
×
550
                self.process_pj_response(proposal)?;
×
551
                return Ok(());
×
552
            }
553
            SendSession::Closed(SenderSessionOutcome::Failure)
554
            | SendSession::Closed(SenderSessionOutcome::Cancel) => {
NEW
555
                println!("Session is closed. Nothing left to do");
×
NEW
556
                return Ok(());
×
557
            }
558
            SendSession::PendingFallback(_) => {
559
                let id = persister.session_id();
×
560
                println!(
×
561
                    "Session {id} was cancelled. Run `payjoin-cli cancel {id}` to cancel and broadcast the original transaction."
562
                );
563
                return Ok(());
×
564
            }
565
        }
566
        Ok(())
1✔
567
    }
1✔
568

569
    async fn post_original_proposal(
2✔
570
        &self,
2✔
571
        sender: Sender<WithReplyKey>,
2✔
572
        persister: &SenderPersister,
2✔
573
    ) -> Result<()> {
2✔
574
        let (req, ctx) = sender.create_v2_post_request(
2✔
575
            self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?.as_str(),
2✔
576
        )?;
×
577
        let response = self.post_request(req).await?;
2✔
578
        let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?;
2✔
579
        println!("Posted Original PSBT...");
2✔
580
        self.get_proposed_payjoin_psbt(sender, persister).await
2✔
581
    }
×
582

583
    async fn get_proposed_payjoin_psbt(
3✔
584
        &self,
3✔
585
        sender: Sender<PollingForProposal>,
3✔
586
        persister: &SenderPersister,
3✔
587
    ) -> Result<()> {
3✔
588
        let ohttp_relay = self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?;
3✔
589
        let mut session = sender.clone();
3✔
590
        // Long poll until we get a response
591
        loop {
592
            let (req, ctx) = session.create_poll_request(ohttp_relay.as_str())?;
5✔
593
            let response = self.post_request(req).await?;
5✔
594
            let res = session.process_response(&response.bytes().await?, ctx).save(persister);
3✔
595
            match res {
3✔
596
                Ok(OptionalTransitionOutcome::Progress(psbt)) => {
1✔
597
                    println!("Proposal received. Processing...");
1✔
598
                    self.process_pj_response(psbt)?;
1✔
599
                    return Ok(());
1✔
600
                }
601
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
2✔
602
                    println!("No response yet.");
2✔
603
                    session = current_state;
2✔
604
                    continue;
2✔
605
                }
606
                Err(re) => {
×
607
                    println!("{re}");
×
608
                    tracing::debug!("{re:?}");
×
609
                    return Err(anyhow!("Response error").context(re));
×
610
                }
611
            }
612
        }
613
    }
1✔
614

615
    async fn long_poll_fallback(
3✔
616
        &self,
3✔
617
        session: Receiver<Initialized>,
3✔
618
        persister: &ReceiverPersister,
3✔
619
    ) -> Result<Receiver<UncheckedOriginalPayload>> {
3✔
620
        let ohttp_relay =
3✔
621
            self.unwrap_relay_or_else_fetch(Some(&session.pj_uri().extras.endpoint())).await?;
3✔
622

623
        let mut session = session;
3✔
624
        loop {
625
            let (req, context) = session.create_poll_request(ohttp_relay.as_str())?;
3✔
626
            println!("Polling receive request...");
3✔
627
            let ohttp_response = self.post_request(req).await?;
3✔
628
            let state_transition = session
1✔
629
                .process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
630
                .save(persister);
1✔
631
            match state_transition {
1✔
632
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
1✔
633
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
634
                    return Ok(next_state);
1✔
635
                }
636
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
637
                    session = current_state;
×
638
                    continue;
×
639
                }
640
                Err(e) => return Err(e.into()),
×
641
            }
642
        }
643
    }
1✔
644

645
    async fn process_receiver_session(
4✔
646
        &self,
4✔
647
        session: ReceiveSession,
4✔
648
        persister: &ReceiverPersister,
4✔
649
    ) -> Result<()> {
4✔
650
        let res = {
3✔
651
            match session {
4✔
652
                ReceiveSession::Initialized(proposal) =>
3✔
653
                    self.read_from_directory(proposal, persister).await,
3✔
654
                ReceiveSession::UncheckedOriginalPayload(proposal) =>
×
655
                    self.check_proposal(proposal, persister).await,
×
656
                ReceiveSession::MaybeInputsOwned(proposal) =>
×
657
                    self.check_inputs_not_owned(proposal, persister).await,
×
658
                ReceiveSession::MaybeInputsSeen(proposal) =>
×
659
                    self.check_no_inputs_seen_before(proposal, persister).await,
×
660
                ReceiveSession::OutputsUnknown(proposal) =>
×
661
                    self.identify_receiver_outputs(proposal, persister).await,
×
662
                ReceiveSession::WantsOutputs(proposal) =>
×
663
                    self.commit_outputs(proposal, persister).await,
×
664
                ReceiveSession::WantsInputs(proposal) =>
×
665
                    self.contribute_inputs(proposal, persister).await,
×
666
                ReceiveSession::WantsFeeRange(proposal) =>
×
667
                    self.apply_fee_range(proposal, persister).await,
×
668
                ReceiveSession::ProvisionalProposal(proposal) =>
×
669
                    self.finalize_proposal(proposal, persister).await,
×
670
                ReceiveSession::PayjoinProposal(proposal) =>
×
671
                    self.send_payjoin_proposal(proposal, persister).await,
×
672
                ReceiveSession::HasReplyableError(error) =>
×
673
                    self.handle_error(error, persister).await,
×
674
                ReceiveSession::Monitor(proposal) =>
1✔
675
                    self.monitor_payjoin_proposal(proposal, persister).await,
1✔
676
                ReceiveSession::Closed(_) => return Err(anyhow!("Session closed")),
×
677
            }
678
        };
679
        res
3✔
680
    }
3✔
681

682
    #[allow(clippy::incompatible_msrv)]
683
    async fn read_from_directory(
3✔
684
        &self,
3✔
685
        session: Receiver<Initialized>,
3✔
686
        persister: &ReceiverPersister,
3✔
687
    ) -> Result<()> {
3✔
688
        let mut interrupt = self.interrupt.clone();
3✔
689
        let receiver = tokio::select! {
3✔
690
            res = self.long_poll_fallback(session, persister) => res,
3✔
691
            _ = interrupt.changed() => {
3✔
692
                println!("Interrupted. Call the `resume` command to resume all sessions.");
2✔
693
                return Err(anyhow!("Interrupted"));
2✔
694
            }
695
        }?;
×
696
        self.check_proposal(receiver, persister).await
1✔
697
    }
2✔
698

699
    async fn check_proposal(
1✔
700
        &self,
1✔
701
        proposal: Receiver<UncheckedOriginalPayload>,
1✔
702
        persister: &ReceiverPersister,
1✔
703
    ) -> Result<()> {
1✔
704
        let wallet = self.wallet();
1✔
705
        let proposal = proposal
1✔
706
            .check_broadcast_suitability(None, |tx| {
1✔
707
                wallet
1✔
708
                    .can_broadcast(tx)
1✔
709
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
710
            })
1✔
711
            .save(persister)?;
1✔
712

713
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
714
        println!("{}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast()));
1✔
715
        self.check_inputs_not_owned(proposal, persister).await
1✔
716
    }
×
717

718
    async fn check_inputs_not_owned(
1✔
719
        &self,
1✔
720
        proposal: Receiver<MaybeInputsOwned>,
1✔
721
        persister: &ReceiverPersister,
1✔
722
    ) -> Result<()> {
1✔
723
        let wallet = self.wallet();
1✔
724
        let proposal = proposal
1✔
725
            .check_inputs_not_owned(&mut |input| {
1✔
726
                wallet
1✔
727
                    .is_mine(input)
1✔
728
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
729
            })
1✔
730
            .save(persister)?;
1✔
731
        self.check_no_inputs_seen_before(proposal, persister).await
1✔
732
    }
×
733

734
    async fn check_no_inputs_seen_before(
1✔
735
        &self,
1✔
736
        proposal: Receiver<MaybeInputsSeen>,
1✔
737
        persister: &ReceiverPersister,
1✔
738
    ) -> Result<()> {
1✔
739
        let proposal = proposal
1✔
740
            .check_no_inputs_seen_before(&mut |input| {
1✔
741
                Ok(self.db.insert_input_seen_before(*input)?)
1✔
742
            })
1✔
743
            .save(persister)?;
1✔
744
        self.identify_receiver_outputs(proposal, persister).await
1✔
745
    }
×
746

747
    async fn identify_receiver_outputs(
1✔
748
        &self,
1✔
749
        proposal: Receiver<OutputsUnknown>,
1✔
750
        persister: &ReceiverPersister,
1✔
751
    ) -> Result<()> {
1✔
752
        let wallet = self.wallet();
1✔
753
        let proposal = proposal
1✔
754
            .identify_receiver_outputs(&mut |output_script| {
2✔
755
                wallet
2✔
756
                    .is_mine(output_script)
2✔
757
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
2✔
758
            })
2✔
759
            .save(persister)?;
1✔
760
        self.commit_outputs(proposal, persister).await
1✔
761
    }
×
762

763
    async fn commit_outputs(
1✔
764
        &self,
1✔
765
        proposal: Receiver<WantsOutputs>,
1✔
766
        persister: &ReceiverPersister,
1✔
767
    ) -> Result<()> {
1✔
768
        let proposal = proposal.commit_outputs().save(persister)?;
1✔
769
        self.contribute_inputs(proposal, persister).await
1✔
770
    }
×
771

772
    async fn contribute_inputs(
1✔
773
        &self,
1✔
774
        proposal: Receiver<WantsInputs>,
1✔
775
        persister: &ReceiverPersister,
1✔
776
    ) -> Result<()> {
1✔
777
        let wallet = self.wallet();
1✔
778
        let candidate_inputs = wallet.list_unspent()?;
1✔
779

780
        if candidate_inputs.is_empty() {
1✔
781
            return Err(anyhow::anyhow!(
×
782
                "No spendable UTXOs available in wallet. Cannot contribute inputs to payjoin."
×
783
            ));
×
784
        }
1✔
785

786
        let selected_input = proposal.try_preserving_privacy(candidate_inputs)?;
1✔
787
        let proposal =
1✔
788
            proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?;
1✔
789
        self.apply_fee_range(proposal, persister).await
1✔
790
    }
×
791

792
    async fn apply_fee_range(
1✔
793
        &self,
1✔
794
        proposal: Receiver<WantsFeeRange>,
1✔
795
        persister: &ReceiverPersister,
1✔
796
    ) -> Result<()> {
1✔
797
        let proposal = proposal.apply_fee_range(None, self.config.max_fee_rate).save(persister)?;
1✔
798
        self.finalize_proposal(proposal, persister).await
1✔
799
    }
×
800

801
    async fn finalize_proposal(
1✔
802
        &self,
1✔
803
        proposal: Receiver<ProvisionalProposal>,
1✔
804
        persister: &ReceiverPersister,
1✔
805
    ) -> Result<()> {
1✔
806
        let wallet = self.wallet();
1✔
807
        let proposal = proposal
1✔
808
            .finalize_proposal(|psbt| {
1✔
809
                wallet
1✔
810
                    .process_psbt(psbt)
1✔
811
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
812
            })
1✔
813
            .save(persister)?;
1✔
814
        self.send_payjoin_proposal(proposal, persister).await
1✔
815
    }
×
816

817
    async fn send_payjoin_proposal(
1✔
818
        &self,
1✔
819
        proposal: Receiver<PayjoinProposal>,
1✔
820
        persister: &ReceiverPersister,
1✔
821
    ) -> Result<()> {
1✔
822
        let (req, ohttp_ctx) = proposal
1✔
823
            .create_post_request(self.unwrap_relay_or_else_fetch(None::<&str>).await?.as_str())
1✔
824
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
825
        let res = self.post_request(req).await?;
1✔
826
        let payjoin_psbt = proposal.psbt().clone();
1✔
827
        let session = proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister)?;
1✔
828
        println!(
1✔
829
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
830
            payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
1✔
831
        );
832

833
        return self.monitor_payjoin_proposal(session, persister).await;
1✔
834
    }
×
835

836
    async fn monitor_payjoin_proposal(
2✔
837
        &self,
2✔
838
        proposal: Receiver<Monitor>,
2✔
839
        persister: &ReceiverPersister,
2✔
840
    ) -> Result<()> {
2✔
841
        // On a session resumption, the receiver will resume again in this state.
842
        let poll_interval = tokio::time::Duration::from_millis(200);
2✔
843
        let timeout_duration = tokio::time::Duration::from_secs(5);
2✔
844

845
        let mut interval = tokio::time::interval(poll_interval);
2✔
846
        interval.tick().await;
2✔
847

848
        tracing::debug!("Polling for payment confirmation");
1✔
849

850
        let result = tokio::time::timeout(timeout_duration, async {
1✔
851
            loop {
852
                interval.tick().await;
1✔
853
                let check_result = proposal
1✔
854
                    .check_payment(|txid| {
1✔
855
                        self.wallet()
1✔
856
                            .get_raw_transaction(&txid)
1✔
857
                            .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
858
                    })
1✔
859
                    .save(persister);
1✔
860

861
                match check_result {
1✔
862
                    Ok(_) => {
863
                        println!("Payjoin transaction detected in the mempool!");
1✔
864
                        return Ok(());
1✔
865
                    }
866
                    Err(_) => {
867
                        // keep polling
868

869
                        continue;
×
870
                    }
871
                }
872
            }
873
        })
1✔
874
        .await;
1✔
875

876
        match result {
1✔
877
            Ok(ok) => ok,
1✔
878
            Err(_) => Err(anyhow!(
×
879
                "Timeout waiting for payment confirmation after {:?}",
×
880
                timeout_duration
×
881
            )),
×
882
        }
883
    }
1✔
884

885
    async fn unwrap_relay_or_else_fetch(
9✔
886
        &self,
9✔
887
        directory: Option<impl payjoin::IntoUrl>,
9✔
888
    ) -> Result<payjoin::Url> {
9✔
889
        let directory = directory.map(|url| url.into_url()).transpose()?;
9✔
890
        let selected_relay =
9✔
891
            self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay();
9✔
892
        let ohttp_relay = match selected_relay {
9✔
893
            Some(relay) => relay,
5✔
894
            None =>
895
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone())
4✔
896
                    .await?
4✔
897
                    .relay_url,
898
        };
899
        Ok(ohttp_relay)
9✔
900
    }
9✔
901

902
    /// Handle error by attempting to send an error response over the directory
903
    async fn handle_error(
×
904
        &self,
×
905
        session: Receiver<HasReplyableError>,
×
906
        persister: &ReceiverPersister,
×
907
    ) -> Result<()> {
×
908
        let (err_req, err_ctx) = session
×
909
            .create_error_request(self.unwrap_relay_or_else_fetch(None::<&str>).await?.as_str())?;
×
910

911
        let err_response = match self.post_request(err_req).await {
×
912
            Ok(response) => response,
×
913
            Err(e) => return Err(anyhow!("Failed to post error request: {}", e)),
×
914
        };
915

916
        let err_bytes = match err_response.bytes().await {
×
917
            Ok(bytes) => bytes,
×
918
            Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
×
919
        };
920

921
        if let Err(e) = session.process_error_response(&err_bytes, err_ctx).save(persister) {
×
922
            return Err(anyhow!("Failed to process error response: {}", e));
×
923
        }
×
924

925
        Ok(())
×
926
    }
×
927

928
    async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {
11✔
929
        let http = http_agent(&self.config)?;
11✔
930
        http.post(req.url)
11✔
931
            .header("Content-Type", req.content_type)
11✔
932
            .body(req.body)
11✔
933
            .send()
11✔
934
            .await
11✔
935
            .and_then(|r| r.error_for_status())
7✔
936
            .context("HTTP request failed")
7✔
937
    }
7✔
938
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc