• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 19148430674

06 Nov 2025 08:08PM UTC coverage: 83.457% (-0.3%) from 83.737%
19148430674

Pull #1164

github

web-flow
Merge 3d44e430b into 7d9e7f556
Pull Request #1164: WIP:Demonstrate failure handling with fallback TX broadcast

9 of 35 new or added lines in 1 file covered. (25.71%)

198 existing lines in 7 files now uncovered.

8990 of 10772 relevant lines covered (83.46%)

459.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.17
/payjoin-cli/src/app/v2/mod.rs
1
use std::fmt;
2
use std::sync::{Arc, Mutex};
3

4
use anyhow::{anyhow, Context, Result};
5
use payjoin::bitcoin::consensus::encode::serialize_hex;
6
use payjoin::bitcoin::{Amount, FeeRate};
7
use payjoin::persist::{OptionalTransitionOutcome, PersistedError};
8
use payjoin::receive::v2::{
9
    replay_event_log as replay_receiver_event_log, HasReplyableError, Initialized,
10
    MaybeInputsOwned, MaybeInputsSeen, Monitor, OutputsUnknown, PayjoinProposal,
11
    ProvisionalProposal, ReceiveSession, Receiver, ReceiverBuilder,
12
    SessionOutcome as ReceiverSessionOutcome, UncheckedOriginalPayload, WantsFeeRange, WantsInputs,
13
    WantsOutputs,
14
};
15
use payjoin::send::v2::{
16
    replay_event_log as replay_sender_event_log, EncapsulationError, PollingForProposal,
17
    SendSession, Sender, SenderBuilder, SessionOutcome as SenderSessionOutcome, WithReplyKey,
18
};
19
use payjoin::{ImplementationError, PjParam, Uri};
20
use tokio::sync::watch;
21

22
use super::config::Config;
23
use super::wallet::BitcoindWallet;
24
use super::App as AppTrait;
25
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
26
use crate::app::{handle_interrupt, http_agent};
27
use crate::db::v2::{ReceiverPersister, SenderPersister, SessionId};
28
use crate::db::{error as db_error, Database};
29

30
mod ohttp;
31

32
const W_ID: usize = 12;
33
const W_ROLE: usize = 25;
34
const W_DONE: usize = 15;
35
const W_STATUS: usize = 15;
36

37
#[derive(Clone)]
38
pub(crate) struct App {
39
    config: Config,
40
    db: Arc<Database>,
41
    wallet: BitcoindWallet,
42
    interrupt: watch::Receiver<()>,
43
    relay_manager: Arc<Mutex<RelayManager>>,
44
}
45

46
trait StatusText {
47
    fn status_text(&self) -> &'static str;
48
}
49

50
impl StatusText for SendSession {
51
    fn status_text(&self) -> &'static str {
×
52
        match self {
×
53
            SendSession::WithReplyKey(_) | SendSession::PollingForProposal(_) =>
54
                "Waiting for proposal",
×
55
            SendSession::Closed(session_outcome) => match session_outcome {
×
56
                SenderSessionOutcome::Failure => "Session failure",
×
57
                SenderSessionOutcome::Success(_) => "Session success",
×
58
                SenderSessionOutcome::Cancel => "Session cancelled",
×
59
            },
60
        }
UNCOV
61
    }
×
62
}
63

64
impl StatusText for ReceiveSession {
UNCOV
65
    fn status_text(&self) -> &'static str {
×
66
        match self {
×
67
            ReceiveSession::Initialized(_) => "Waiting for original proposal",
×
68
            ReceiveSession::UncheckedOriginalPayload(_)
69
            | ReceiveSession::MaybeInputsOwned(_)
70
            | ReceiveSession::MaybeInputsSeen(_)
71
            | ReceiveSession::OutputsUnknown(_)
72
            | ReceiveSession::WantsOutputs(_)
73
            | ReceiveSession::WantsInputs(_)
74
            | ReceiveSession::WantsFeeRange(_)
UNCOV
75
            | ReceiveSession::ProvisionalProposal(_) => "Processing original proposal",
×
76
            ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent",
×
77
            ReceiveSession::HasReplyableError(_) =>
UNCOV
78
                "Session failure, waiting to post error response",
×
79
            ReceiveSession::Monitor(_) => "Monitoring payjoin proposal",
×
80
            ReceiveSession::Closed(session_outcome) => match session_outcome {
×
81
                ReceiverSessionOutcome::Failure => "Session failure",
×
82
                ReceiverSessionOutcome::Success(_) => "Session success",
×
83
                ReceiverSessionOutcome::Cancel => "Session cancelled",
×
84
                ReceiverSessionOutcome::FallbackBroadcasted => "Fallback broadcasted",
×
85
            },
86
        }
UNCOV
87
    }
×
88
}
89

UNCOV
90
fn print_header() {
×
91
    println!(
×
92
        "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
×
93
        "Session ID", "Sender/Receiver", "Completed At", "Status"
94
    );
UNCOV
95
}
×
96

97
enum Role {
98
    Sender,
99
    Receiver,
100
}
101
impl Role {
UNCOV
102
    fn as_str(&self) -> &'static str {
×
103
        match self {
×
104
            Role::Sender => "Sender",
×
105
            Role::Receiver => "Receiver",
×
106
        }
UNCOV
107
    }
×
108
}
109

110
struct SessionHistoryRow<Status> {
111
    session_id: SessionId,
112
    role: Role,
113
    status: Status,
114
    completed_at: Option<u64>,
115
    error_message: Option<String>,
116
}
117

118
impl<Status: StatusText> fmt::Display for SessionHistoryRow<Status> {
UNCOV
119
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
120
        write!(
×
121
            f,
×
122
            "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
×
123
            self.session_id.to_string(),
×
124
            self.role.as_str(),
×
125
            match self.completed_at {
×
126
                None => "Not Completed".to_string(),
×
127
                Some(secs) => {
×
128
                    // TODO: human readable time
UNCOV
129
                    secs.to_string()
×
130
                }
131
            },
UNCOV
132
            self.error_message.as_deref().unwrap_or(self.status.status_text())
×
133
        )
UNCOV
134
    }
×
135
}
136

137
#[async_trait::async_trait]
138
impl AppTrait for App {
139
    async fn new(config: Config) -> Result<Self> {
8✔
140
        let db = Arc::new(Database::create(&config.db_path)?);
141
        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
142
        let (interrupt_tx, interrupt_rx) = watch::channel(());
143
        tokio::spawn(handle_interrupt(interrupt_tx));
144
        let wallet = BitcoindWallet::new(&config.bitcoind).await?;
145
        let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager };
146
        app.wallet()
147
            .network()
148
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
149
        Ok(app)
150
    }
8✔
151

152
    fn wallet(&self) -> BitcoindWallet { self.wallet.clone() }
21✔
153

154
    #[allow(clippy::incompatible_msrv)]
155
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
3✔
156
        use payjoin::UriExt;
157
        let uri = Uri::try_from(bip21)
UNCOV
158
            .map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?
×
159
            .assume_checked()
160
            .check_pj_supported()
UNCOV
161
            .map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
162
        let address = uri.address;
UNCOV
163
        let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?;
×
164
        match uri.extras.pj_param() {
165
            #[cfg(feature = "v1")]
166
            PjParam::V1(pj_param) => {
167
                use std::str::FromStr;
168

169
                let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
170
                let (req, ctx) = payjoin::send::v1::SenderBuilder::from_parts(
171
                    psbt,
172
                    pj_param,
173
                    &address,
174
                    Some(amount),
175
                )
176
                .build_recommended(fee_rate)
177
                .with_context(|| "Failed to build payjoin request")?
178
                .create_v1_post_request();
179
                let http = http_agent(&self.config)?;
180
                let body = String::from_utf8(req.body.clone()).unwrap();
181
                println!("Sending fallback request to {}", &req.url);
182
                let response = http
183
                    .post(req.url)
184
                    .header("Content-Type", req.content_type)
185
                    .body(body.clone())
186
                    .send()
187
                    .await
188
                    .with_context(|| "HTTP request failed")?;
189
                let fallback_tx = payjoin::bitcoin::Psbt::from_str(&body)
UNCOV
190
                    .map_err(|e| anyhow!("Failed to load PSBT from base64: {}", e))?
×
191
                    .extract_tx()?;
192
                println!("Sent fallback transaction txid: {}", fallback_tx.compute_txid());
193
                println!(
194
                    "Sent fallback transaction hex: {:#}",
195
                    payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
196
                );
197
                // Try to process the payjoin response
198
                match ctx.process_response(&response.bytes().await?.to_vec()) {
199
                    Ok(psbt) => {
200
                        println!("Payjoin proposal received, processing...");
201
                        self.process_pj_response(psbt)?;
202
                        Ok(())
203
                    }
204
                    Err(e) => {
205
                        tracing::debug!("Error processing response: {e:?}");
206
                        println!("Payjoin failed: {}. Broadcasting fallback transaction.", e);
207

208
                        // Broadcast the fallback transaction
209
                        let txid = self.wallet().broadcast_tx(&fallback_tx)?;
210
                        println!("Fallback transaction broadcasted. TXID: {}", txid);
211
                        Ok(())
212
                    }
213
                }
214
            }
215
            PjParam::V2(pj_param) => {
216
                let receiver_pubkey = pj_param.receiver_pubkey();
217
                let (sender_state, persister, fallback_tx) =
218
                    match self.db.get_send_session_ids()?.into_iter().find_map(|session_id| {
1✔
219
                        let session_receiver_pubkey = self
1✔
220
                            .db
1✔
221
                            .get_send_session_receiver_pk(&session_id)
1✔
222
                            .expect("Receiver pubkey should exist if session id exists");
1✔
223
                        if session_receiver_pubkey == *receiver_pubkey {
1✔
224
                            let sender_persister =
1✔
225
                                SenderPersister::from_id(self.db.clone(), session_id);
1✔
226
                            let (send_session, history) =
1✔
227
                                replay_sender_event_log(&sender_persister)
1✔
228
                                    .map_err(|e| {
1✔
NEW
229
                                        anyhow!("Failed to replay sender event log: {:?}", e)
×
NEW
230
                                    })
×
231
                                    .ok()?;
1✔
232

233
                            Some((send_session, sender_persister, history.fallback_tx()))
1✔
234
                        } else {
235
                            None
×
236
                        }
237
                    }) {
1✔
238
                        Some((sender_state, persister, fallback_tx)) =>
239
                            (sender_state, persister, fallback_tx),
240
                        None => {
241
                            let persister =
242
                                SenderPersister::new(self.db.clone(), receiver_pubkey.clone())?;
243
                            let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
NEW
244
                            let fallback_tx = psbt.clone().extract_tx().map_err(|e| {
×
NEW
245
                                anyhow!("Failed to extract fallback transaction: {}", e)
×
NEW
246
                            })?;
×
247
                            let sender =
248
                                SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
249
                                    .build_recommended(fee_rate)?
250
                                    .save(&persister)?;
251

252
                            (SendSession::WithReplyKey(sender), persister, fallback_tx)
253
                        }
254
                    };
255
                let mut interrupt = self.interrupt.clone();
256
                tokio::select! {
257
                    _ = self.process_sender_session(sender_state, &persister, &fallback_tx) => return Ok(()),
258
                    _ = interrupt.changed() => {
259
                        println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
260
                        return Err(anyhow!("Interrupted"))
261
                    }
262
                }
263
            }
264
            _ => unimplemented!("Unrecognized payjoin version"),
265
        }
266
    }
3✔
267

268
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
1✔
269
        let address = self.wallet().get_new_address()?;
270
        let ohttp_keys =
271
            unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone())
272
                .await?
273
                .ohttp_keys;
274
        let persister = ReceiverPersister::new(self.db.clone())?;
275
        let session =
276
            ReceiverBuilder::new(address, self.config.v2()?.pj_directory.as_str(), ohttp_keys)?
277
                .with_amount(amount)
278
                .with_max_fee_rate(self.config.max_fee_rate.unwrap_or(FeeRate::BROADCAST_MIN))
279
                .build()
280
                .save(&persister)?;
281

282
        println!("Receive session established");
283
        let pj_uri = session.pj_uri();
284
        println!("Request Payjoin by sharing this Payjoin Uri:");
285
        println!("{pj_uri}");
286

287
        self.process_receiver_session(ReceiveSession::Initialized(session.clone()), &persister)
288
            .await?;
289
        Ok(())
290
    }
1✔
291

292
    #[allow(clippy::incompatible_msrv)]
293
    async fn resume_payjoins(&self) -> Result<()> {
4✔
294
        let recv_session_ids = self.db.get_recv_session_ids()?;
295
        let send_session_ids = self.db.get_send_session_ids()?;
296

297
        if recv_session_ids.is_empty() && send_session_ids.is_empty() {
298
            println!("No sessions to resume.");
299
            return Ok(());
300
        }
301

302
        let mut tasks = Vec::new();
303

304
        for session_id in recv_session_ids {
305
            let self_clone = self.clone();
306
            let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id);
307
            let receiver_state = replay_receiver_event_log(&recv_persister)
UNCOV
308
                .map_err(|e| anyhow!("Failed to replay receiver event log: {:?}", e))?
×
309
                .0;
310
            tasks.push(tokio::spawn(async move {
2✔
311
                self_clone.process_receiver_session(receiver_state, &recv_persister).await
2✔
312
            }));
1✔
313
        }
314

315
        for session_id in send_session_ids {
316
            let sender_persiter = SenderPersister::from_id(self.db.clone(), session_id);
317
            let (sender_state, history) = replay_sender_event_log(&sender_persiter)
NEW
318
                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))?;
×
319
            let fallback_tx = history.fallback_tx();
320
            let self_clone = self.clone();
321
            tasks.push(tokio::spawn(async move {
×
322
                self_clone
×
NEW
323
                    .process_sender_session(sender_state, &sender_persiter, &fallback_tx)
×
NEW
324
                    .await
×
NEW
325
            }));
×
326
        }
327

328
        let mut interrupt = self.interrupt.clone();
329
        tokio::select! {
330
            _ = async {
2✔
331
                for task in tasks {
3✔
332
                    let _ = task.await;
2✔
333
                }
334
            } => {
1✔
335
                println!("All resumed sessions completed.");
336
            }
337
            _ = interrupt.changed() => {
338
                println!("Resumed sessions were interrupted.");
339
            }
340
        }
341
        Ok(())
342
    }
4✔
343

344
    #[cfg(feature = "v2")]
UNCOV
345
    async fn history(&self) -> Result<()> {
×
346
        print_header();
347
        let mut send_rows = vec![];
348
        let mut recv_rows = vec![];
UNCOV
349
        self.db.get_send_session_ids()?.into_iter().for_each(|session_id| {
×
350
            let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
×
351
            match replay_sender_event_log(&persister) {
×
352
                Ok((sender_state, _)) => {
×
353
                    let row = SessionHistoryRow {
×
354
                        session_id,
×
355
                        role: Role::Sender,
×
356
                        status: sender_state.clone(),
×
357
                        completed_at: None,
×
358
                        error_message: None,
×
359
                    };
×
360
                    send_rows.push(row);
×
361
                }
×
362
                Err(e) => {
×
363
                    let row = SessionHistoryRow {
×
364
                        session_id,
×
365
                        role: Role::Sender,
×
366
                        status: SendSession::Closed(SenderSessionOutcome::Failure),
×
367
                        completed_at: None,
×
368
                        error_message: Some(e.to_string()),
×
369
                    };
×
370
                    send_rows.push(row);
×
371
                }
×
372
            }
UNCOV
373
        });
×
374

UNCOV
375
        self.db.get_recv_session_ids()?.into_iter().for_each(|session_id| {
×
376
            let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
×
377
            match replay_receiver_event_log(&persister) {
×
378
                Ok((receiver_state, _)) => {
×
379
                    let row = SessionHistoryRow {
×
380
                        session_id,
×
381
                        role: Role::Receiver,
×
382
                        status: receiver_state.clone(),
×
383
                        completed_at: None,
×
384
                        error_message: None,
×
385
                    };
×
386
                    recv_rows.push(row);
×
387
                }
×
388
                Err(e) => {
×
389
                    let row = SessionHistoryRow {
×
390
                        session_id,
×
391
                        role: Role::Receiver,
×
392
                        status: ReceiveSession::Closed(ReceiverSessionOutcome::Failure),
×
393
                        completed_at: None,
×
394
                        error_message: Some(e.to_string()),
×
395
                    };
×
396
                    recv_rows.push(row);
×
397
                }
×
398
            }
UNCOV
399
        });
×
400

401
        self.db.get_inactive_send_session_ids()?.into_iter().for_each(
UNCOV
402
            |(session_id, completed_at)| {
×
403
                let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
×
404
                match replay_sender_event_log(&persister) {
×
405
                    Ok((sender_state, _)) => {
×
406
                        let row = SessionHistoryRow {
×
407
                            session_id,
×
408
                            role: Role::Sender,
×
409
                            status: sender_state.clone(),
×
410
                            completed_at: Some(completed_at),
×
411
                            error_message: None,
×
412
                        };
×
413
                        send_rows.push(row);
×
414
                    }
×
415
                    Err(e) => {
×
416
                        let row = SessionHistoryRow {
×
417
                            session_id,
×
418
                            role: Role::Sender,
×
419
                            status: SendSession::Closed(SenderSessionOutcome::Failure),
×
420
                            completed_at: Some(completed_at),
×
421
                            error_message: Some(e.to_string()),
×
422
                        };
×
423
                        send_rows.push(row);
×
424
                    }
×
425
                }
UNCOV
426
            },
×
427
        );
428

429
        self.db.get_inactive_recv_session_ids()?.into_iter().for_each(
UNCOV
430
            |(session_id, completed_at)| {
×
431
                let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
×
432
                match replay_receiver_event_log(&persister) {
×
433
                    Ok((receiver_state, _)) => {
×
434
                        let row = SessionHistoryRow {
×
435
                            session_id,
×
436
                            role: Role::Receiver,
×
437
                            status: receiver_state.clone(),
×
438
                            completed_at: Some(completed_at),
×
439
                            error_message: None,
×
440
                        };
×
441
                        recv_rows.push(row);
×
442
                    }
×
443
                    Err(e) => {
×
444
                        let row = SessionHistoryRow {
×
445
                            session_id,
×
446
                            role: Role::Receiver,
×
447
                            status: ReceiveSession::Closed(ReceiverSessionOutcome::Failure),
×
448
                            completed_at: Some(completed_at),
×
449
                            error_message: Some(e.to_string()),
×
450
                        };
×
451
                        recv_rows.push(row);
×
452
                    }
×
453
                }
UNCOV
454
            },
×
455
        );
456

457
        // Print receiver and sender rows separately
458
        for row in send_rows {
459
            println!("{row}");
460
        }
461
        for row in recv_rows {
462
            println!("{row}");
463
        }
464

465
        Ok(())
UNCOV
466
    }
×
467
}
468

469
impl App {
470
    async fn process_sender_session(
2✔
471
        &self,
2✔
472
        session: SendSession,
2✔
473
        persister: &SenderPersister,
2✔
474
        fallback_tx: &payjoin::bitcoin::Transaction,
2✔
475
    ) -> Result<()> {
2✔
UNCOV
476
        match session {
×
477
            SendSession::WithReplyKey(context) => {
1✔
478
                let response = self.post_original_proposal(context, persister).await;
1✔
NEW
479
                match response {
×
480
                    Ok(_) => {
NEW
481
                        return Ok(());
×
482
                    }
NEW
483
                    Err(e) => {
×
NEW
484
                        if let Some(persisted_error) = e.downcast_ref::<PersistedError<
×
NEW
485
                            EncapsulationError,
×
NEW
486
                            db_error::Error,
×
NEW
487
                            (),
×
NEW
488
                        >>() {
×
NEW
489
                            if let Some(api_error) = persisted_error.api_error_ref() {
×
NEW
490
                                println!("Error posting original proposal: {api_error}");
×
NEW
491
                                let txid = self.wallet().broadcast_tx(&fallback_tx)?;
×
NEW
492
                                println!("Fallback transaction broadcasted. TXID: {}", txid);
×
NEW
493
                                return Err(anyhow!(
×
NEW
494
                                    "Fallback transaction broadcasted due to: {api_error}"
×
NEW
495
                                ));
×
NEW
496
                            }
×
NEW
497
                        }
×
498
                    }
499
                }
500
            }
501
            SendSession::PollingForProposal(context) =>
1✔
502
                self.get_proposed_payjoin_psbt(context, persister).await?,
1✔
UNCOV
503
            SendSession::Closed(SenderSessionOutcome::Success(proposal)) => {
×
504
                self.process_pj_response(proposal)?;
×
505
                return Ok(());
×
506
            }
UNCOV
507
            _ => return Err(anyhow!("Unexpected sender state")),
×
508
        }
509
        Ok(())
1✔
510
    }
1✔
511

512
    async fn post_original_proposal(
1✔
513
        &self,
1✔
514
        sender: Sender<WithReplyKey>,
1✔
515
        persister: &SenderPersister,
1✔
516
    ) -> Result<()> {
1✔
517
        let (req, ctx) = sender.create_v2_post_request(
1✔
518
            self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?.as_str(),
1✔
UNCOV
519
        )?;
×
520
        let response = self.post_request(req).await?;
1✔
521
        println!("Posted original proposal...");
1✔
522
        let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?;
1✔
523
        self.get_proposed_payjoin_psbt(sender, persister).await
1✔
UNCOV
524
    }
×
525

526
    async fn get_proposed_payjoin_psbt(
2✔
527
        &self,
2✔
528
        sender: Sender<PollingForProposal>,
2✔
529
        persister: &SenderPersister,
2✔
530
    ) -> Result<()> {
2✔
531
        let mut session = sender.clone();
2✔
532
        // Long poll until we get a response
533
        loop {
534
            let (req, ctx) = session.create_poll_request(
3✔
535
                self.unwrap_relay_or_else_fetch(Some(&session.endpoint())).await?.as_str(),
3✔
UNCOV
536
            )?;
×
537
            let response = self.post_request(req).await?;
3✔
538
            let res = session.process_response(&response.bytes().await?, ctx).save(persister);
2✔
539
            match res {
2✔
540
                Ok(OptionalTransitionOutcome::Progress(psbt)) => {
1✔
541
                    println!("Proposal received. Processing...");
1✔
542
                    self.process_pj_response(psbt)?;
1✔
543
                    return Ok(());
1✔
544
                }
545
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
1✔
546
                    println!("No response yet.");
1✔
547
                    session = current_state;
1✔
548
                    continue;
1✔
549
                }
UNCOV
550
                Err(re) => {
×
551
                    println!("{re}");
×
552
                    tracing::debug!("{re:?}");
×
553
                    return Err(anyhow!("Response error").context(re));
×
554
                }
555
            }
556
        }
557
    }
1✔
558

559
    async fn long_poll_fallback(
2✔
560
        &self,
2✔
561
        session: Receiver<Initialized>,
2✔
562
        persister: &ReceiverPersister,
2✔
563
    ) -> Result<Receiver<UncheckedOriginalPayload>> {
2✔
564
        let ohttp_relay =
2✔
565
            self.unwrap_relay_or_else_fetch(Some(&session.pj_uri().extras.endpoint())).await?;
2✔
566

567
        let mut session = session;
2✔
568
        loop {
569
            let (req, context) = session.create_poll_request(ohttp_relay.as_str())?;
2✔
570
            println!("Polling receive request...");
2✔
571
            let ohttp_response = self.post_request(req).await?;
2✔
572
            let state_transition = session
1✔
573
                .process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
574
                .save(persister);
1✔
575
            match state_transition {
1✔
576
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
1✔
577
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
578
                    return Ok(next_state);
1✔
579
                }
UNCOV
580
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
581
                    session = current_state;
×
582
                    continue;
×
583
                }
UNCOV
584
                Err(e) => return Err(e.into()),
×
585
            }
586
        }
587
    }
1✔
588

589
    async fn process_receiver_session(
3✔
590
        &self,
3✔
591
        session: ReceiveSession,
3✔
592
        persister: &ReceiverPersister,
3✔
593
    ) -> Result<()> {
3✔
594
        let res = {
2✔
595
            match session {
3✔
596
                ReceiveSession::Initialized(proposal) =>
2✔
597
                    self.read_from_directory(proposal, persister).await,
2✔
UNCOV
598
                ReceiveSession::UncheckedOriginalPayload(proposal) =>
×
599
                    self.check_proposal(proposal, persister).await,
×
600
                ReceiveSession::MaybeInputsOwned(proposal) =>
×
601
                    self.check_inputs_not_owned(proposal, persister).await,
×
602
                ReceiveSession::MaybeInputsSeen(proposal) =>
×
603
                    self.check_no_inputs_seen_before(proposal, persister).await,
×
604
                ReceiveSession::OutputsUnknown(proposal) =>
×
605
                    self.identify_receiver_outputs(proposal, persister).await,
×
606
                ReceiveSession::WantsOutputs(proposal) =>
×
607
                    self.commit_outputs(proposal, persister).await,
×
608
                ReceiveSession::WantsInputs(proposal) =>
×
609
                    self.contribute_inputs(proposal, persister).await,
×
610
                ReceiveSession::WantsFeeRange(proposal) =>
×
611
                    self.apply_fee_range(proposal, persister).await,
×
612
                ReceiveSession::ProvisionalProposal(proposal) =>
×
613
                    self.finalize_proposal(proposal, persister).await,
×
614
                ReceiveSession::PayjoinProposal(proposal) =>
×
615
                    self.send_payjoin_proposal(proposal, persister).await,
×
616
                ReceiveSession::HasReplyableError(error) =>
×
617
                    self.handle_error(error, persister).await,
×
618
                ReceiveSession::Monitor(proposal) =>
1✔
619
                    self.monitor_payjoin_proposal(proposal, persister).await,
1✔
UNCOV
620
                ReceiveSession::Closed(_) => return Err(anyhow!("Session closed")),
×
621
            }
622
        };
623
        res
2✔
624
    }
2✔
625

626
    #[allow(clippy::incompatible_msrv)]
627
    async fn read_from_directory(
2✔
628
        &self,
2✔
629
        session: Receiver<Initialized>,
2✔
630
        persister: &ReceiverPersister,
2✔
631
    ) -> Result<()> {
2✔
632
        let mut interrupt = self.interrupt.clone();
2✔
633
        let receiver = tokio::select! {
2✔
634
            res = self.long_poll_fallback(session, persister) => res,
2✔
635
            _ = interrupt.changed() => {
2✔
636
                println!("Interrupted. Call the `resume` command to resume all sessions.");
1✔
637
                return Err(anyhow!("Interrupted"));
1✔
638
            }
UNCOV
639
        }?;
×
640
        self.check_proposal(receiver, persister).await
1✔
641
    }
1✔
642

643
    async fn check_proposal(
1✔
644
        &self,
1✔
645
        proposal: Receiver<UncheckedOriginalPayload>,
1✔
646
        persister: &ReceiverPersister,
1✔
647
    ) -> Result<()> {
1✔
648
        let wallet = self.wallet();
1✔
649
        let proposal = proposal
1✔
650
            .check_broadcast_suitability(None, |tx| {
1✔
651
                wallet
1✔
652
                    .can_broadcast(tx)
1✔
653
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
654
            })
1✔
655
            .save(persister)?;
1✔
656

657
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
658
        println!("{}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast()));
1✔
659
        self.check_inputs_not_owned(proposal, persister).await
1✔
UNCOV
660
    }
×
661

662
    async fn check_inputs_not_owned(
1✔
663
        &self,
1✔
664
        proposal: Receiver<MaybeInputsOwned>,
1✔
665
        persister: &ReceiverPersister,
1✔
666
    ) -> Result<()> {
1✔
667
        let wallet = self.wallet();
1✔
668
        let proposal = proposal
1✔
669
            .check_inputs_not_owned(&mut |input| {
1✔
670
                wallet
1✔
671
                    .is_mine(input)
1✔
672
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
673
            })
1✔
674
            .save(persister)?;
1✔
675
        self.check_no_inputs_seen_before(proposal, persister).await
1✔
UNCOV
676
    }
×
677

678
    async fn check_no_inputs_seen_before(
1✔
679
        &self,
1✔
680
        proposal: Receiver<MaybeInputsSeen>,
1✔
681
        persister: &ReceiverPersister,
1✔
682
    ) -> Result<()> {
1✔
683
        let proposal = proposal
1✔
684
            .check_no_inputs_seen_before(&mut |input| {
1✔
685
                Ok(self.db.insert_input_seen_before(*input)?)
1✔
686
            })
1✔
687
            .save(persister)?;
1✔
688
        self.identify_receiver_outputs(proposal, persister).await
1✔
UNCOV
689
    }
×
690

691
    async fn identify_receiver_outputs(
1✔
692
        &self,
1✔
693
        proposal: Receiver<OutputsUnknown>,
1✔
694
        persister: &ReceiverPersister,
1✔
695
    ) -> Result<()> {
1✔
696
        let wallet = self.wallet();
1✔
697
        let proposal = proposal
1✔
698
            .identify_receiver_outputs(&mut |output_script| {
2✔
699
                wallet
2✔
700
                    .is_mine(output_script)
2✔
701
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
2✔
702
            })
2✔
703
            .save(persister)?;
1✔
704
        self.commit_outputs(proposal, persister).await
1✔
UNCOV
705
    }
×
706

707
    async fn commit_outputs(
1✔
708
        &self,
1✔
709
        proposal: Receiver<WantsOutputs>,
1✔
710
        persister: &ReceiverPersister,
1✔
711
    ) -> Result<()> {
1✔
712
        let proposal = proposal.commit_outputs().save(persister)?;
1✔
713
        self.contribute_inputs(proposal, persister).await
1✔
UNCOV
714
    }
×
715

716
    async fn contribute_inputs(
1✔
717
        &self,
1✔
718
        proposal: Receiver<WantsInputs>,
1✔
719
        persister: &ReceiverPersister,
1✔
720
    ) -> Result<()> {
1✔
721
        let wallet = self.wallet();
1✔
722
        let candidate_inputs = wallet.list_unspent()?;
1✔
723

724
        let selected_input = proposal.try_preserving_privacy(candidate_inputs)?;
1✔
725
        let proposal =
1✔
726
            proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?;
1✔
727
        self.apply_fee_range(proposal, persister).await
1✔
UNCOV
728
    }
×
729

730
    async fn apply_fee_range(
1✔
731
        &self,
1✔
732
        proposal: Receiver<WantsFeeRange>,
1✔
733
        persister: &ReceiverPersister,
1✔
734
    ) -> Result<()> {
1✔
735
        let proposal = proposal.apply_fee_range(None, self.config.max_fee_rate).save(persister)?;
1✔
736
        self.finalize_proposal(proposal, persister).await
1✔
UNCOV
737
    }
×
738

739
    async fn finalize_proposal(
1✔
740
        &self,
1✔
741
        proposal: Receiver<ProvisionalProposal>,
1✔
742
        persister: &ReceiverPersister,
1✔
743
    ) -> Result<()> {
1✔
744
        let wallet = self.wallet();
1✔
745
        let proposal = proposal
1✔
746
            .finalize_proposal(|psbt| {
1✔
747
                wallet
1✔
748
                    .process_psbt(psbt)
1✔
749
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
750
            })
1✔
751
            .save(persister)?;
1✔
752
        self.send_payjoin_proposal(proposal, persister).await
1✔
UNCOV
753
    }
×
754

755
    async fn send_payjoin_proposal(
1✔
756
        &self,
1✔
757
        proposal: Receiver<PayjoinProposal>,
1✔
758
        persister: &ReceiverPersister,
1✔
759
    ) -> Result<()> {
1✔
760
        let (req, ohttp_ctx) = proposal
1✔
761
            .create_post_request(self.unwrap_relay_or_else_fetch(None::<&str>).await?.as_str())
1✔
762
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
763
        let res = self.post_request(req).await?;
1✔
764
        let payjoin_psbt = proposal.psbt().clone();
1✔
765
        let session = proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister)?;
1✔
766
        println!(
1✔
767
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
1✔
768
            payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
1✔
769
        );
770

771
        return self.monitor_payjoin_proposal(session, persister).await;
1✔
UNCOV
772
    }
×
773

774
    async fn monitor_payjoin_proposal(
2✔
775
        &self,
2✔
776
        proposal: Receiver<Monitor>,
2✔
777
        persister: &ReceiverPersister,
2✔
778
    ) -> Result<()> {
2✔
779
        // On a session resumption, the receiver will resume again in this state.
780
        let poll_interval = tokio::time::Duration::from_millis(200);
2✔
781
        let timeout_duration = tokio::time::Duration::from_secs(5);
2✔
782

783
        let mut interval = tokio::time::interval(poll_interval);
2✔
784
        interval.tick().await;
2✔
785

786
        tracing::debug!("Polling for payment confirmation");
1✔
787

788
        let result = tokio::time::timeout(timeout_duration, async {
1✔
789
            loop {
790
                interval.tick().await;
1✔
791
                let check_result = proposal
1✔
792
                    .check_payment(
1✔
793
                        |txid| {
1✔
794
                            self.wallet()
1✔
795
                                .get_raw_transaction(&txid)
1✔
796
                                .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
797
                        },
1✔
UNCOV
798
                        |outpoint| {
×
UNCOV
799
                            self.wallet()
×
UNCOV
800
                                .is_outpoint_spent(&outpoint)
×
UNCOV
801
                                .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
×
UNCOV
802
                        },
×
803
                    )
804
                    .save(persister);
1✔
805

806
                match check_result {
1✔
807
                    Ok(_) => {
808
                        println!("Payjoin transaction detected in the mempool!");
1✔
809
                        return Ok(());
1✔
810
                    }
811
                    Err(_) => {
812
                        // keep polling
813

UNCOV
814
                        continue;
×
815
                    }
816
                }
817
            }
818
        })
1✔
819
        .await;
1✔
820

821
        match result {
1✔
822
            Ok(ok) => ok,
1✔
UNCOV
823
            Err(_) => Err(anyhow!(
×
824
                "Timeout waiting for payment confirmation after {:?}",
×
825
                timeout_duration
×
826
            )),
×
827
        }
828
    }
1✔
829

830
    async fn unwrap_relay_or_else_fetch(
7✔
831
        &self,
7✔
832
        directory: Option<impl payjoin::IntoUrl>,
7✔
833
    ) -> Result<url::Url> {
7✔
834
        let directory = directory.map(|url| url.into_url()).transpose()?;
7✔
835
        let selected_relay =
7✔
836
            self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay();
7✔
837
        let ohttp_relay = match selected_relay {
7✔
838
            Some(relay) => relay,
3✔
839
            None =>
840
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone())
4✔
841
                    .await?
4✔
842
                    .relay_url,
843
        };
844
        Ok(ohttp_relay)
7✔
845
    }
7✔
846

847
    /// Handle error by attempting to send an error response over the directory
UNCOV
848
    async fn handle_error(
×
UNCOV
849
        &self,
×
UNCOV
850
        session: Receiver<HasReplyableError>,
×
UNCOV
851
        persister: &ReceiverPersister,
×
852
    ) -> Result<()> {
×
853
        let (err_req, err_ctx) = session
×
854
            .create_error_request(self.unwrap_relay_or_else_fetch(None::<&str>).await?.as_str())?;
×
855

UNCOV
856
        let err_response = match self.post_request(err_req).await {
×
857
            Ok(response) => response,
×
UNCOV
858
            Err(e) => return Err(anyhow!("Failed to post error request: {}", e)),
×
859
        };
860

UNCOV
861
        let err_bytes = match err_response.bytes().await {
×
UNCOV
862
            Ok(bytes) => bytes,
×
UNCOV
863
            Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
×
864
        };
865

UNCOV
866
        if let Err(e) = session.process_error_response(&err_bytes, err_ctx).save(persister) {
×
UNCOV
867
            return Err(anyhow!("Failed to process error response: {}", e));
×
UNCOV
868
        }
×
869

UNCOV
870
        Ok(())
×
UNCOV
871
    }
×
872

873
    async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {
7✔
874
        let http = http_agent(&self.config)?;
7✔
875
        http.post(req.url)
7✔
876
            .header("Content-Type", req.content_type)
7✔
877
            .body(req.body)
7✔
878
            .send()
7✔
879
            .await
7✔
880
            .map_err(map_reqwest_err)
5✔
881
    }
5✔
882
}
883

UNCOV
884
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
UNCOV
885
    match e.status() {
×
UNCOV
886
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
UNCOV
887
        None => anyhow!("No HTTP response: {}", e),
×
888
    }
UNCOV
889
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc