• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 25944298217

15 May 2026 10:24PM UTC coverage: 85.039% (-0.3%) from 85.293%
25944298217

Pull #1558

github

web-flow
Merge 56641ee5a into b69034f35
Pull Request #1558: Receiver fallback typestate

683 of 856 new or added lines in 7 files covered. (79.79%)

8 existing lines in 2 files now uncovered.

12226 of 14377 relevant lines covered (85.04%)

377.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.82
/payjoin-cli/src/app/v2/mod.rs
1
use std::fmt;
2
use std::io::{self, Write};
3
use std::sync::{Arc, Mutex};
4

5
use anyhow::{anyhow, Context, Result};
6
use payjoin::bitcoin::consensus::encode::serialize_hex;
7
use payjoin::bitcoin::{Amount, FeeRate, Transaction};
8
use payjoin::persist::{OptionalTransitionOutcome, SessionPersister};
9
use payjoin::receive::v2::{
10
    replay_event_log as replay_receiver_event_log, HasReplyableError, Initialized,
11
    MaybeInputsOwned, MaybeInputsSeen, Monitor, OutputsUnknown, PayjoinProposal, PendingFallback,
12
    ProvisionalProposal, ReceiveSession, Receiver, ReceiverBuilder,
13
    SessionOutcome as ReceiverSessionOutcome, UncheckedOriginalPayload, WantsFeeRange, WantsInputs,
14
    WantsOutputs,
15
};
16
use payjoin::send::v2::{
17
    replay_event_log as replay_sender_event_log, PollingForProposal, SendSession, Sender,
18
    SenderBuilder, SessionOutcome as SenderSessionOutcome, WithReplyKey,
19
};
20
use payjoin::{ImplementationError, PjParam, Uri};
21
use tokio::sync::watch;
22

23
use super::config::Config;
24
use super::wallet::BitcoindWallet;
25
use super::App as AppTrait;
26
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
27
use crate::app::{handle_interrupt, http_agent};
28
use crate::db::v2::{ReceiverPersister, SenderPersister, SessionId};
29
use crate::db::Database;
30

31
mod ohttp;
32

33
/// Width, in characters, of the "Session ID" column of the history table.
const W_ID: usize = 12;
/// Width, in characters, of the "Sender/Receiver" column of the history table.
const W_ROLE: usize = 25;
/// Width, in characters, of the "Completed At" column of the history table.
const W_DONE: usize = 15;
/// Width, in characters, of the "Status" column of the history table.
const W_STATUS: usize = 15;
37

38
#[derive(Clone)]
39
pub(crate) struct App {
40
    config: Config,
41
    db: Arc<Database>,
42
    wallet: BitcoindWallet,
43
    interrupt: watch::Receiver<()>,
44
    relay_manager: Arc<Mutex<RelayManager>>,
45
}
46

47
/// A session state that can describe itself with a short, fixed status line.
trait StatusText {
    /// Returns the human-readable status for the current state.
    fn status_text(&self) -> &'static str;
}
50

51
impl StatusText for SendSession {
52
    fn status_text(&self) -> &'static str {
×
53
        match self {
×
54
            SendSession::WithReplyKey(_) | SendSession::PollingForProposal(_) =>
55
                "Waiting for proposal",
×
56
            SendSession::Closed(session_outcome) => match session_outcome {
×
57
                SenderSessionOutcome::Failure => "Session failure",
×
58
                SenderSessionOutcome::Success(_) => "Session success",
×
59
                SenderSessionOutcome::Cancel => "Session cancelled",
×
60
            },
61
        }
62
    }
×
63
}
64

65
impl StatusText for ReceiveSession {
66
    fn status_text(&self) -> &'static str {
×
67
        match self {
×
68
            ReceiveSession::Initialized(_) => "Waiting for original proposal",
×
69
            ReceiveSession::UncheckedOriginalPayload(_)
70
            | ReceiveSession::MaybeInputsOwned(_)
71
            | ReceiveSession::MaybeInputsSeen(_)
72
            | ReceiveSession::OutputsUnknown(_)
73
            | ReceiveSession::WantsOutputs(_)
74
            | ReceiveSession::WantsInputs(_)
75
            | ReceiveSession::WantsFeeRange(_)
76
            | ReceiveSession::ProvisionalProposal(_) => "Processing original proposal",
×
77
            ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent",
×
78
            ReceiveSession::HasReplyableError(_) =>
79
                "Session failure, waiting to post error response",
×
80
            ReceiveSession::Monitor(_) => "Monitoring payjoin proposal",
×
NEW
81
            ReceiveSession::PendingFallback(_) => "Original transaction awaiting fallback decision",
×
82
            ReceiveSession::Closed(session_outcome) => match session_outcome {
×
83
                ReceiverSessionOutcome::Failure => "Session failure",
×
84
                ReceiverSessionOutcome::Success(_) => "Session success, Payjoin proposal was broadcasted",
×
85
                ReceiverSessionOutcome::Cancel => "Session cancelled",
×
86
                ReceiverSessionOutcome::FallbackBroadcasted => "Fallback broadcasted",
×
87
                ReceiverSessionOutcome::PayjoinProposalSent =>
88
                    "Payjoin proposal sent, skipping monitoring as the sender is spending non-SegWit inputs",
×
89
            },
90
        }
91
    }
×
92
}
93

94
fn print_header() {
×
95
    println!(
×
96
        "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
97
        "Session ID", "Sender/Receiver", "Completed At", "Status"
98
    );
99
}
×
100

101
/// Which side of a payjoin a session history row belongs to.
enum Role {
    Sender,
    Receiver,
}

impl Role {
    /// Returns the label shown in the "Sender/Receiver" column.
    fn as_str(&self) -> &'static str {
        match *self {
            Role::Receiver => "Receiver",
            Role::Sender => "Sender",
        }
    }
}
113

114
/// How a receiver session that is pending fallback should be finished.
#[derive(Clone, Copy)]
enum FallbackHandling {
    /// Ask on the terminal whether the original transaction should be broadcast.
    Prompt,
    /// Close the session and never broadcast the original transaction.
    CloseWithoutBroadcast,
}
119

120
struct SessionHistoryRow<Status> {
121
    session_id: SessionId,
122
    role: Role,
123
    status: Status,
124
    completed_at: Option<u64>,
125
    error_message: Option<String>,
126
}
127

128
impl<Status: StatusText> fmt::Display for SessionHistoryRow<Status> {
129
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
130
        write!(
×
131
            f,
×
132
            "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
133
            self.session_id.to_string(),
×
134
            self.role.as_str(),
×
135
            match self.completed_at {
×
136
                None => "Not Completed".to_string(),
×
137
                Some(secs) => {
×
138
                    // TODO: human readable time
139
                    secs.to_string()
×
140
                }
141
            },
142
            self.error_message.as_deref().unwrap_or(self.status.status_text())
×
143
        )
144
    }
×
145
}
146

147
#[async_trait::async_trait]
148
impl AppTrait for App {
149
    async fn new(config: Config) -> Result<Self> {
11✔
150
        let db = Arc::new(Database::create(&config.db_path)?);
151
        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
152
        let (interrupt_tx, interrupt_rx) = watch::channel(());
153
        tokio::spawn(handle_interrupt(interrupt_tx));
154
        let wallet = BitcoindWallet::new(&config.bitcoind).await?;
155
        let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager };
156
        app.wallet()
157
            .network()
158
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
159
        Ok(app)
160
    }
11✔
161

162
    /// Returns a clone of the wallet handle.
    fn wallet(&self) -> BitcoindWallet {
        let handle = &self.wallet;
        handle.clone()
    }
30✔
163

164
    #[allow(clippy::incompatible_msrv)]
165
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
4✔
166
        use payjoin::UriExt;
167
        let uri = Uri::try_from(bip21)
168
            .map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?
×
169
            .assume_checked()
170
            .check_pj_supported()
171
            .map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
172
        let address = uri.address;
173
        let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?;
×
174
        match uri.extras.pj_param() {
175
            #[cfg(feature = "v1")]
176
            PjParam::V1(pj_param) => {
177
                let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
178
                let fallback_tx = psbt.clone().extract_tx()?;
179
                let (req, ctx) = payjoin::send::v1::SenderBuilder::from_parts(
180
                    psbt,
181
                    pj_param,
182
                    &address,
183
                    Some(amount),
184
                )
185
                .build_recommended(fee_rate)
186
                .with_context(|| "Failed to build payjoin request")?
187
                .create_v1_post_request();
188
                let http = http_agent(&self.config)?;
189
                let body = String::from_utf8(req.body.clone()).unwrap();
190
                println!("Sending Original PSBT to {}", req.url);
191
                let response = match http
192
                    .post(req.url)
193
                    .header("Content-Type", req.content_type)
194
                    .body(body.clone())
195
                    .send()
196
                    .await
197
                {
198
                    Ok(response) => response,
199
                    Err(e) => {
200
                        tracing::error!("HTTP request failed: {e}");
201
                        println!("Payjoin failed. To broadcast the fallback transaction, run:");
202
                        println!(
203
                            "  bitcoin-cli -rpcwallet=<wallet> sendrawtransaction {:#}",
204
                            payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
205
                        );
206
                        return Err(anyhow!("HTTP request failed: {e}"));
207
                    }
208
                };
209
                let psbt = match ctx.process_response(&response.bytes().await?) {
210
                    Ok(psbt) => psbt,
211
                    Err(e) => {
212
                        tracing::error!("Error processing response: {e:?}");
213
                        println!("Payjoin failed. To broadcast the fallback transaction, run:");
214
                        println!(
215
                            "  bitcoin-cli -rpcwallet=<wallet> sendrawtransaction {:#}",
216
                            payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
217
                        );
218
                        return Err(anyhow!("Failed to process response {e}"));
219
                    }
220
                };
221

222
                self.process_pj_response(psbt)?;
223
                Ok(())
224
            }
225
            PjParam::V2(pj_param) => {
226
                let receiver_pubkey = pj_param.receiver_pubkey();
227
                let sender_state =
228
                    self.db.get_send_session_ids()?.into_iter().find_map(|session_id| {
1✔
229
                        let session_receiver_pubkey = self
1✔
230
                            .db
1✔
231
                            .get_send_session_receiver_pk(&session_id)
1✔
232
                            .expect("Receiver pubkey should exist if session id exists");
1✔
233
                        if session_receiver_pubkey == *receiver_pubkey {
1✔
234
                            let sender_persister =
1✔
235
                                SenderPersister::from_id(self.db.clone(), session_id);
1✔
236
                            let (send_session, _) = replay_sender_event_log(&sender_persister)
1✔
237
                                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))
1✔
238
                                .ok()?;
1✔
239

240
                            Some((send_session, sender_persister))
1✔
241
                        } else {
242
                            None
×
243
                        }
244
                    });
1✔
245

246
                let (sender_state, persister) = match sender_state {
247
                    Some((sender_state, persister)) => (sender_state, persister),
248
                    None => {
249
                        let persister =
250
                            SenderPersister::new(self.db.clone(), bip21, receiver_pubkey)?;
251
                        let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
252
                        let sender =
253
                            SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
254
                                .build_recommended(fee_rate)?
255
                                .save(&persister)?;
256

257
                        (SendSession::WithReplyKey(sender), persister)
258
                    }
259
                };
260
                let mut interrupt = self.interrupt.clone();
261
                tokio::select! {
262
                    res = self.process_sender_session(sender_state, &persister) => {
263
                        match res {
264
                            Ok(()) => return Ok(()),
265
                            Err(err) => {
266
                                let id = persister.session_id();
267
                                println!("Session {id} failed. Run `payjoin-cli fallback {id}` to broadcast the original transaction.");
268
                                return Err(err);
269
                            }
270
                        }
271
                    },
272
                    _ = interrupt.changed() => {
273
                        let id = persister.session_id();
274
                        println!(
275
                            "Session {id} interrupted. Call `send` again to resume, `resume` to resume all sessions, or `payjoin-cli fallback {id}` to broadcast the original transaction."
276
                        );
277
                        return Err(anyhow!("Interrupted"))
278
                    }
279
                }
280
            }
281
            _ => unimplemented!("Unrecognized payjoin version"),
282
        }
283
    }
4✔
284

285
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
2✔
286
        let address = self.wallet().get_new_address()?;
287
        let ohttp_keys =
288
            unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone())
289
                .await?
290
                .ohttp_keys;
291
        let persister = ReceiverPersister::new(self.db.clone())?;
292
        let session =
293
            ReceiverBuilder::new(address, self.config.v2()?.pj_directory.as_str(), ohttp_keys)?
294
                .with_amount(amount)
295
                .with_max_fee_rate(self.config.max_fee_rate.unwrap_or(FeeRate::BROADCAST_MIN))
296
                .build()
297
                .save(&persister)?;
298

299
        println!("Receive session established");
300
        let pj_uri = session.pj_uri();
301
        println!("Request Payjoin by sharing this Payjoin Uri:");
302
        println!("{pj_uri}");
303

304
        self.process_receiver_session(ReceiveSession::Initialized(session.clone()), &persister)
305
            .await?;
306
        Ok(())
307
    }
2✔
308

309
    #[allow(clippy::incompatible_msrv)]
310
    async fn resume_payjoins(&self) -> Result<()> {
4✔
311
        let recv_session_ids = self.db.get_recv_session_ids()?;
312
        let send_session_ids = self.db.get_send_session_ids()?;
313

314
        if recv_session_ids.is_empty() && send_session_ids.is_empty() {
315
            println!("No sessions to resume.");
316
            return Ok(());
317
        }
318

319
        let mut tasks = Vec::new();
320

321
        // Process receiver sessions
322
        for session_id in recv_session_ids {
323
            let self_clone = self.clone();
324
            let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
325
            match replay_receiver_event_log(&recv_persister) {
326
                Ok((ReceiveSession::PendingFallback(pending_fallback), _)) => {
327
                    if let Err(e) = self.complete_pending_fallback(
328
                        pending_fallback,
329
                        &recv_persister,
330
                        FallbackHandling::Prompt,
331
                    ) {
332
                        tracing::error!(
333
                            "An error {:?} occurred while handling receiver session {}",
334
                            e,
335
                            session_id
336
                        );
337
                    }
338
                }
339
                Ok((receiver_state, _)) => tasks.push(tokio::spawn(async move {
2✔
340
                    self_clone.process_receiver_session(receiver_state, &recv_persister).await
2✔
341
                })),
1✔
342
                Err(e) => {
343
                    tracing::error!("An error {:?} occurred while replaying receiver session", e);
344
                    Self::close_failed_session(&recv_persister, &session_id, "receiver");
345
                }
346
            }
347
        }
348

349
        // Process sender sessions
350
        for session_id in send_session_ids {
351
            let sender_persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
352
            match replay_sender_event_log(&sender_persister) {
353
                Ok((sender_state, _)) => {
354
                    let self_clone = self.clone();
355
                    tasks.push(tokio::spawn(async move {
×
356
                        self_clone.process_sender_session(sender_state, &sender_persister).await
×
357
                    }));
×
358
                }
359
                Err(e) => {
360
                    tracing::error!("An error {:?} occurred while replaying Sender session", e);
361
                    Self::close_failed_session(&sender_persister, &session_id, "sender");
362
                }
363
            }
364
        }
365

366
        let mut interrupt = self.interrupt.clone();
367
        tokio::select! {
368
            _ = async {
2✔
369
                for task in tasks {
2✔
370
                    let _ = task.await;
2✔
371
                }
372
            } => {
1✔
373
                println!("All resumed sessions completed.");
374
            }
375
            _ = interrupt.changed() => {
376
                println!("Resumed sessions were interrupted.");
377
            }
378
        }
379
        Ok(())
380
    }
4✔
381

382
    /// Prints a table of all sender and receiver sessions, active and inactive.
    ///
    /// The header is printed first. Sender rows follow (active sessions, then inactive ones),
    /// then receiver rows in the same order. A session whose event log cannot be replayed is
    /// listed as a failure, with the replay error shown in place of the status text.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the session-id queries against the database fails.
    #[cfg(feature = "v2")]
    async fn history(&self) -> Result<()> {
        /// Builds the history row for one sender session by replaying its event log.
        fn sender_row(
            db: &Arc<Database>,
            session_id: SessionId,
            completed_at: Option<u64>,
        ) -> SessionHistoryRow<SendSession> {
            let persister = SenderPersister::from_id(db.clone(), session_id.clone());
            let (status, error_message) = match replay_sender_event_log(&persister) {
                Ok((sender_state, _)) => (sender_state, None),
                Err(e) =>
                    (SendSession::Closed(SenderSessionOutcome::Failure), Some(e.to_string())),
            };
            SessionHistoryRow { session_id, role: Role::Sender, status, completed_at, error_message }
        }

        /// Builds the history row for one receiver session by replaying its event log.
        fn receiver_row(
            db: &Arc<Database>,
            session_id: SessionId,
            completed_at: Option<u64>,
        ) -> SessionHistoryRow<ReceiveSession> {
            let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
            let (status, error_message) = match replay_receiver_event_log(&persister) {
                Ok((receiver_state, _)) => (receiver_state, None),
                Err(e) =>
                    (ReceiveSession::Closed(ReceiverSessionOutcome::Failure), Some(e.to_string())),
            };
            SessionHistoryRow {
                session_id,
                role: Role::Receiver,
                status,
                completed_at,
                error_message,
            }
        }

        print_header();

        // Active sessions have no completion time yet.
        let mut send_rows: Vec<_> = self
            .db
            .get_send_session_ids()?
            .into_iter()
            .map(|session_id| sender_row(&self.db, session_id, None))
            .collect();
        let mut recv_rows: Vec<_> = self
            .db
            .get_recv_session_ids()?
            .into_iter()
            .map(|session_id| receiver_row(&self.db, session_id, None))
            .collect();

        // Inactive sessions come paired with their completion time.
        send_rows.extend(self.db.get_inactive_send_session_ids()?.into_iter().map(
            |(session_id, completed_at)| sender_row(&self.db, session_id, Some(completed_at)),
        ));
        recv_rows.extend(self.db.get_inactive_recv_session_ids()?.into_iter().map(
            |(session_id, completed_at)| receiver_row(&self.db, session_id, Some(completed_at)),
        ));

        // Print receiver and sender rows separately
        for row in send_rows {
            println!("{row}");
        }
        for row in recv_rows {
            println!("{row}");
        }

        Ok(())
    }
×
505

506
    async fn fallback_sender(&self, session_id: SessionId) -> Result<()> {
1✔
507
        let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
508
        let (session, history) = replay_sender_event_log(&persister)?;
509

510
        if let SendSession::Closed(SenderSessionOutcome::Success(proposal)) = session {
511
            let txid = proposal.clone().extract_tx_unchecked_fee_rate().compute_txid();
512
            println!(
513
                "Session {session_id} already produced payjoin transaction {txid}. \
514
                 Broadcasting the original now would double-spend against it. \
515
                 If the payjoin tx needs re-broadcast, run \
516
                 `bitcoin-cli gettransaction {txid}` to fetch the hex, then \
517
                 `bitcoin-cli sendrawtransaction <hex>`."
518
            );
519
            return Ok(());
520
        }
521

522
        let fallback_tx = history.fallback_tx();
523
        self.wallet().broadcast_tx(&fallback_tx)?;
524
        println!("Broadcasted fallback transaction txid: {}", fallback_tx.compute_txid());
525

526
        if let Err(e) = SessionPersister::close(&persister) {
527
            tracing::warn!("Failed to close session {session_id} after fallback: {e}");
528
        }
529
        Ok(())
530
    }
1✔
531

NEW
532
    async fn cancel_receiver(&self, session_id: SessionId) -> Result<()> {
×
533
        self.cancel_receiver_with_handling(session_id, FallbackHandling::Prompt).await
NEW
534
    }
×
535

NEW
536
    async fn cancel_receiver_without_broadcast(&self, session_id: SessionId) -> Result<()> {
×
537
        self.cancel_receiver_with_handling(session_id, FallbackHandling::CloseWithoutBroadcast)
538
            .await
NEW
539
    }
×
540
}
541

542
impl App {
543
    fn close_failed_session<P>(persister: &P, session_id: &SessionId, role: &str)
×
544
    where
×
545
        P: SessionPersister,
×
546
    {
547
        if let Err(close_err) = SessionPersister::close(persister) {
×
548
            tracing::error!("Failed to close {} session {}: {:?}", role, session_id, close_err);
×
549
        } else {
550
            tracing::error!("Closed failed {} session: {}", role, session_id);
×
551
        }
552
    }
×
553

NEW
554
    async fn cancel_receiver_with_handling(
×
NEW
555
        &self,
×
NEW
556
        session_id: SessionId,
×
NEW
557
        handling: FallbackHandling,
×
NEW
558
    ) -> Result<()> {
×
NEW
559
        let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
×
NEW
560
        let (session, _) = replay_receiver_event_log(&persister)?;
×
561

562
        macro_rules! cancel_to_pending_fallback {
563
            ($state:expr) => {{
564
                let pending_fallback = $state.cancel().save(&persister)?;
565
                self.complete_pending_fallback(pending_fallback, &persister, handling)
566
            }};
567
        }
568

NEW
569
        match session {
×
NEW
570
            ReceiveSession::Initialized(state) => {
×
NEW
571
                state.cancel().save(&persister)?;
×
NEW
572
                println!("Receiver session {session_id} cancelled.");
×
NEW
573
                Ok(())
×
574
            }
NEW
575
            ReceiveSession::UncheckedOriginalPayload(state) => {
×
NEW
576
                state.cancel().save(&persister)?;
×
NEW
577
                println!("Receiver session {session_id} cancelled.");
×
NEW
578
                Ok(())
×
579
            }
NEW
580
            ReceiveSession::MaybeInputsOwned(state) => cancel_to_pending_fallback!(state),
×
NEW
581
            ReceiveSession::MaybeInputsSeen(state) => cancel_to_pending_fallback!(state),
×
NEW
582
            ReceiveSession::OutputsUnknown(state) => cancel_to_pending_fallback!(state),
×
NEW
583
            ReceiveSession::WantsOutputs(state) => cancel_to_pending_fallback!(state),
×
NEW
584
            ReceiveSession::WantsInputs(state) => cancel_to_pending_fallback!(state),
×
NEW
585
            ReceiveSession::WantsFeeRange(state) => cancel_to_pending_fallback!(state),
×
NEW
586
            ReceiveSession::ProvisionalProposal(state) => cancel_to_pending_fallback!(state),
×
NEW
587
            ReceiveSession::PayjoinProposal(state) => cancel_to_pending_fallback!(state),
×
NEW
588
            ReceiveSession::Monitor(state) => cancel_to_pending_fallback!(state),
×
NEW
589
            ReceiveSession::HasReplyableError(state) => match state.cancel().save(&persister)? {
×
NEW
590
                Some(pending_fallback) =>
×
NEW
591
                    self.complete_pending_fallback(pending_fallback, &persister, handling),
×
592
                None => {
NEW
593
                    println!("Receiver session {session_id} cancelled.");
×
NEW
594
                    Ok(())
×
595
                }
596
            },
NEW
597
            ReceiveSession::PendingFallback(pending_fallback) =>
×
NEW
598
                self.complete_pending_fallback(pending_fallback, &persister, handling),
×
NEW
599
            ReceiveSession::Closed(outcome) => {
×
NEW
600
                println!("Receiver session {session_id} is already closed: {outcome:?}");
×
NEW
601
                Ok(())
×
602
            }
603
        }
NEW
604
    }
×
605

NEW
606
    fn complete_pending_fallback(
×
NEW
607
        &self,
×
NEW
608
        pending_fallback: Receiver<PendingFallback>,
×
NEW
609
        persister: &ReceiverPersister,
×
NEW
610
        handling: FallbackHandling,
×
NEW
611
    ) -> Result<()> {
×
NEW
612
        let should_broadcast = match handling {
×
613
            FallbackHandling::Prompt =>
NEW
614
                self.prompt_broadcast_fallback(pending_fallback.fallback_tx())?,
×
NEW
615
            FallbackHandling::CloseWithoutBroadcast => false,
×
616
        };
617

NEW
618
        if should_broadcast {
×
NEW
619
            let txid = self.wallet().broadcast_tx(pending_fallback.fallback_tx())?;
×
NEW
620
            println!("Broadcasted fallback transaction txid: {txid}");
×
NEW
621
        } else {
×
NEW
622
            println!("Closing receiver session without broadcasting the fallback transaction.");
×
NEW
623
        }
×
624

NEW
625
        pending_fallback.close().save(persister)?;
×
NEW
626
        Ok(())
×
NEW
627
    }
×
628

NEW
629
    /// Asks on the terminal whether the original transaction should be broadcast.
    ///
    /// Returns `Ok(false)` only when the trimmed, lower-cased answer is `n`, `no`, `c` or
    /// `close`. Any other input, including an empty line, counts as yes.
    fn prompt_broadcast_fallback(&self, fallback_tx: &Transaction) -> Result<bool> {
        let txid = fallback_tx.compute_txid();
        println!("Original transaction is pending fallback handling. TXID: {txid}");
        print!("Broadcast the original transaction before closing? [Y/n]: ");
        // `print!` does not end the line, so flush to show the prompt before blocking on stdin.
        io::stdout().flush()?;

        let mut line = String::new();
        io::stdin().read_line(&mut line)?;
        let reply = line.trim().to_ascii_lowercase();
        let declined = ["n", "no", "c", "close"].contains(&reply.as_str());
        Ok(!declined)
    }
×
642

643
    async fn process_sender_session(
3✔
644
        &self,
3✔
645
        session: SendSession,
3✔
646
        persister: &SenderPersister,
3✔
647
    ) -> Result<()> {
3✔
648
        match session {
×
649
            SendSession::WithReplyKey(context) =>
2✔
650
                self.post_original_proposal(context, persister).await?,
2✔
651
            SendSession::PollingForProposal(context) =>
1✔
652
                self.get_proposed_payjoin_psbt(context, persister).await?,
1✔
653
            SendSession::Closed(SenderSessionOutcome::Success(proposal)) => {
×
654
                self.process_pj_response(proposal)?;
×
655
                return Ok(());
×
656
            }
657
            SendSession::Closed(SenderSessionOutcome::Failure)
658
            | SendSession::Closed(SenderSessionOutcome::Cancel) => {
659
                let id = persister.session_id();
×
660
                println!(
×
661
                    "Session {id} ended without payjoin. Run `payjoin-cli fallback {id}` to broadcast the original transaction."
662
                );
663
                return Ok(());
×
664
            }
665
        }
666
        Ok(())
1✔
667
    }
1✔
668

669
    async fn post_original_proposal(
2✔
670
        &self,
2✔
671
        sender: Sender<WithReplyKey>,
2✔
672
        persister: &SenderPersister,
2✔
673
    ) -> Result<()> {
2✔
674
        let (req, ctx) = sender.create_v2_post_request(
2✔
675
            self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?.as_str(),
2✔
676
        )?;
×
677
        let response = self.post_request(req).await?;
2✔
678
        let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?;
2✔
679
        println!("Posted Original PSBT...");
2✔
680
        self.get_proposed_payjoin_psbt(sender, persister).await
2✔
681
    }
×
682

683
    async fn get_proposed_payjoin_psbt(
3✔
684
        &self,
3✔
685
        sender: Sender<PollingForProposal>,
3✔
686
        persister: &SenderPersister,
3✔
687
    ) -> Result<()> {
3✔
688
        let ohttp_relay = self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?;
3✔
689
        let mut session = sender.clone();
3✔
690
        // Long poll until we get a response
691
        loop {
692
            let (req, ctx) = session.create_poll_request(ohttp_relay.as_str())?;
5✔
693
            let response = self.post_request(req).await?;
5✔
694
            let res = session.process_response(&response.bytes().await?, ctx).save(persister);
3✔
695
            match res {
3✔
696
                Ok(OptionalTransitionOutcome::Progress(psbt)) => {
1✔
697
                    println!("Proposal received. Processing...");
1✔
698
                    self.process_pj_response(psbt)?;
1✔
699
                    return Ok(());
1✔
700
                }
701
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
2✔
702
                    println!("No response yet.");
2✔
703
                    session = current_state;
2✔
704
                    continue;
2✔
705
                }
706
                Err(re) => {
×
707
                    println!("{re}");
×
708
                    tracing::debug!("{re:?}");
×
709
                    return Err(anyhow!("Response error").context(re));
×
710
                }
711
            }
712
        }
713
    }
1✔
714

715
    async fn long_poll_fallback(
3✔
716
        &self,
3✔
717
        session: Receiver<Initialized>,
3✔
718
        persister: &ReceiverPersister,
3✔
719
    ) -> Result<Receiver<UncheckedOriginalPayload>> {
3✔
720
        let ohttp_relay =
3✔
721
            self.unwrap_relay_or_else_fetch(Some(&session.pj_uri().extras.endpoint())).await?;
3✔
722

723
        let mut session = session;
3✔
724
        loop {
725
            let (req, context) = session.create_poll_request(ohttp_relay.as_str())?;
3✔
726
            println!("Polling receive request...");
3✔
727
            let ohttp_response = self.post_request(req).await?;
3✔
728
            let state_transition = session
1✔
729
                .process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
730
                .save(persister);
1✔
731
            match state_transition {
1✔
732
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
1✔
733
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
734
                    return Ok(next_state);
1✔
735
                }
736
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
737
                    session = current_state;
×
738
                    continue;
×
739
                }
740
                Err(e) => return Err(e.into()),
×
741
            }
742
        }
743
    }
1✔
744

745
    async fn process_receiver_session(
4✔
746
        &self,
4✔
747
        session: ReceiveSession,
4✔
748
        persister: &ReceiverPersister,
4✔
749
    ) -> Result<()> {
4✔
750
        let res = {
3✔
751
            match session {
4✔
752
                ReceiveSession::Initialized(proposal) =>
3✔
753
                    self.read_from_directory(proposal, persister).await,
3✔
754
                ReceiveSession::UncheckedOriginalPayload(proposal) =>
×
755
                    self.check_proposal(proposal, persister).await,
×
756
                ReceiveSession::MaybeInputsOwned(proposal) =>
×
757
                    self.check_inputs_not_owned(proposal, persister).await,
×
758
                ReceiveSession::MaybeInputsSeen(proposal) =>
×
759
                    self.check_no_inputs_seen_before(proposal, persister).await,
×
760
                ReceiveSession::OutputsUnknown(proposal) =>
×
761
                    self.identify_receiver_outputs(proposal, persister).await,
×
762
                ReceiveSession::WantsOutputs(proposal) =>
×
763
                    self.commit_outputs(proposal, persister).await,
×
764
                ReceiveSession::WantsInputs(proposal) =>
×
765
                    self.contribute_inputs(proposal, persister).await,
×
766
                ReceiveSession::WantsFeeRange(proposal) =>
×
767
                    self.apply_fee_range(proposal, persister).await,
×
768
                ReceiveSession::ProvisionalProposal(proposal) =>
×
769
                    self.finalize_proposal(proposal, persister).await,
×
770
                ReceiveSession::PayjoinProposal(proposal) =>
×
771
                    self.send_payjoin_proposal(proposal, persister).await,
×
772
                ReceiveSession::HasReplyableError(error) =>
×
773
                    self.handle_error(error, persister).await,
×
774
                ReceiveSession::Monitor(proposal) =>
1✔
775
                    self.monitor_payjoin_proposal(proposal, persister).await,
1✔
NEW
776
                ReceiveSession::PendingFallback(pending_fallback) => {
×
NEW
777
                    self.complete_pending_fallback(
×
NEW
778
                        pending_fallback,
×
NEW
779
                        persister,
×
NEW
780
                        FallbackHandling::Prompt,
×
NEW
781
                    )?;
×
NEW
782
                    Ok(())
×
783
                }
UNCOV
784
                ReceiveSession::Closed(_) => return Err(anyhow!("Session closed")),
×
785
            }
786
        };
787
        res
3✔
788
    }
3✔
789

790
    #[allow(clippy::incompatible_msrv)]
791
    async fn read_from_directory(
3✔
792
        &self,
3✔
793
        session: Receiver<Initialized>,
3✔
794
        persister: &ReceiverPersister,
3✔
795
    ) -> Result<()> {
3✔
796
        let mut interrupt = self.interrupt.clone();
3✔
797
        let receiver = tokio::select! {
3✔
798
            res = self.long_poll_fallback(session, persister) => res,
3✔
799
            _ = interrupt.changed() => {
3✔
800
                println!("Interrupted. Call the `resume` command to resume all sessions.");
2✔
801
                return Err(anyhow!("Interrupted"));
2✔
802
            }
803
        }?;
×
804
        self.check_proposal(receiver, persister).await
1✔
805
    }
2✔
806

807
    async fn check_proposal(
1✔
808
        &self,
1✔
809
        proposal: Receiver<UncheckedOriginalPayload>,
1✔
810
        persister: &ReceiverPersister,
1✔
811
    ) -> Result<()> {
1✔
812
        let wallet = self.wallet();
1✔
813
        let proposal = proposal
1✔
814
            .check_broadcast_suitability(None, |tx| {
1✔
815
                wallet
1✔
816
                    .can_broadcast(tx)
1✔
817
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
818
            })
1✔
819
            .save(persister)?;
1✔
820

821
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
822
        println!("{}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast()));
1✔
823
        self.check_inputs_not_owned(proposal, persister).await
1✔
824
    }
×
825

826
    async fn check_inputs_not_owned(
1✔
827
        &self,
1✔
828
        proposal: Receiver<MaybeInputsOwned>,
1✔
829
        persister: &ReceiverPersister,
1✔
830
    ) -> Result<()> {
1✔
831
        let wallet = self.wallet();
1✔
832
        let proposal = proposal
1✔
833
            .check_inputs_not_owned(&mut |input| {
1✔
834
                wallet
1✔
835
                    .is_mine(input)
1✔
836
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
837
            })
1✔
838
            .save(persister)?;
1✔
839
        self.check_no_inputs_seen_before(proposal, persister).await
1✔
840
    }
×
841

842
    async fn check_no_inputs_seen_before(
1✔
843
        &self,
1✔
844
        proposal: Receiver<MaybeInputsSeen>,
1✔
845
        persister: &ReceiverPersister,
1✔
846
    ) -> Result<()> {
1✔
847
        let proposal = proposal
1✔
848
            .check_no_inputs_seen_before(&mut |input| {
1✔
849
                Ok(self.db.insert_input_seen_before(*input)?)
1✔
850
            })
1✔
851
            .save(persister)?;
1✔
852
        self.identify_receiver_outputs(proposal, persister).await
1✔
853
    }
×
854

855
    async fn identify_receiver_outputs(
1✔
856
        &self,
1✔
857
        proposal: Receiver<OutputsUnknown>,
1✔
858
        persister: &ReceiverPersister,
1✔
859
    ) -> Result<()> {
1✔
860
        let wallet = self.wallet();
1✔
861
        let proposal = proposal
1✔
862
            .identify_receiver_outputs(&mut |output_script| {
2✔
863
                wallet
2✔
864
                    .is_mine(output_script)
2✔
865
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
2✔
866
            })
2✔
867
            .save(persister)?;
1✔
868
        self.commit_outputs(proposal, persister).await
1✔
869
    }
×
870

871
    async fn commit_outputs(
1✔
872
        &self,
1✔
873
        proposal: Receiver<WantsOutputs>,
1✔
874
        persister: &ReceiverPersister,
1✔
875
    ) -> Result<()> {
1✔
876
        let proposal = proposal.commit_outputs().save(persister)?;
1✔
877
        self.contribute_inputs(proposal, persister).await
1✔
878
    }
×
879

880
    async fn contribute_inputs(
1✔
881
        &self,
1✔
882
        proposal: Receiver<WantsInputs>,
1✔
883
        persister: &ReceiverPersister,
1✔
884
    ) -> Result<()> {
1✔
885
        let wallet = self.wallet();
1✔
886
        let candidate_inputs = wallet.list_unspent()?;
1✔
887

888
        if candidate_inputs.is_empty() {
1✔
889
            return Err(anyhow::anyhow!(
×
890
                "No spendable UTXOs available in wallet. Cannot contribute inputs to payjoin."
×
891
            ));
×
892
        }
1✔
893

894
        let selected_input = proposal.try_preserving_privacy(candidate_inputs)?;
1✔
895
        let proposal =
1✔
896
            proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?;
1✔
897
        self.apply_fee_range(proposal, persister).await
1✔
898
    }
×
899

900
    async fn apply_fee_range(
1✔
901
        &self,
1✔
902
        proposal: Receiver<WantsFeeRange>,
1✔
903
        persister: &ReceiverPersister,
1✔
904
    ) -> Result<()> {
1✔
905
        let proposal = proposal.apply_fee_range(None, self.config.max_fee_rate).save(persister)?;
1✔
906
        self.finalize_proposal(proposal, persister).await
1✔
907
    }
×
908

909
    async fn finalize_proposal(
1✔
910
        &self,
1✔
911
        proposal: Receiver<ProvisionalProposal>,
1✔
912
        persister: &ReceiverPersister,
1✔
913
    ) -> Result<()> {
1✔
914
        let wallet = self.wallet();
1✔
915
        let proposal = proposal
1✔
916
            .finalize_proposal(|psbt| {
1✔
917
                wallet
1✔
918
                    .process_psbt(psbt)
1✔
919
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
920
            })
1✔
921
            .save(persister)?;
1✔
922
        self.send_payjoin_proposal(proposal, persister).await
1✔
923
    }
×
924

925
    async fn send_payjoin_proposal(
1✔
926
        &self,
1✔
927
        proposal: Receiver<PayjoinProposal>,
1✔
928
        persister: &ReceiverPersister,
1✔
929
    ) -> Result<()> {
1✔
930
        let (req, ohttp_ctx) = proposal
1✔
931
            .create_post_request(self.unwrap_relay_or_else_fetch(None::<&str>).await?.as_str())
1✔
932
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
933
        let res = self.post_request(req).await?;
1✔
934
        let payjoin_psbt = proposal.psbt().clone();
1✔
935
        let session =
1✔
936
            match proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister) {
1✔
937
                Ok(session) => session,
1✔
NEW
938
                Err(e) => {
×
NEW
939
                    let message = e.to_string();
×
NEW
940
                    if let Some(pending_fallback) = e.error_state() {
×
NEW
941
                        println!("Payjoin proposal post failed: {message}");
×
NEW
942
                        self.complete_pending_fallback(
×
NEW
943
                            pending_fallback,
×
NEW
944
                            persister,
×
NEW
945
                            FallbackHandling::Prompt,
×
NEW
946
                        )?;
×
NEW
947
                        return Ok(());
×
NEW
948
                    }
×
NEW
949
                    return Err(anyhow!("Failed to process payjoin proposal response: {message}"));
×
950
                }
951
            };
952
        println!(
1✔
953
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
954
            payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
1✔
955
        );
956

957
        return self.monitor_payjoin_proposal(session, persister).await;
1✔
958
    }
×
959

960
    async fn monitor_payjoin_proposal(
2✔
961
        &self,
2✔
962
        proposal: Receiver<Monitor>,
2✔
963
        persister: &ReceiverPersister,
2✔
964
    ) -> Result<()> {
2✔
965
        // On a session resumption, the receiver will resume again in this state.
966
        let poll_interval = tokio::time::Duration::from_millis(200);
2✔
967
        let timeout_duration = tokio::time::Duration::from_secs(5);
2✔
968

969
        let mut interval = tokio::time::interval(poll_interval);
2✔
970
        interval.tick().await;
2✔
971

972
        tracing::debug!("Polling for payment confirmation");
1✔
973

974
        let result = tokio::time::timeout(timeout_duration, async {
1✔
975
            loop {
976
                interval.tick().await;
1✔
977
                let check_result = proposal
1✔
978
                    .check_payment(|txid| {
1✔
979
                        self.wallet()
1✔
980
                            .get_raw_transaction(&txid)
1✔
981
                            .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
982
                    })
1✔
983
                    .save(persister);
1✔
984

985
                match check_result {
1✔
986
                    Ok(_) => {
987
                        println!("Payjoin transaction detected in the mempool!");
1✔
988
                        return Ok(());
1✔
989
                    }
990
                    Err(_) => {
991
                        // keep polling
992

993
                        continue;
×
994
                    }
995
                }
996
            }
997
        })
1✔
998
        .await;
1✔
999

1000
        match result {
1✔
1001
            Ok(ok) => ok,
1✔
1002
            Err(_) => Err(anyhow!(
×
1003
                "Timeout waiting for payment confirmation after {:?}",
×
1004
                timeout_duration
×
1005
            )),
×
1006
        }
1007
    }
1✔
1008

1009
    async fn unwrap_relay_or_else_fetch(
9✔
1010
        &self,
9✔
1011
        directory: Option<impl payjoin::IntoUrl>,
9✔
1012
    ) -> Result<payjoin::Url> {
9✔
1013
        let directory = directory.map(|url| url.into_url()).transpose()?;
9✔
1014
        let selected_relay =
9✔
1015
            self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay();
9✔
1016
        let ohttp_relay = match selected_relay {
9✔
1017
            Some(relay) => relay,
5✔
1018
            None =>
1019
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone())
4✔
1020
                    .await?
4✔
1021
                    .relay_url,
1022
        };
1023
        Ok(ohttp_relay)
9✔
1024
    }
9✔
1025

1026
    /// Handle error by attempting to send an error response over the directory
1027
    async fn handle_error(
×
1028
        &self,
×
1029
        session: Receiver<HasReplyableError>,
×
1030
        persister: &ReceiverPersister,
×
1031
    ) -> Result<()> {
×
1032
        let (err_req, err_ctx) = session
×
1033
            .create_error_request(self.unwrap_relay_or_else_fetch(None::<&str>).await?.as_str())?;
×
1034

1035
        let err_response = match self.post_request(err_req).await {
×
1036
            Ok(response) => response,
×
1037
            Err(e) => return Err(anyhow!("Failed to post error request: {}", e)),
×
1038
        };
1039

1040
        let err_bytes = match err_response.bytes().await {
×
1041
            Ok(bytes) => bytes,
×
1042
            Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
×
1043
        };
1044

NEW
1045
        match session.process_error_response(&err_bytes, err_ctx).save(persister) {
×
NEW
1046
            Ok(Some(pending_fallback)) => self.complete_pending_fallback(
×
NEW
1047
                pending_fallback,
×
NEW
1048
                persister,
×
NEW
1049
                FallbackHandling::Prompt,
×
1050
            ),
NEW
1051
            Ok(None) => Ok(()),
×
NEW
1052
            Err(e) => Err(anyhow!("Failed to process error response: {}", e)),
×
1053
        }
UNCOV
1054
    }
×
1055

1056
    async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {
11✔
1057
        let http = http_agent(&self.config)?;
11✔
1058
        http.post(req.url)
11✔
1059
            .header("Content-Type", req.content_type)
11✔
1060
            .body(req.body)
11✔
1061
            .send()
11✔
1062
            .await
11✔
1063
            .and_then(|r| r.error_for_status())
7✔
1064
            .context("HTTP request failed")
7✔
1065
    }
7✔
1066
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc