• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 25688181666

11 May 2026 06:06PM UTC coverage: 85.312% (+0.02%) from 85.294%
25688181666

Pull #1399

github

web-flow
Merge b86ecd4b4 into 67b14ded2
Pull Request #1399: Split resume session relays

48 of 64 new or added lines in 2 files covered. (75.0%)

2 existing lines in 1 file now uncovered.

11675 of 13685 relevant lines covered (85.31%)

395.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.46
/payjoin-cli/src/app/v2/mod.rs
1
use std::fmt;
2
use std::sync::Arc;
3

4
use anyhow::{anyhow, Context, Result};
5
use payjoin::bitcoin::consensus::encode::serialize_hex;
6
use payjoin::bitcoin::{Amount, FeeRate};
7
use payjoin::persist::{OptionalTransitionOutcome, SessionPersister};
8
use payjoin::receive::v2::{
9
    replay_event_log as replay_receiver_event_log, HasReplyableError, Initialized,
10
    MaybeInputsOwned, MaybeInputsSeen, Monitor, OutputsUnknown, PayjoinProposal,
11
    ProvisionalProposal, ReceiveSession, Receiver, ReceiverBuilder,
12
    SessionOutcome as ReceiverSessionOutcome, UncheckedOriginalPayload, WantsFeeRange, WantsInputs,
13
    WantsOutputs,
14
};
15
use payjoin::send::v2::{
16
    replay_event_log as replay_sender_event_log, PollingForProposal, SendSession, Sender,
17
    SenderBuilder, SessionOutcome as SenderSessionOutcome, WithReplyKey,
18
};
19
use payjoin::{ImplementationError, PjParam, Uri};
20
use tokio::sync::watch;
21

22
use super::config::Config;
23
use super::wallet::BitcoindWallet;
24
use super::App as AppTrait;
25
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
26
use crate::app::{handle_interrupt, http_agent};
27
use crate::db::v2::{ReceiverPersister, SenderPersister, SessionId};
28
use crate::db::Database;
29

30
mod ohttp;
31

32
const W_ID: usize = 12;
33
const W_ROLE: usize = 25;
34
const W_DONE: usize = 15;
35
const W_STATUS: usize = 15;
36

37
#[derive(Clone)]
38
pub(crate) struct App {
39
    config: Config,
40
    db: Arc<Database>,
41
    wallet: BitcoindWallet,
42
    interrupt: watch::Receiver<()>,
43
}
44

45
trait StatusText {
46
    fn status_text(&self) -> &'static str;
47
}
48

49
impl StatusText for SendSession {
50
    fn status_text(&self) -> &'static str {
×
51
        match self {
×
52
            SendSession::WithReplyKey(_) | SendSession::PollingForProposal(_) =>
53
                "Waiting for proposal",
×
54
            SendSession::Closed(session_outcome) => match session_outcome {
×
55
                SenderSessionOutcome::Failure => "Session failure",
×
56
                SenderSessionOutcome::Success(_) => "Session success",
×
57
                SenderSessionOutcome::Cancel => "Session cancelled",
×
58
            },
59
        }
60
    }
×
61
}
62

63
impl StatusText for ReceiveSession {
64
    fn status_text(&self) -> &'static str {
×
65
        match self {
×
66
            ReceiveSession::Initialized(_) => "Waiting for original proposal",
×
67
            ReceiveSession::UncheckedOriginalPayload(_)
68
            | ReceiveSession::MaybeInputsOwned(_)
69
            | ReceiveSession::MaybeInputsSeen(_)
70
            | ReceiveSession::OutputsUnknown(_)
71
            | ReceiveSession::WantsOutputs(_)
72
            | ReceiveSession::WantsInputs(_)
73
            | ReceiveSession::WantsFeeRange(_)
74
            | ReceiveSession::ProvisionalProposal(_) => "Processing original proposal",
×
75
            ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent",
×
76
            ReceiveSession::HasReplyableError(_) =>
77
                "Session failure, waiting to post error response",
×
78
            ReceiveSession::Monitor(_) => "Monitoring payjoin proposal",
×
79
            ReceiveSession::Closed(session_outcome) => match session_outcome {
×
80
                ReceiverSessionOutcome::Failure => "Session failure",
×
81
                ReceiverSessionOutcome::Success(_) => "Session success, Payjoin proposal was broadcasted",
×
82
                ReceiverSessionOutcome::Cancel => "Session cancelled",
×
83
                ReceiverSessionOutcome::FallbackBroadcasted => "Fallback broadcasted",
×
84
                ReceiverSessionOutcome::PayjoinProposalSent =>
85
                    "Payjoin proposal sent, skipping monitoring as the sender is spending non-SegWit inputs",
×
86
            },
87
        }
88
    }
×
89
}
90

91
fn print_header() {
×
92
    println!(
×
93
        "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
94
        "Session ID", "Sender/Receiver", "Completed At", "Status"
95
    );
96
}
×
97

98
enum Role {
99
    Sender,
100
    Receiver,
101
}
102
impl Role {
103
    fn as_str(&self) -> &'static str {
×
104
        match self {
×
105
            Role::Sender => "Sender",
×
106
            Role::Receiver => "Receiver",
×
107
        }
108
    }
×
109
}
110

111
struct SessionHistoryRow<Status> {
112
    session_id: SessionId,
113
    role: Role,
114
    status: Status,
115
    completed_at: Option<u64>,
116
    error_message: Option<String>,
117
}
118

119
impl<Status: StatusText> fmt::Display for SessionHistoryRow<Status> {
120
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
121
        write!(
×
122
            f,
×
123
            "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
124
            self.session_id.to_string(),
×
125
            self.role.as_str(),
×
126
            match self.completed_at {
×
127
                None => "Not Completed".to_string(),
×
128
                Some(secs) => {
×
129
                    // TODO: human readable time
130
                    secs.to_string()
×
131
                }
132
            },
133
            self.error_message.as_deref().unwrap_or(self.status.status_text())
×
134
        )
135
    }
×
136
}
137

138
#[async_trait::async_trait]
139
impl AppTrait for App {
140
    async fn new(config: Config) -> Result<Self> {
11✔
141
        let db = Arc::new(Database::create(&config.db_path)?);
142
        let (interrupt_tx, interrupt_rx) = watch::channel(());
143
        tokio::spawn(handle_interrupt(interrupt_tx));
144
        let wallet = BitcoindWallet::new(&config.bitcoind).await?;
145
        let app = Self { config, db, wallet, interrupt: interrupt_rx };
146
        app.wallet()
147
            .network()
148
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
149
        Ok(app)
150
    }
11✔
151

152
    fn wallet(&self) -> BitcoindWallet { self.wallet.clone() }
30✔
153

154
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
4✔
155
        use payjoin::UriExt;
156
        let uri = Uri::try_from(bip21)
157
            .map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?
×
158
            .assume_checked()
159
            .check_pj_supported()
160
            .map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
161
        let address = uri.address;
162
        let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?;
×
163
        match uri.extras.pj_param() {
164
            #[cfg(feature = "v1")]
165
            PjParam::V1(pj_param) => {
166
                let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
167
                let fallback_tx = psbt.clone().extract_tx()?;
168
                let (req, ctx) = payjoin::send::v1::SenderBuilder::from_parts(
169
                    psbt,
170
                    pj_param,
171
                    &address,
172
                    Some(amount),
173
                )
174
                .build_recommended(fee_rate)
175
                .with_context(|| "Failed to build payjoin request")?
176
                .create_v1_post_request();
177
                let http = http_agent(&self.config)?;
178
                let body = String::from_utf8(req.body.clone()).unwrap();
179
                println!("Sending Original PSBT to {}", req.url);
180
                let response = match http
181
                    .post(req.url)
182
                    .header("Content-Type", req.content_type)
183
                    .body(body.clone())
184
                    .send()
185
                    .await
186
                {
187
                    Ok(response) => response,
188
                    Err(e) => {
189
                        tracing::error!("HTTP request failed: {e}");
190
                        println!("Payjoin failed. To broadcast the fallback transaction, run:");
191
                        println!(
192
                            "  bitcoin-cli -rpcwallet=<wallet> sendrawtransaction {:#}",
193
                            payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
194
                        );
195
                        return Err(anyhow!("HTTP request failed: {e}"));
196
                    }
197
                };
198
                let psbt = match ctx.process_response(&response.bytes().await?) {
199
                    Ok(psbt) => psbt,
200
                    Err(e) => {
201
                        tracing::error!("Error processing response: {e:?}");
202
                        println!("Payjoin failed. To broadcast the fallback transaction, run:");
203
                        println!(
204
                            "  bitcoin-cli -rpcwallet=<wallet> sendrawtransaction {:#}",
205
                            payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
206
                        );
207
                        return Err(anyhow!("Failed to process response {e}"));
208
                    }
209
                };
210

211
                self.process_pj_response(psbt)?;
212
                Ok(())
213
            }
214
            PjParam::V2(pj_param) => {
215
                let receiver_pubkey = pj_param.receiver_pubkey();
216
                let sender_state =
217
                    self.db.get_send_session_ids()?.into_iter().find_map(|session_id| {
1✔
218
                        let session_receiver_pubkey = self
1✔
219
                            .db
1✔
220
                            .get_send_session_receiver_pk(&session_id)
1✔
221
                            .expect("Receiver pubkey should exist if session id exists");
1✔
222
                        if session_receiver_pubkey == *receiver_pubkey {
1✔
223
                            let sender_persister =
1✔
224
                                SenderPersister::from_id(self.db.clone(), session_id);
1✔
225
                            let (send_session, _) = replay_sender_event_log(&sender_persister)
1✔
226
                                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))
1✔
227
                                .ok()?;
1✔
228

229
                            Some((send_session, sender_persister))
1✔
230
                        } else {
231
                            None
×
232
                        }
233
                    });
1✔
234

235
                let (sender_state, persister) = match sender_state {
236
                    Some((sender_state, persister)) => (sender_state, persister),
237
                    None => {
238
                        let persister =
239
                            SenderPersister::new(self.db.clone(), bip21, receiver_pubkey)?;
240
                        let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
241
                        let sender =
242
                            SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
243
                                .build_recommended(fee_rate)?
244
                                .save(&persister)?;
245

246
                        (SendSession::WithReplyKey(sender), persister)
247
                    }
248
                };
249
                let mut interrupt = self.interrupt.clone();
250
                tokio::select! {
251
                    res = self.process_sender_session(sender_state, &persister) => {
252
                        match res {
253
                            Ok(()) => return Ok(()),
254
                            Err(err) => {
255
                                let id = persister.session_id();
256
                                println!("Session {id} failed. Run `payjoin-cli fallback {id}` to broadcast the original transaction.");
257
                                return Err(err);
258
                            }
259
                        }
260
                    },
261
                    _ = interrupt.changed() => {
262
                        let id = persister.session_id();
263
                        println!(
264
                            "Session {id} interrupted. Call `send` again to resume, `resume` to resume all sessions, or `payjoin-cli fallback {id}` to broadcast the original transaction."
265
                        );
266
                        return Err(anyhow!("Interrupted"))
267
                    }
268
                }
269
            }
270
            _ => unimplemented!("Unrecognized payjoin version"),
271
        }
272
    }
4✔
273

274
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
2✔
275
        let address = self.wallet().get_new_address()?;
276
        let mut relay_manager = RelayManager::new();
277
        let ohttp_keys = unwrap_ohttp_keys_or_else_fetch(&self.config, None, &mut relay_manager)
278
            .await?
279
            .ohttp_keys;
280
        let persister = ReceiverPersister::new(self.db.clone())?;
281
        let session =
282
            ReceiverBuilder::new(address, self.config.v2()?.pj_directory.as_str(), ohttp_keys)?
283
                .with_amount(amount)
284
                .with_max_fee_rate(self.config.max_fee_rate.unwrap_or(FeeRate::BROADCAST_MIN))
285
                .build()
286
                .save(&persister)?;
287

288
        println!("Receive session established");
289
        let pj_uri = session.pj_uri();
290
        println!("Request Payjoin by sharing this Payjoin Uri:");
291
        println!("{pj_uri}");
292

293
        self.process_receiver_session(ReceiveSession::Initialized(session.clone()), &persister)
294
            .await?;
295
        Ok(())
296
    }
2✔
297

298
    async fn resume_payjoins(&self) -> Result<()> {
4✔
299
        let recv_session_ids = self.db.get_recv_session_ids()?;
300
        let send_session_ids = self.db.get_send_session_ids()?;
301

302
        if recv_session_ids.is_empty() && send_session_ids.is_empty() {
303
            println!("No sessions to resume.");
304
            return Ok(());
305
        }
306

307
        let mut tasks = Vec::new();
308

309
        // Process receiver sessions
310
        for session_id in recv_session_ids {
311
            let self_clone = self.clone();
312
            let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
313
            match replay_receiver_event_log(&recv_persister) {
314
                Ok((receiver_state, _)) => {
315
                    tasks.push(tokio::spawn(async move {
2✔
316
                        self_clone.process_receiver_session(receiver_state, &recv_persister).await
2✔
317
                    }));
1✔
318
                }
319
                Err(e) => {
320
                    tracing::error!("An error {:?} occurred while replaying receiver session", e);
321
                    Self::close_failed_session(&recv_persister, &session_id, "receiver");
322
                }
323
            }
324
        }
325

326
        // Process sender sessions
327
        for session_id in send_session_ids {
328
            let sender_persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
329
            match replay_sender_event_log(&sender_persister) {
330
                Ok((sender_state, _)) => {
331
                    let self_clone = self.clone();
332
                    tasks.push(tokio::spawn(async move {
×
333
                        self_clone.process_sender_session(sender_state, &sender_persister).await
×
334
                    }));
×
335
                }
336
                Err(e) => {
337
                    tracing::error!("An error {:?} occurred while replaying Sender session", e);
338
                    Self::close_failed_session(&sender_persister, &session_id, "sender");
339
                }
340
            }
341
        }
342

343
        let mut interrupt = self.interrupt.clone();
344
        tokio::select! {
345
            _ = async {
2✔
346
                for task in tasks {
2✔
347
                    let _ = task.await;
2✔
348
                }
349
            } => {
1✔
350
                println!("All resumed sessions completed.");
351
            }
352
            _ = interrupt.changed() => {
353
                println!("Resumed sessions were interrupted.");
354
            }
355
        }
356
        Ok(())
357
    }
4✔
358

359
    #[cfg(feature = "v2")]
360
    async fn history(&self) -> Result<()> {
×
361
        print_header();
362
        let mut send_rows = vec![];
363
        let mut recv_rows = vec![];
364
        self.db.get_send_session_ids()?.into_iter().for_each(|session_id| {
×
365
            let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
×
366
            match replay_sender_event_log(&persister) {
×
367
                Ok((sender_state, _)) => {
×
368
                    let row = SessionHistoryRow {
×
369
                        session_id,
×
370
                        role: Role::Sender,
×
371
                        status: sender_state.clone(),
×
372
                        completed_at: None,
×
373
                        error_message: None,
×
374
                    };
×
375
                    send_rows.push(row);
×
376
                }
×
377
                Err(e) => {
×
378
                    let row = SessionHistoryRow {
×
379
                        session_id,
×
380
                        role: Role::Sender,
×
381
                        status: SendSession::Closed(SenderSessionOutcome::Failure),
×
382
                        completed_at: None,
×
383
                        error_message: Some(e.to_string()),
×
384
                    };
×
385
                    send_rows.push(row);
×
386
                }
×
387
            }
388
        });
×
389

390
        self.db.get_recv_session_ids()?.into_iter().for_each(|session_id| {
×
391
            let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
×
392
            match replay_receiver_event_log(&persister) {
×
393
                Ok((receiver_state, _)) => {
×
394
                    let row = SessionHistoryRow {
×
395
                        session_id,
×
396
                        role: Role::Receiver,
×
397
                        status: receiver_state.clone(),
×
398
                        completed_at: None,
×
399
                        error_message: None,
×
400
                    };
×
401
                    recv_rows.push(row);
×
402
                }
×
403
                Err(e) => {
×
404
                    let row = SessionHistoryRow {
×
405
                        session_id,
×
406
                        role: Role::Receiver,
×
407
                        status: ReceiveSession::Closed(ReceiverSessionOutcome::Failure),
×
408
                        completed_at: None,
×
409
                        error_message: Some(e.to_string()),
×
410
                    };
×
411
                    recv_rows.push(row);
×
412
                }
×
413
            }
414
        });
×
415

416
        self.db.get_inactive_send_session_ids()?.into_iter().for_each(
417
            |(session_id, completed_at)| {
×
418
                let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
×
419
                match replay_sender_event_log(&persister) {
×
420
                    Ok((sender_state, _)) => {
×
421
                        let row = SessionHistoryRow {
×
422
                            session_id,
×
423
                            role: Role::Sender,
×
424
                            status: sender_state.clone(),
×
425
                            completed_at: Some(completed_at),
×
426
                            error_message: None,
×
427
                        };
×
428
                        send_rows.push(row);
×
429
                    }
×
430
                    Err(e) => {
×
431
                        let row = SessionHistoryRow {
×
432
                            session_id,
×
433
                            role: Role::Sender,
×
434
                            status: SendSession::Closed(SenderSessionOutcome::Failure),
×
435
                            completed_at: Some(completed_at),
×
436
                            error_message: Some(e.to_string()),
×
437
                        };
×
438
                        send_rows.push(row);
×
439
                    }
×
440
                }
441
            },
×
442
        );
443

444
        self.db.get_inactive_recv_session_ids()?.into_iter().for_each(
445
            |(session_id, completed_at)| {
×
446
                let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
×
447
                match replay_receiver_event_log(&persister) {
×
448
                    Ok((receiver_state, _)) => {
×
449
                        let row = SessionHistoryRow {
×
450
                            session_id,
×
451
                            role: Role::Receiver,
×
452
                            status: receiver_state.clone(),
×
453
                            completed_at: Some(completed_at),
×
454
                            error_message: None,
×
455
                        };
×
456
                        recv_rows.push(row);
×
457
                    }
×
458
                    Err(e) => {
×
459
                        let row = SessionHistoryRow {
×
460
                            session_id,
×
461
                            role: Role::Receiver,
×
462
                            status: ReceiveSession::Closed(ReceiverSessionOutcome::Failure),
×
463
                            completed_at: Some(completed_at),
×
464
                            error_message: Some(e.to_string()),
×
465
                        };
×
466
                        recv_rows.push(row);
×
467
                    }
×
468
                }
469
            },
×
470
        );
471

472
        // Print receiver and sender rows separately
473
        for row in send_rows {
474
            println!("{row}");
475
        }
476
        for row in recv_rows {
477
            println!("{row}");
478
        }
479

480
        Ok(())
481
    }
×
482

483
    async fn fallback_sender(&self, session_id: SessionId) -> Result<()> {
1✔
484
        let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
485
        let (session, history) = replay_sender_event_log(&persister)?;
486

487
        if let SendSession::Closed(SenderSessionOutcome::Success(proposal)) = session {
488
            let txid = proposal.clone().extract_tx_unchecked_fee_rate().compute_txid();
489
            println!(
490
                "Session {session_id} already produced payjoin transaction {txid}. \
491
                 Broadcasting the original now would double-spend against it. \
492
                 If the payjoin tx needs re-broadcast, run \
493
                 `bitcoin-cli gettransaction {txid}` to fetch the hex, then \
494
                 `bitcoin-cli sendrawtransaction <hex>`."
495
            );
496
            return Ok(());
497
        }
498

499
        let fallback_tx = history.fallback_tx();
500
        self.wallet().broadcast_tx(&fallback_tx)?;
501
        println!("Broadcasted fallback transaction txid: {}", fallback_tx.compute_txid());
502

503
        if let Err(e) = SessionPersister::close(&persister) {
504
            tracing::warn!("Failed to close session {session_id} after fallback: {e}");
505
        }
506
        Ok(())
507
    }
1✔
508
}
509

510
impl App {
511
    fn close_failed_session<P>(persister: &P, session_id: &SessionId, role: &str)
×
512
    where
×
513
        P: SessionPersister,
×
514
    {
515
        if let Err(close_err) = SessionPersister::close(persister) {
×
516
            tracing::error!("Failed to close {} session {}: {:?}", role, session_id, close_err);
×
517
        } else {
518
            tracing::error!("Closed failed {} session: {}", role, session_id);
×
519
        }
520
    }
×
521

522
    async fn process_sender_session(
3✔
523
        &self,
3✔
524
        session: SendSession,
3✔
525
        persister: &SenderPersister,
3✔
526
    ) -> Result<()> {
3✔
527
        let mut relay_manager = RelayManager::new();
3✔
UNCOV
528
        match session {
×
529
            SendSession::WithReplyKey(context) =>
2✔
530
                self.post_original_proposal(context, persister, &mut relay_manager).await?,
2✔
531
            SendSession::PollingForProposal(context) =>
1✔
532
                self.get_proposed_payjoin_psbt(context, persister, &mut relay_manager).await?,
1✔
533
            SendSession::Closed(SenderSessionOutcome::Success(proposal)) => {
×
534
                self.process_pj_response(proposal)?;
×
535
                return Ok(());
×
536
            }
537
            SendSession::Closed(SenderSessionOutcome::Failure)
538
            | SendSession::Closed(SenderSessionOutcome::Cancel) => {
539
                let id = persister.session_id();
×
540
                println!(
×
541
                    "Session {id} ended without payjoin. Run `payjoin-cli fallback {id}` to broadcast the original transaction."
542
                );
543
                return Ok(());
×
544
            }
545
        }
546
        Ok(())
1✔
547
    }
1✔
548

549
    async fn post_original_proposal(
2✔
550
        &self,
2✔
551
        sender: Sender<WithReplyKey>,
2✔
552
        persister: &SenderPersister,
2✔
553
        relay_manager: &mut RelayManager,
2✔
554
    ) -> Result<()> {
2✔
555
        let (req, ctx) = sender.create_v2_post_request(
2✔
556
            self.unwrap_relay_or_else_fetch(Some(&sender.endpoint()), relay_manager)
2✔
557
                .await?
2✔
558
                .as_str(),
2✔
UNCOV
559
        )?;
×
560
        let response = self.post_request(req).await?;
2✔
561
        let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?;
2✔
562
        println!("Posted Original PSBT...");
2✔
563
        self.get_proposed_payjoin_psbt(sender, persister, relay_manager).await
2✔
564
    }
×
565

566
    async fn get_proposed_payjoin_psbt(
3✔
567
        &self,
3✔
568
        sender: Sender<PollingForProposal>,
3✔
569
        persister: &SenderPersister,
3✔
570
        relay_manager: &mut RelayManager,
3✔
571
    ) -> Result<()> {
3✔
572
        let ohttp_relay =
3✔
573
            self.unwrap_relay_or_else_fetch(Some(&sender.endpoint()), relay_manager).await?;
3✔
574
        let mut session = sender.clone();
3✔
575
        // Long poll until we get a response
576
        loop {
577
            let (req, ctx) = session.create_poll_request(ohttp_relay.as_str())?;
5✔
578
            let response = self.post_request(req).await?;
5✔
579
            let res = session.process_response(&response.bytes().await?, ctx).save(persister);
3✔
580
            match res {
3✔
581
                Ok(OptionalTransitionOutcome::Progress(psbt)) => {
1✔
582
                    println!("Proposal received. Processing...");
1✔
583
                    self.process_pj_response(psbt)?;
1✔
584
                    return Ok(());
1✔
585
                }
586
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
2✔
587
                    println!("No response yet.");
2✔
588
                    session = current_state;
2✔
589
                    continue;
2✔
590
                }
591
                Err(re) => {
×
592
                    println!("{re}");
×
593
                    tracing::debug!("{re:?}");
×
594
                    return Err(anyhow!("Response error").context(re));
×
595
                }
596
            }
597
        }
598
    }
1✔
599

600
    async fn long_poll_fallback(
3✔
601
        &self,
3✔
602
        session: Receiver<Initialized>,
3✔
603
        persister: &ReceiverPersister,
3✔
604
        relay_manager: &mut RelayManager,
3✔
605
    ) -> Result<Receiver<UncheckedOriginalPayload>> {
3✔
606
        let ohttp_relay = self
3✔
607
            .unwrap_relay_or_else_fetch(Some(&session.pj_uri().extras.endpoint()), relay_manager)
3✔
608
            .await?;
3✔
609

610
        let mut session = session;
1✔
611
        loop {
612
            let (req, context) = session.create_poll_request(ohttp_relay.as_str())?;
1✔
613
            println!("Polling receive request...");
1✔
614
            let ohttp_response = self.post_request(req).await?;
1✔
615
            let state_transition = session
1✔
616
                .process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
617
                .save(persister);
1✔
618
            match state_transition {
1✔
619
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
1✔
620
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
621
                    return Ok(next_state);
1✔
622
                }
623
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
624
                    session = current_state;
×
625
                    continue;
×
626
                }
627
                Err(e) => return Err(e.into()),
×
628
            }
629
        }
630
    }
1✔
631

632
    async fn process_receiver_session(
4✔
633
        &self,
4✔
634
        session: ReceiveSession,
4✔
635
        persister: &ReceiverPersister,
4✔
636
    ) -> Result<()> {
4✔
637
        let mut relay_manager = RelayManager::new();
4✔
638
        let res = {
3✔
639
            match session {
4✔
640
                ReceiveSession::Initialized(proposal) =>
3✔
641
                    self.read_from_directory(proposal, persister, &mut relay_manager).await,
3✔
642
                ReceiveSession::UncheckedOriginalPayload(proposal) =>
×
NEW
643
                    self.check_proposal(proposal, persister, &mut relay_manager).await,
×
644
                ReceiveSession::MaybeInputsOwned(proposal) =>
×
NEW
645
                    self.check_inputs_not_owned(proposal, persister, &mut relay_manager).await,
×
646
                ReceiveSession::MaybeInputsSeen(proposal) =>
×
NEW
647
                    self.check_no_inputs_seen_before(proposal, persister, &mut relay_manager).await,
×
648
                ReceiveSession::OutputsUnknown(proposal) =>
×
NEW
649
                    self.identify_receiver_outputs(proposal, persister, &mut relay_manager).await,
×
650
                ReceiveSession::WantsOutputs(proposal) =>
×
NEW
651
                    self.commit_outputs(proposal, persister, &mut relay_manager).await,
×
652
                ReceiveSession::WantsInputs(proposal) =>
×
NEW
653
                    self.contribute_inputs(proposal, persister, &mut relay_manager).await,
×
654
                ReceiveSession::WantsFeeRange(proposal) =>
×
NEW
655
                    self.apply_fee_range(proposal, persister, &mut relay_manager).await,
×
656
                ReceiveSession::ProvisionalProposal(proposal) =>
×
NEW
657
                    self.finalize_proposal(proposal, persister, &mut relay_manager).await,
×
658
                ReceiveSession::PayjoinProposal(proposal) =>
×
NEW
659
                    self.send_payjoin_proposal(proposal, persister, &mut relay_manager).await,
×
660
                ReceiveSession::HasReplyableError(error) =>
×
NEW
661
                    self.handle_error(error, persister, &mut relay_manager).await,
×
662
                ReceiveSession::Monitor(proposal) =>
1✔
663
                    self.monitor_payjoin_proposal(proposal, persister).await,
1✔
664
                ReceiveSession::Closed(_) => return Err(anyhow!("Session closed")),
×
665
            }
666
        };
667
        res
3✔
668
    }
3✔
669

670
    #[allow(clippy::incompatible_msrv)]
671
    async fn read_from_directory(
3✔
672
        &self,
3✔
673
        session: Receiver<Initialized>,
3✔
674
        persister: &ReceiverPersister,
3✔
675
        relay_manager: &mut RelayManager,
3✔
676
    ) -> Result<()> {
3✔
677
        let mut interrupt = self.interrupt.clone();
3✔
678
        let receiver = tokio::select! {
3✔
679
            res = self.long_poll_fallback(session, persister, relay_manager) => res,
3✔
680
            _ = interrupt.changed() => {
3✔
681
                println!("Interrupted. Call the `resume` command to resume all sessions.");
2✔
682
                return Err(anyhow!("Interrupted"));
2✔
683
            }
684
        }?;
×
685
        self.check_proposal(receiver, persister, relay_manager).await
1✔
686
    }
2✔
687

688
    async fn check_proposal(
1✔
689
        &self,
1✔
690
        proposal: Receiver<UncheckedOriginalPayload>,
1✔
691
        persister: &ReceiverPersister,
1✔
692
        relay_manager: &mut RelayManager,
1✔
693
    ) -> Result<()> {
1✔
694
        let wallet = self.wallet();
1✔
695
        let proposal = proposal
1✔
696
            .check_broadcast_suitability(None, |tx| {
1✔
697
                wallet
1✔
698
                    .can_broadcast(tx)
1✔
699
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
700
            })
1✔
701
            .save(persister)?;
1✔
702

703
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
704
        println!("{}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast()));
1✔
705
        self.check_inputs_not_owned(proposal, persister, relay_manager).await
1✔
706
    }
×
707

708
    async fn check_inputs_not_owned(
1✔
709
        &self,
1✔
710
        proposal: Receiver<MaybeInputsOwned>,
1✔
711
        persister: &ReceiverPersister,
1✔
712
        relay_manager: &mut RelayManager,
1✔
713
    ) -> Result<()> {
1✔
714
        let wallet = self.wallet();
1✔
715
        let proposal = proposal
1✔
716
            .check_inputs_not_owned(&mut |input| {
1✔
717
                wallet
1✔
718
                    .is_mine(input)
1✔
719
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
720
            })
1✔
721
            .save(persister)?;
1✔
722
        self.check_no_inputs_seen_before(proposal, persister, relay_manager).await
1✔
723
    }
×
724

725
    async fn check_no_inputs_seen_before(
1✔
726
        &self,
1✔
727
        proposal: Receiver<MaybeInputsSeen>,
1✔
728
        persister: &ReceiverPersister,
1✔
729
        relay_manager: &mut RelayManager,
1✔
730
    ) -> Result<()> {
1✔
731
        let proposal = proposal
1✔
732
            .check_no_inputs_seen_before(&mut |input| {
1✔
733
                Ok(self.db.insert_input_seen_before(*input)?)
1✔
734
            })
1✔
735
            .save(persister)?;
1✔
736
        self.identify_receiver_outputs(proposal, persister, relay_manager).await
1✔
737
    }
×
738

739
    async fn identify_receiver_outputs(
1✔
740
        &self,
1✔
741
        proposal: Receiver<OutputsUnknown>,
1✔
742
        persister: &ReceiverPersister,
1✔
743
        relay_manager: &mut RelayManager,
1✔
744
    ) -> Result<()> {
1✔
745
        let wallet = self.wallet();
1✔
746
        let proposal = proposal
1✔
747
            .identify_receiver_outputs(&mut |output_script| {
2✔
748
                wallet
2✔
749
                    .is_mine(output_script)
2✔
750
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
2✔
751
            })
2✔
752
            .save(persister)?;
1✔
753
        self.commit_outputs(proposal, persister, relay_manager).await
1✔
754
    }
×
755

756
    async fn commit_outputs(
1✔
757
        &self,
1✔
758
        proposal: Receiver<WantsOutputs>,
1✔
759
        persister: &ReceiverPersister,
1✔
760
        relay_manager: &mut RelayManager,
1✔
761
    ) -> Result<()> {
1✔
762
        let proposal = proposal.commit_outputs().save(persister)?;
1✔
763
        self.contribute_inputs(proposal, persister, relay_manager).await
1✔
764
    }
×
765

766
    async fn contribute_inputs(
1✔
767
        &self,
1✔
768
        proposal: Receiver<WantsInputs>,
1✔
769
        persister: &ReceiverPersister,
1✔
770
        relay_manager: &mut RelayManager,
1✔
771
    ) -> Result<()> {
1✔
772
        let wallet = self.wallet();
1✔
773
        let candidate_inputs = wallet.list_unspent()?;
1✔
774

775
        if candidate_inputs.is_empty() {
1✔
776
            return Err(anyhow::anyhow!(
×
777
                "No spendable UTXOs available in wallet. Cannot contribute inputs to payjoin."
×
778
            ));
×
779
        }
1✔
780

781
        let selected_input = proposal.try_preserving_privacy(candidate_inputs)?;
1✔
782
        let proposal =
1✔
783
            proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?;
1✔
784
        self.apply_fee_range(proposal, persister, relay_manager).await
1✔
785
    }
×
786

787
    async fn apply_fee_range(
1✔
788
        &self,
1✔
789
        proposal: Receiver<WantsFeeRange>,
1✔
790
        persister: &ReceiverPersister,
1✔
791
        relay_manager: &mut RelayManager,
1✔
792
    ) -> Result<()> {
1✔
793
        let proposal = proposal.apply_fee_range(None, self.config.max_fee_rate).save(persister)?;
1✔
794
        self.finalize_proposal(proposal, persister, relay_manager).await
1✔
795
    }
×
796

797
    async fn finalize_proposal(
1✔
798
        &self,
1✔
799
        proposal: Receiver<ProvisionalProposal>,
1✔
800
        persister: &ReceiverPersister,
1✔
801
        relay_manager: &mut RelayManager,
1✔
802
    ) -> Result<()> {
1✔
803
        let wallet = self.wallet();
1✔
804
        let proposal = proposal
1✔
805
            .finalize_proposal(|psbt| {
1✔
806
                wallet
1✔
807
                    .process_psbt(psbt)
1✔
808
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
809
            })
1✔
810
            .save(persister)?;
1✔
811
        self.send_payjoin_proposal(proposal, persister, relay_manager).await
1✔
812
    }
×
813

814
    async fn send_payjoin_proposal(
1✔
815
        &self,
1✔
816
        proposal: Receiver<PayjoinProposal>,
1✔
817
        persister: &ReceiverPersister,
1✔
818
        relay_manager: &mut RelayManager,
1✔
819
    ) -> Result<()> {
1✔
820
        let (req, ohttp_ctx) = proposal
1✔
821
            .create_post_request(
1✔
822
                self.unwrap_relay_or_else_fetch(None::<&str>, relay_manager).await?.as_str(),
1✔
823
            )
824
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
825
        let res = self.post_request(req).await?;
1✔
826
        let payjoin_psbt = proposal.psbt().clone();
1✔
827
        let session = proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister)?;
1✔
828
        println!(
1✔
829
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
830
            payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
1✔
831
        );
832

833
        return self.monitor_payjoin_proposal(session, persister).await;
1✔
834
    }
×
835

836
    async fn monitor_payjoin_proposal(
2✔
837
        &self,
2✔
838
        proposal: Receiver<Monitor>,
2✔
839
        persister: &ReceiverPersister,
2✔
840
    ) -> Result<()> {
2✔
841
        // On a session resumption, the receiver will resume again in this state.
842
        let poll_interval = tokio::time::Duration::from_millis(200);
2✔
843
        let timeout_duration = tokio::time::Duration::from_secs(5);
2✔
844

845
        let mut interval = tokio::time::interval(poll_interval);
2✔
846
        interval.tick().await;
2✔
847

848
        tracing::debug!("Polling for payment confirmation");
1✔
849

850
        let result = tokio::time::timeout(timeout_duration, async {
1✔
851
            loop {
852
                interval.tick().await;
1✔
853
                let check_result = proposal
1✔
854
                    .check_payment(|txid| {
1✔
855
                        self.wallet()
1✔
856
                            .get_raw_transaction(&txid)
1✔
857
                            .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
858
                    })
1✔
859
                    .save(persister);
1✔
860

861
                match check_result {
1✔
862
                    Ok(_) => {
863
                        println!("Payjoin transaction detected in the mempool!");
1✔
864
                        return Ok(());
1✔
865
                    }
866
                    Err(_) => {
867
                        // keep polling
868

869
                        continue;
×
870
                    }
871
                }
872
            }
873
        })
1✔
874
        .await;
1✔
875

876
        match result {
1✔
877
            Ok(ok) => ok,
1✔
878
            Err(_) => Err(anyhow!(
×
879
                "Timeout waiting for payment confirmation after {:?}",
×
880
                timeout_duration
×
881
            )),
×
882
        }
883
    }
1✔
884

885
    async fn unwrap_relay_or_else_fetch(
9✔
886
        &self,
9✔
887
        directory: Option<impl payjoin::IntoUrl>,
9✔
888
        relay_manager: &mut RelayManager,
9✔
889
    ) -> Result<payjoin::Url> {
9✔
890
        let directory = directory.map(|url| url.into_url()).transpose()?;
9✔
891
        let ohttp_relay = match relay_manager.get_selected_relay() {
9✔
892
            Some(relay) => relay,
3✔
893
            None =>
894
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, relay_manager)
6✔
895
                    .await?
6✔
896
                    .relay_url,
897
        };
898
        Ok(ohttp_relay)
7✔
899
    }
7✔
900

901
    /// Handle error by attempting to send an error response over the directory
902
    async fn handle_error(
×
903
        &self,
×
904
        session: Receiver<HasReplyableError>,
×
905
        persister: &ReceiverPersister,
×
NEW
906
        relay_manager: &mut RelayManager,
×
907
    ) -> Result<()> {
×
NEW
908
        let (err_req, err_ctx) = session.create_error_request(
×
NEW
909
            self.unwrap_relay_or_else_fetch(None::<&str>, relay_manager).await?.as_str(),
×
NEW
910
        )?;
×
911

912
        let err_response = match self.post_request(err_req).await {
×
913
            Ok(response) => response,
×
914
            Err(e) => return Err(anyhow!("Failed to post error request: {}", e)),
×
915
        };
916

917
        let err_bytes = match err_response.bytes().await {
×
918
            Ok(bytes) => bytes,
×
919
            Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
×
920
        };
921

922
        if let Err(e) = session.process_error_response(&err_bytes, err_ctx).save(persister) {
×
923
            return Err(anyhow!("Failed to process error response: {}", e));
×
924
        }
×
925

926
        Ok(())
×
927
    }
×
928

929
    async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {
9✔
930
        let http = http_agent(&self.config)?;
9✔
931
        http.post(req.url)
9✔
932
            .header("Content-Type", req.content_type)
9✔
933
            .body(req.body)
9✔
934
            .send()
9✔
935
            .await
9✔
936
            .and_then(|r| r.error_for_status())
7✔
937
            .context("HTTP request failed")
7✔
938
    }
7✔
939
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc