• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 24134407852

08 Apr 2026 12:05PM UTC coverage: 79.263% (-5.1%) from 84.34%
24134407852

Pull #1469

github

web-flow
Merge 50b8c94d1 into 45d286f0e
Pull Request #1469: Add optional esplora wallet setup for payjoin-cli backend

189 of 364 new or added lines in 4 files covered. (51.92%)

529 existing lines in 9 files now uncovered.

10320 of 13020 relevant lines covered (79.26%)

396.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/payjoin-cli/src/app/v2/mod.rs
1
use std::fmt;
2
use std::sync::{Arc, Mutex};
3

4
use anyhow::{anyhow, Context, Result};
5
use payjoin::bitcoin::consensus::encode::serialize_hex;
6
use payjoin::bitcoin::{Amount, FeeRate};
7
use payjoin::persist::{OptionalTransitionOutcome, SessionPersister};
8
use payjoin::receive::v2::{
9
    replay_event_log as replay_receiver_event_log, HasReplyableError, Initialized,
10
    MaybeInputsOwned, MaybeInputsSeen, Monitor, OutputsUnknown, PayjoinProposal,
11
    ProvisionalProposal, ReceiveSession, Receiver, ReceiverBuilder,
12
    SessionOutcome as ReceiverSessionOutcome, UncheckedOriginalPayload, WantsFeeRange, WantsInputs,
13
    WantsOutputs,
14
};
15
use payjoin::send::v2::{
16
    replay_event_log as replay_sender_event_log, PollingForProposal, SendSession, Sender,
17
    SenderBuilder, SessionOutcome as SenderSessionOutcome, WithReplyKey,
18
};
19
use payjoin::{ImplementationError, PjParam, Uri};
20
use tokio::sync::watch;
21

22
use super::config::Config;
23
use super::wallet::{create_wallet, PayjoinWallet};
24
use super::App as AppTrait;
25
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
26
use crate::app::{handle_interrupt, http_agent};
27
use crate::db::v2::{ReceiverPersister, SenderPersister, SessionId};
28
use crate::db::Database;
29

30
mod ohttp;
31

32
/// Width, in characters, of the "Session ID" column of the session history table.
const W_ID: usize = 12;
33
/// Width, in characters, of the "Sender/Receiver" column of the session history table.
const W_ROLE: usize = 25;
34
/// Width, in characters, of the "Completed At" column of the session history table.
const W_DONE: usize = 15;
35
/// Minimum width, in characters, of the trailing "Status" column of the session history table.
const W_STATUS: usize = 15;
36

37
/// State shared by every command of the v2 payjoin CLI application.
///
/// The struct is `Clone` so that a copy can be moved into each task spawned when resuming
/// sessions; the database, wallet and relay manager are reference-counted and therefore shared
/// between clones.
#[derive(Clone)]
38
pub(crate) struct App {
39
    /// Configuration the app was created with.
    config: Config,
40
    /// Shared handle to the database that stores sender and receiver sessions.
    db: Arc<Database>,
41
    /// Shared handle to the wallet backend.
    wallet: Arc<dyn PayjoinWallet>,
42
    /// Receiving side of the watch channel whose sender is handed to `handle_interrupt`.
    interrupt: watch::Receiver<()>,
43
    /// Relay manager shared (behind a mutex) with the OHTTP key fetching helper.
    relay_manager: Arc<Mutex<RelayManager>>,
44
}
45

46
/// Maps a session state to a short, static, human-readable description.
trait StatusText {
47
    /// Returns the text shown in the "Status" column of the session history table.
    fn status_text(&self) -> &'static str;
48
}
49

50
impl StatusText for SendSession {
51
    fn status_text(&self) -> &'static str {
×
52
        match self {
×
53
            SendSession::WithReplyKey(_) | SendSession::PollingForProposal(_) =>
54
                "Waiting for proposal",
×
55
            SendSession::Closed(session_outcome) => match session_outcome {
×
56
                SenderSessionOutcome::Failure => "Session failure",
×
57
                SenderSessionOutcome::Success(_) => "Session success",
×
58
                SenderSessionOutcome::Cancel => "Session cancelled",
×
59
            },
60
        }
61
    }
×
62
}
63

64
impl StatusText for ReceiveSession {
65
    fn status_text(&self) -> &'static str {
×
66
        match self {
×
67
            ReceiveSession::Initialized(_) => "Waiting for original proposal",
×
68
            ReceiveSession::UncheckedOriginalPayload(_)
69
            | ReceiveSession::MaybeInputsOwned(_)
70
            | ReceiveSession::MaybeInputsSeen(_)
71
            | ReceiveSession::OutputsUnknown(_)
72
            | ReceiveSession::WantsOutputs(_)
73
            | ReceiveSession::WantsInputs(_)
74
            | ReceiveSession::WantsFeeRange(_)
75
            | ReceiveSession::ProvisionalProposal(_) => "Processing original proposal",
×
76
            ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent",
×
77
            ReceiveSession::HasReplyableError(_) =>
78
                "Session failure, waiting to post error response",
×
79
            ReceiveSession::Monitor(_) => "Monitoring payjoin proposal",
×
80
            ReceiveSession::Closed(session_outcome) => match session_outcome {
×
81
                ReceiverSessionOutcome::Failure => "Session failure",
×
82
                ReceiverSessionOutcome::Success(_) => "Session success, Payjoin proposal was broadcasted",
×
83
                ReceiverSessionOutcome::Cancel => "Session cancelled",
×
84
                ReceiverSessionOutcome::FallbackBroadcasted => "Fallback broadcasted",
×
85
                ReceiverSessionOutcome::PayjoinProposalSent =>
86
                    "Payjoin proposal sent, skipping monitoring as the sender is spending non-SegWit inputs",
×
87
            },
88
        }
89
    }
×
90
}
91

92
fn print_header() {
×
93
    println!(
×
94
        "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
95
        "Session ID", "Sender/Receiver", "Completed At", "Status"
96
    );
97
}
×
98

99
/// Which side of a payjoin session a history row belongs to.
enum Role {
    /// The session was started by sending a payjoin.
    Sender,
    /// The session was started by requesting (receiving) a payjoin.
    Receiver,
}

impl Role {
    /// Returns the label printed in the "Sender/Receiver" column.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Sender => "Sender",
            Self::Receiver => "Receiver",
        }
    }
}
111

112
/// One row of the session history table.
///
/// `Status` is the session state type; formatting the row requires it to implement
/// `StatusText`.
struct SessionHistoryRow<Status> {
113
    /// Identifier of the session this row describes.
    session_id: SessionId,
114
    /// Whether the session is a sender or a receiver session.
    role: Role,
115
    /// Session state used to derive the status text when no error message is set.
    status: Status,
116
    /// Completion time as stored in the database, or `None` for sessions that are still active.
    /// NOTE(review): presumably a Unix timestamp in seconds — confirm against the database layer.
    completed_at: Option<u64>,
117
    /// When set, shown in the "Status" column instead of the status text.
    error_message: Option<String>,
118
}
119

120
impl<Status: StatusText> fmt::Display for SessionHistoryRow<Status> {
121
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
122
        write!(
×
123
            f,
×
124
            "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
125
            self.session_id.to_string(),
×
126
            self.role.as_str(),
×
127
            match self.completed_at {
×
128
                None => "Not Completed".to_string(),
×
129
                Some(secs) => {
×
130
                    // TODO: human readable time
131
                    secs.to_string()
×
132
                }
133
            },
134
            self.error_message.as_deref().unwrap_or(self.status.status_text())
×
135
        )
136
    }
×
137
}
138

139
#[async_trait::async_trait]
140
impl AppTrait for App {
UNCOV
141
    /// Builds the application from `config`.
    ///
    /// Opens the session database, spawns the interrupt-handling task, creates the wallet
    /// backend and finally queries the wallet's network so that an unreachable or
    /// misconfigured wallet is reported immediately.
    async fn new(config: Config) -> Result<Self> {
        let db = Database::create(&config.db_path).map(Arc::new)?;
        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));

        // The sender half goes to the interrupt task; the receiver half is stored on the app.
        let (interrupt_tx, interrupt) = watch::channel(());
        tokio::spawn(handle_interrupt(interrupt_tx));

        let wallet = create_wallet(&config)?;
        let app = Self { config, db, wallet, interrupt, relay_manager };

        // Connectivity check: the result itself is not needed, only that the call succeeds.
        app.wallet().network().context("Failed to connect to wallet. Check config.")?;
        Ok(app)
    }
×
151

NEW
152
    /// Returns another shared handle to the wallet backend.
    fn wallet(&self) -> Arc<dyn PayjoinWallet> {
        Arc::clone(&self.wallet)
    }
×
153

154
    #[allow(clippy::incompatible_msrv)]
UNCOV
155
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
×
156
        use payjoin::UriExt;
157
        let uri = Uri::try_from(bip21)
158
            .map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?
×
159
            .assume_checked()
160
            .check_pj_supported()
161
            .map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
162
        let address = uri.address;
163
        let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?;
×
164
        match uri.extras.pj_param() {
165
            #[cfg(feature = "v1")]
166
            PjParam::V1(pj_param) => {
167
                use std::str::FromStr;
168

169
                let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
170
                let (req, ctx) = payjoin::send::v1::SenderBuilder::from_parts(
171
                    psbt,
172
                    pj_param,
173
                    &address,
174
                    Some(amount),
175
                )
176
                .build_recommended(fee_rate)
177
                .with_context(|| "Failed to build payjoin request")?
178
                .create_v1_post_request();
179
                let http = http_agent(&self.config)?;
180
                let body = String::from_utf8(req.body.clone()).unwrap();
181
                println!("Sending fallback request to {}", &req.url);
182
                let response = http
183
                    .post(req.url)
184
                    .header("Content-Type", req.content_type)
185
                    .body(body.clone())
186
                    .send()
187
                    .await
188
                    .with_context(|| "HTTP request failed")?;
189
                let fallback_tx = payjoin::bitcoin::Psbt::from_str(&body)
190
                    .map_err(|e| anyhow!("Failed to load PSBT from base64: {}", e))?
×
191
                    .extract_tx()?;
192
                println!("Sent fallback transaction txid: {}", fallback_tx.compute_txid());
193
                println!(
194
                    "Sent fallback transaction hex: {:#}",
195
                    payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
196
                );
197
                let psbt = ctx.process_response(&response.bytes().await?).map_err(|e| {
×
198
                    tracing::debug!("Error processing response: {e:?}");
×
199
                    anyhow!("Failed to process response {e}")
×
200
                })?;
×
201

202
                self.process_pj_response(psbt)?;
203
                Ok(())
204
            }
205
            PjParam::V2(pj_param) => {
206
                let receiver_pubkey = pj_param.receiver_pubkey();
207
                let sender_state =
UNCOV
208
                    self.db.get_send_session_ids()?.into_iter().find_map(|session_id| {
×
UNCOV
209
                        let session_receiver_pubkey = self
×
UNCOV
210
                            .db
×
UNCOV
211
                            .get_send_session_receiver_pk(&session_id)
×
UNCOV
212
                            .expect("Receiver pubkey should exist if session id exists");
×
UNCOV
213
                        if session_receiver_pubkey == *receiver_pubkey {
×
UNCOV
214
                            let sender_persister =
×
UNCOV
215
                                SenderPersister::from_id(self.db.clone(), session_id);
×
UNCOV
216
                            let (send_session, _) = replay_sender_event_log(&sender_persister)
×
UNCOV
217
                                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))
×
UNCOV
218
                                .ok()?;
×
219

UNCOV
220
                            Some((send_session, sender_persister))
×
221
                        } else {
222
                            None
×
223
                        }
UNCOV
224
                    });
×
225

226
                let (sender_state, persister) = match sender_state {
227
                    Some((sender_state, persister)) => (sender_state, persister),
228
                    None => {
229
                        let persister =
230
                            SenderPersister::new(self.db.clone(), receiver_pubkey.clone())?;
231
                        let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
232
                        let sender =
233
                            SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
234
                                .build_recommended(fee_rate)?
235
                                .save(&persister)?;
236

237
                        (SendSession::WithReplyKey(sender), persister)
238
                    }
239
                };
240
                let mut interrupt = self.interrupt.clone();
241
                tokio::select! {
242
                    _ = self.process_sender_session(sender_state, &persister) => return Ok(()),
243
                    _ = interrupt.changed() => {
244
                        println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
245
                        return Err(anyhow!("Interrupted"))
246
                    }
247
                }
248
            }
249
            _ => unimplemented!("Unrecognized payjoin version"),
250
        }
UNCOV
251
    }
×
252

UNCOV
253
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
×
254
        let address = self.wallet().get_new_address()?;
255
        let ohttp_keys =
256
            unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone())
257
                .await?
258
                .ohttp_keys;
259
        let persister = ReceiverPersister::new(self.db.clone())?;
260
        let session =
261
            ReceiverBuilder::new(address, self.config.v2()?.pj_directory.as_str(), ohttp_keys)?
262
                .with_amount(amount)
263
                .with_max_fee_rate(self.config.max_fee_rate.unwrap_or(FeeRate::BROADCAST_MIN))
264
                .build()
265
                .save(&persister)?;
266

267
        println!("Receive session established");
268
        let pj_uri = session.pj_uri();
269
        println!("Request Payjoin by sharing this Payjoin Uri:");
270
        println!("{pj_uri}");
271

272
        self.process_receiver_session(ReceiveSession::Initialized(session.clone()), &persister)
273
            .await?;
274
        Ok(())
UNCOV
275
    }
×
276

277
    #[allow(clippy::incompatible_msrv)]
UNCOV
278
    async fn resume_payjoins(&self) -> Result<()> {
×
279
        let recv_session_ids = self.db.get_recv_session_ids()?;
280
        let send_session_ids = self.db.get_send_session_ids()?;
281

282
        if recv_session_ids.is_empty() && send_session_ids.is_empty() {
283
            println!("No sessions to resume.");
284
            return Ok(());
285
        }
286

287
        let mut tasks = Vec::new();
288

289
        // Process receiver sessions
290
        for session_id in recv_session_ids {
291
            let self_clone = self.clone();
292
            let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
293
            match replay_receiver_event_log(&recv_persister) {
294
                Ok((receiver_state, _)) => {
UNCOV
295
                    tasks.push(tokio::spawn(async move {
×
UNCOV
296
                        self_clone.process_receiver_session(receiver_state, &recv_persister).await
×
UNCOV
297
                    }));
×
298
                }
299
                Err(e) => {
300
                    tracing::error!("An error {:?} occurred while replaying receiver session", e);
301
                    Self::close_failed_session(&recv_persister, &session_id, "receiver");
302
                }
303
            }
304
        }
305

306
        // Process sender sessions
307
        for session_id in send_session_ids {
308
            let sender_persiter = SenderPersister::from_id(self.db.clone(), session_id.clone());
309
            match replay_sender_event_log(&sender_persiter) {
310
                Ok((sender_state, _)) => {
311
                    let self_clone = self.clone();
312
                    tasks.push(tokio::spawn(async move {
×
313
                        self_clone.process_sender_session(sender_state, &sender_persiter).await
×
314
                    }));
×
315
                }
316
                Err(e) => {
317
                    tracing::error!("An error {:?} occurred while replaying Sender session", e);
318
                    Self::close_failed_session(&sender_persiter, &session_id, "sender");
319
                }
320
            }
321
        }
322

323
        let mut interrupt = self.interrupt.clone();
324
        tokio::select! {
UNCOV
325
            _ = async {
×
UNCOV
326
                for task in tasks {
×
UNCOV
327
                    let _ = task.await;
×
328
                }
UNCOV
329
            } => {
×
330
                println!("All resumed sessions completed.");
331
            }
332
            _ = interrupt.changed() => {
333
                println!("Resumed sessions were interrupted.");
334
            }
335
        }
336
        Ok(())
UNCOV
337
    }
×
338

339
    /// Prints a table of all known sessions: the header, then every sender row, then every
    /// receiver row.
    ///
    /// Within each role, active sessions are listed before inactive ones. A session's status is
    /// obtained by replaying its event log; if the replay fails, the row is shown as a failed
    /// session with the replay error as its status text. Inactive sessions additionally carry
    /// the completion time stored in the database.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the session id lists cannot be read from the database. The
    /// header is printed before the first database read, so it may already be on stdout.
    #[cfg(feature = "v2")]
    async fn history(&self) -> Result<()> {
        // Replays a sender session and turns the outcome (or replay error) into a table row.
        let sender_row = |session_id: SessionId, completed_at: Option<u64>| {
            let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
            let (status, error_message) = match replay_sender_event_log(&persister) {
                Ok((sender_state, _)) => (sender_state, None),
                Err(e) =>
                    (SendSession::Closed(SenderSessionOutcome::Failure), Some(e.to_string())),
            };
            SessionHistoryRow { session_id, role: Role::Sender, status, completed_at, error_message }
        };

        // Replays a receiver session and turns the outcome (or replay error) into a table row.
        let receiver_row = |session_id: SessionId, completed_at: Option<u64>| {
            let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
            let (status, error_message) = match replay_receiver_event_log(&persister) {
                Ok((receiver_state, _)) => (receiver_state, None),
                Err(e) => (
                    ReceiveSession::Closed(ReceiverSessionOutcome::Failure),
                    Some(e.to_string()),
                ),
            };
            SessionHistoryRow {
                session_id,
                role: Role::Receiver,
                status,
                completed_at,
                error_message,
            }
        };

        print_header();

        // Active sessions have no completion time.
        let mut send_rows: Vec<_> = self
            .db
            .get_send_session_ids()?
            .into_iter()
            .map(|session_id| sender_row(session_id, None))
            .collect();
        let mut recv_rows: Vec<_> = self
            .db
            .get_recv_session_ids()?
            .into_iter()
            .map(|session_id| receiver_row(session_id, None))
            .collect();

        // Inactive sessions are appended after the active ones of the same role.
        send_rows.extend(
            self.db
                .get_inactive_send_session_ids()?
                .into_iter()
                .map(|(session_id, completed_at)| sender_row(session_id, Some(completed_at))),
        );
        recv_rows.extend(
            self.db
                .get_inactive_recv_session_ids()?
                .into_iter()
                .map(|(session_id, completed_at)| receiver_row(session_id, Some(completed_at))),
        );

        // Sender rows are printed first, followed by receiver rows.
        for row in send_rows {
            println!("{row}");
        }
        for row in recv_rows {
            println!("{row}");
        }

        Ok(())
    }
×
462
}
463

464
impl App {
465
    fn close_failed_session<P>(persister: &P, session_id: &SessionId, role: &str)
×
466
    where
×
467
        P: SessionPersister,
×
468
    {
469
        if let Err(close_err) = SessionPersister::close(persister) {
×
470
            tracing::error!("Failed to close {} session {}: {:?}", role, session_id, close_err);
×
471
        } else {
472
            tracing::error!("Closed failed {} session: {}", role, session_id);
×
473
        }
474
    }
×
475

UNCOV
476
    async fn process_sender_session(
×
UNCOV
477
        &self,
×
UNCOV
478
        session: SendSession,
×
UNCOV
479
        persister: &SenderPersister,
×
UNCOV
480
    ) -> Result<()> {
×
481
        match session {
×
UNCOV
482
            SendSession::WithReplyKey(context) =>
×
UNCOV
483
                self.post_original_proposal(context, persister).await?,
×
UNCOV
484
            SendSession::PollingForProposal(context) =>
×
UNCOV
485
                self.get_proposed_payjoin_psbt(context, persister).await?,
×
486
            SendSession::Closed(SenderSessionOutcome::Success(proposal)) => {
×
487
                self.process_pj_response(proposal)?;
×
488
                return Ok(());
×
489
            }
490
            _ => return Err(anyhow!("Unexpected sender state")),
×
491
        }
UNCOV
492
        Ok(())
×
UNCOV
493
    }
×
494

UNCOV
495
    async fn post_original_proposal(
×
UNCOV
496
        &self,
×
UNCOV
497
        sender: Sender<WithReplyKey>,
×
UNCOV
498
        persister: &SenderPersister,
×
UNCOV
499
    ) -> Result<()> {
×
UNCOV
500
        let (req, ctx) = sender.create_v2_post_request(
×
UNCOV
501
            self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?.as_str(),
×
502
        )?;
×
UNCOV
503
        let response = self.post_request(req).await?;
×
UNCOV
504
        println!("Posted original proposal...");
×
UNCOV
505
        let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?;
×
UNCOV
506
        self.get_proposed_payjoin_psbt(sender, persister).await
×
507
    }
×
508

UNCOV
509
    async fn get_proposed_payjoin_psbt(
×
UNCOV
510
        &self,
×
UNCOV
511
        sender: Sender<PollingForProposal>,
×
UNCOV
512
        persister: &SenderPersister,
×
UNCOV
513
    ) -> Result<()> {
×
UNCOV
514
        let ohttp_relay = self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?;
×
UNCOV
515
        let mut session = sender.clone();
×
516
        // Long poll until we get a response
517
        loop {
UNCOV
518
            let (req, ctx) = session.create_poll_request(ohttp_relay.as_str())?;
×
UNCOV
519
            let response = self.post_request(req).await?;
×
UNCOV
520
            let res = session.process_response(&response.bytes().await?, ctx).save(persister);
×
UNCOV
521
            match res {
×
UNCOV
522
                Ok(OptionalTransitionOutcome::Progress(psbt)) => {
×
UNCOV
523
                    println!("Proposal received. Processing...");
×
UNCOV
524
                    self.process_pj_response(psbt)?;
×
UNCOV
525
                    return Ok(());
×
526
                }
UNCOV
527
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
UNCOV
528
                    println!("No response yet.");
×
UNCOV
529
                    session = current_state;
×
UNCOV
530
                    continue;
×
531
                }
532
                Err(re) => {
×
533
                    println!("{re}");
×
534
                    tracing::debug!("{re:?}");
×
535
                    return Err(anyhow!("Response error").context(re));
×
536
                }
537
            }
538
        }
UNCOV
539
    }
×
540

UNCOV
541
    async fn long_poll_fallback(
×
UNCOV
542
        &self,
×
UNCOV
543
        session: Receiver<Initialized>,
×
UNCOV
544
        persister: &ReceiverPersister,
×
UNCOV
545
    ) -> Result<Receiver<UncheckedOriginalPayload>> {
×
UNCOV
546
        let ohttp_relay =
×
UNCOV
547
            self.unwrap_relay_or_else_fetch(Some(&session.pj_uri().extras.endpoint())).await?;
×
548

UNCOV
549
        let mut session = session;
×
550
        loop {
UNCOV
551
            let (req, context) = session.create_poll_request(ohttp_relay.as_str())?;
×
UNCOV
552
            println!("Polling receive request...");
×
UNCOV
553
            let ohttp_response = self.post_request(req).await?;
×
UNCOV
554
            let state_transition = session
×
UNCOV
555
                .process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context)
×
UNCOV
556
                .save(persister);
×
UNCOV
557
            match state_transition {
×
UNCOV
558
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
×
UNCOV
559
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
×
UNCOV
560
                    return Ok(next_state);
×
561
                }
562
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
563
                    session = current_state;
×
564
                    continue;
×
565
                }
566
                Err(e) => return Err(e.into()),
×
567
            }
568
        }
UNCOV
569
    }
×
570

UNCOV
571
    async fn process_receiver_session(
×
UNCOV
572
        &self,
×
UNCOV
573
        session: ReceiveSession,
×
UNCOV
574
        persister: &ReceiverPersister,
×
UNCOV
575
    ) -> Result<()> {
×
UNCOV
576
        let res = {
×
UNCOV
577
            match session {
×
UNCOV
578
                ReceiveSession::Initialized(proposal) =>
×
UNCOV
579
                    self.read_from_directory(proposal, persister).await,
×
580
                ReceiveSession::UncheckedOriginalPayload(proposal) =>
×
581
                    self.check_proposal(proposal, persister).await,
×
582
                ReceiveSession::MaybeInputsOwned(proposal) =>
×
583
                    self.check_inputs_not_owned(proposal, persister).await,
×
584
                ReceiveSession::MaybeInputsSeen(proposal) =>
×
585
                    self.check_no_inputs_seen_before(proposal, persister).await,
×
586
                ReceiveSession::OutputsUnknown(proposal) =>
×
587
                    self.identify_receiver_outputs(proposal, persister).await,
×
588
                ReceiveSession::WantsOutputs(proposal) =>
×
589
                    self.commit_outputs(proposal, persister).await,
×
590
                ReceiveSession::WantsInputs(proposal) =>
×
591
                    self.contribute_inputs(proposal, persister).await,
×
592
                ReceiveSession::WantsFeeRange(proposal) =>
×
593
                    self.apply_fee_range(proposal, persister).await,
×
594
                ReceiveSession::ProvisionalProposal(proposal) =>
×
595
                    self.finalize_proposal(proposal, persister).await,
×
596
                ReceiveSession::PayjoinProposal(proposal) =>
×
597
                    self.send_payjoin_proposal(proposal, persister).await,
×
598
                ReceiveSession::HasReplyableError(error) =>
×
599
                    self.handle_error(error, persister).await,
×
UNCOV
600
                ReceiveSession::Monitor(proposal) =>
×
UNCOV
601
                    self.monitor_payjoin_proposal(proposal, persister).await,
×
602
                ReceiveSession::Closed(_) => return Err(anyhow!("Session closed")),
×
603
            }
604
        };
UNCOV
605
        res
×
UNCOV
606
    }
×
607

608
    #[allow(clippy::incompatible_msrv)]
UNCOV
609
    async fn read_from_directory(
×
UNCOV
610
        &self,
×
UNCOV
611
        session: Receiver<Initialized>,
×
UNCOV
612
        persister: &ReceiverPersister,
×
UNCOV
613
    ) -> Result<()> {
×
UNCOV
614
        let mut interrupt = self.interrupt.clone();
×
UNCOV
615
        let receiver = tokio::select! {
×
UNCOV
616
            res = self.long_poll_fallback(session, persister) => res,
×
UNCOV
617
            _ = interrupt.changed() => {
×
UNCOV
618
                println!("Interrupted. Call the `resume` command to resume all sessions.");
×
UNCOV
619
                return Err(anyhow!("Interrupted"));
×
620
            }
621
        }?;
×
UNCOV
622
        self.check_proposal(receiver, persister).await
×
UNCOV
623
    }
×
624

UNCOV
625
    async fn check_proposal(
×
UNCOV
626
        &self,
×
UNCOV
627
        proposal: Receiver<UncheckedOriginalPayload>,
×
UNCOV
628
        persister: &ReceiverPersister,
×
UNCOV
629
    ) -> Result<()> {
×
UNCOV
630
        let wallet = self.wallet();
×
UNCOV
631
        let proposal = proposal
×
UNCOV
632
            .check_broadcast_suitability(None, |tx| {
×
UNCOV
633
                wallet
×
UNCOV
634
                    .can_broadcast(tx)
×
UNCOV
635
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
×
UNCOV
636
            })
×
UNCOV
637
            .save(persister)?;
×
638

UNCOV
639
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
×
UNCOV
640
        println!("{}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast()));
×
UNCOV
641
        self.check_inputs_not_owned(proposal, persister).await
×
642
    }
×
643

UNCOV
644
    async fn check_inputs_not_owned(
×
UNCOV
645
        &self,
×
UNCOV
646
        proposal: Receiver<MaybeInputsOwned>,
×
UNCOV
647
        persister: &ReceiverPersister,
×
UNCOV
648
    ) -> Result<()> {
×
UNCOV
649
        let wallet = self.wallet();
×
UNCOV
650
        let proposal = proposal
×
UNCOV
651
            .check_inputs_not_owned(&mut |input| {
×
UNCOV
652
                wallet
×
UNCOV
653
                    .is_mine(input)
×
UNCOV
654
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
×
UNCOV
655
            })
×
UNCOV
656
            .save(persister)?;
×
UNCOV
657
        self.check_no_inputs_seen_before(proposal, persister).await
×
658
    }
×
659

UNCOV
660
    async fn check_no_inputs_seen_before(
×
UNCOV
661
        &self,
×
UNCOV
662
        proposal: Receiver<MaybeInputsSeen>,
×
UNCOV
663
        persister: &ReceiverPersister,
×
UNCOV
664
    ) -> Result<()> {
×
UNCOV
665
        let proposal = proposal
×
UNCOV
666
            .check_no_inputs_seen_before(&mut |input| {
×
UNCOV
667
                Ok(self.db.insert_input_seen_before(*input)?)
×
UNCOV
668
            })
×
UNCOV
669
            .save(persister)?;
×
UNCOV
670
        self.identify_receiver_outputs(proposal, persister).await
×
671
    }
×
672

UNCOV
673
    async fn identify_receiver_outputs(
×
UNCOV
674
        &self,
×
UNCOV
675
        proposal: Receiver<OutputsUnknown>,
×
UNCOV
676
        persister: &ReceiverPersister,
×
UNCOV
677
    ) -> Result<()> {
×
UNCOV
678
        let wallet = self.wallet();
×
UNCOV
679
        let proposal = proposal
×
UNCOV
680
            .identify_receiver_outputs(&mut |output_script| {
×
UNCOV
681
                wallet
×
UNCOV
682
                    .is_mine(output_script)
×
UNCOV
683
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
×
UNCOV
684
            })
×
UNCOV
685
            .save(persister)?;
×
UNCOV
686
        self.commit_outputs(proposal, persister).await
×
687
    }
×
688

UNCOV
689
    async fn commit_outputs(
×
UNCOV
690
        &self,
×
UNCOV
691
        proposal: Receiver<WantsOutputs>,
×
UNCOV
692
        persister: &ReceiverPersister,
×
UNCOV
693
    ) -> Result<()> {
×
UNCOV
694
        let proposal = proposal.commit_outputs().save(persister)?;
×
UNCOV
695
        self.contribute_inputs(proposal, persister).await
×
696
    }
×
697

UNCOV
698
    async fn contribute_inputs(
×
UNCOV
699
        &self,
×
UNCOV
700
        proposal: Receiver<WantsInputs>,
×
UNCOV
701
        persister: &ReceiverPersister,
×
UNCOV
702
    ) -> Result<()> {
×
UNCOV
703
        let wallet = self.wallet();
×
UNCOV
704
        let candidate_inputs = wallet.list_unspent()?;
×
705

UNCOV
706
        if candidate_inputs.is_empty() {
×
707
            return Err(anyhow::anyhow!(
×
708
                "No spendable UTXOs available in wallet. Cannot contribute inputs to payjoin."
×
709
            ));
×
UNCOV
710
        }
×
711

UNCOV
712
        let selected_input = proposal.try_preserving_privacy(candidate_inputs)?;
×
UNCOV
713
        let proposal =
×
UNCOV
714
            proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?;
×
UNCOV
715
        self.apply_fee_range(proposal, persister).await
×
716
    }
×
717

UNCOV
718
    async fn apply_fee_range(
×
UNCOV
719
        &self,
×
UNCOV
720
        proposal: Receiver<WantsFeeRange>,
×
UNCOV
721
        persister: &ReceiverPersister,
×
UNCOV
722
    ) -> Result<()> {
×
UNCOV
723
        let proposal = proposal.apply_fee_range(None, self.config.max_fee_rate).save(persister)?;
×
UNCOV
724
        self.finalize_proposal(proposal, persister).await
×
725
    }
×
726

UNCOV
727
    async fn finalize_proposal(
×
UNCOV
728
        &self,
×
UNCOV
729
        proposal: Receiver<ProvisionalProposal>,
×
UNCOV
730
        persister: &ReceiverPersister,
×
UNCOV
731
    ) -> Result<()> {
×
UNCOV
732
        let wallet = self.wallet();
×
UNCOV
733
        let proposal = proposal
×
UNCOV
734
            .finalize_proposal(|psbt| {
×
UNCOV
735
                wallet
×
UNCOV
736
                    .process_psbt(psbt)
×
UNCOV
737
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
×
UNCOV
738
            })
×
UNCOV
739
            .save(persister)?;
×
UNCOV
740
        self.send_payjoin_proposal(proposal, persister).await
×
741
    }
×
742

UNCOV
743
    async fn send_payjoin_proposal(
×
UNCOV
744
        &self,
×
UNCOV
745
        proposal: Receiver<PayjoinProposal>,
×
UNCOV
746
        persister: &ReceiverPersister,
×
UNCOV
747
    ) -> Result<()> {
×
UNCOV
748
        let (req, ohttp_ctx) = proposal
×
UNCOV
749
            .create_post_request(self.unwrap_relay_or_else_fetch(None::<&str>).await?.as_str())
×
UNCOV
750
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
×
UNCOV
751
        let res = self.post_request(req).await?;
×
UNCOV
752
        let payjoin_psbt = proposal.psbt().clone();
×
UNCOV
753
        let session = proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister)?;
×
UNCOV
754
        println!(
×
755
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
UNCOV
756
            payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
×
757
        );
758

UNCOV
759
        return self.monitor_payjoin_proposal(session, persister).await;
×
760
    }
×
761

UNCOV
762
    async fn monitor_payjoin_proposal(
×
UNCOV
763
        &self,
×
UNCOV
764
        proposal: Receiver<Monitor>,
×
UNCOV
765
        persister: &ReceiverPersister,
×
UNCOV
766
    ) -> Result<()> {
×
767
        // On a session resumption, the receiver will resume again in this state.
UNCOV
768
        let poll_interval = tokio::time::Duration::from_millis(200);
×
UNCOV
769
        let timeout_duration = tokio::time::Duration::from_secs(5);
×
770

UNCOV
771
        let mut interval = tokio::time::interval(poll_interval);
×
UNCOV
772
        interval.tick().await;
×
773

UNCOV
774
        tracing::debug!("Polling for payment confirmation");
×
775

UNCOV
776
        let result = tokio::time::timeout(timeout_duration, async {
×
777
            loop {
UNCOV
778
                interval.tick().await;
×
UNCOV
779
                let check_result = proposal
×
UNCOV
780
                    .check_payment(|txid| {
×
UNCOV
781
                        self.wallet()
×
UNCOV
782
                            .get_raw_transaction(&txid)
×
UNCOV
783
                            .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
×
UNCOV
784
                    })
×
UNCOV
785
                    .save(persister);
×
786

UNCOV
787
                match check_result {
×
788
                    Ok(_) => {
UNCOV
789
                        println!("Payjoin transaction detected in the mempool!");
×
UNCOV
790
                        return Ok(());
×
791
                    }
792
                    Err(_) => {
793
                        // keep polling
794

795
                        continue;
×
796
                    }
797
                }
798
            }
UNCOV
799
        })
×
UNCOV
800
        .await;
×
801

UNCOV
802
        match result {
×
UNCOV
803
            Ok(ok) => ok,
×
804
            Err(_) => Err(anyhow!(
×
805
                "Timeout waiting for payment confirmation after {:?}",
×
806
                timeout_duration
×
807
            )),
×
808
        }
UNCOV
809
    }
×
810

UNCOV
811
    async fn unwrap_relay_or_else_fetch(
×
UNCOV
812
        &self,
×
UNCOV
813
        directory: Option<impl payjoin::IntoUrl>,
×
UNCOV
814
    ) -> Result<url::Url> {
×
UNCOV
815
        let directory = directory.map(|url| url.into_url()).transpose()?;
×
UNCOV
816
        let selected_relay =
×
UNCOV
817
            self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay();
×
UNCOV
818
        let ohttp_relay = match selected_relay {
×
UNCOV
819
            Some(relay) => relay,
×
820
            None =>
UNCOV
821
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone())
×
UNCOV
822
                    .await?
×
823
                    .relay_url,
824
        };
UNCOV
825
        Ok(ohttp_relay)
×
UNCOV
826
    }
×
827

828
    /// Handle error by attempting to send an error response over the directory
829
    async fn handle_error(
×
830
        &self,
×
831
        session: Receiver<HasReplyableError>,
×
832
        persister: &ReceiverPersister,
×
833
    ) -> Result<()> {
×
834
        let (err_req, err_ctx) = session
×
835
            .create_error_request(self.unwrap_relay_or_else_fetch(None::<&str>).await?.as_str())?;
×
836

837
        let err_response = match self.post_request(err_req).await {
×
838
            Ok(response) => response,
×
839
            Err(e) => return Err(anyhow!("Failed to post error request: {}", e)),
×
840
        };
841

842
        let err_bytes = match err_response.bytes().await {
×
843
            Ok(bytes) => bytes,
×
844
            Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
×
845
        };
846

847
        if let Err(e) = session.process_error_response(&err_bytes, err_ctx).save(persister) {
×
848
            return Err(anyhow!("Failed to process error response: {}", e));
×
849
        }
×
850

851
        Ok(())
×
852
    }
×
853

UNCOV
854
    async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {
×
UNCOV
855
        let http = http_agent(&self.config)?;
×
UNCOV
856
        http.post(req.url)
×
UNCOV
857
            .header("Content-Type", req.content_type)
×
UNCOV
858
            .body(req.body)
×
UNCOV
859
            .send()
×
UNCOV
860
            .await
×
UNCOV
861
            .map_err(map_reqwest_err)
×
UNCOV
862
    }
×
863
}
864

865
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
866
    match e.status() {
×
867
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
868
        None => anyhow!("No HTTP response: {}", e),
×
869
    }
870
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc