• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 17444435859

03 Sep 2025 07:49PM UTC coverage: 84.73% (-1.2%) from 85.946%
17444435859

Pull #1039

github

web-flow
Merge e78b4ff91 into 31258eb86
Pull Request #1039: WIP - display session history

1 of 138 new or added lines in 6 files covered. (0.72%)

29 existing lines in 1 file now uncovered.

8201 of 9679 relevant lines covered (84.73%)

482.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.16
/payjoin-cli/src/app/v2/mod.rs
1
use std::fmt;
2
use std::sync::{Arc, Mutex};
3

4
use anyhow::{anyhow, Context, Result};
5
use payjoin::bitcoin::consensus::encode::serialize_hex;
6
use payjoin::bitcoin::{Amount, FeeRate};
7
use payjoin::persist::OptionalTransitionOutcome;
8
use payjoin::receive::v2::{
9
    process_err_res, replay_event_log as replay_receiver_event_log, Initialized, MaybeInputsOwned,
10
    MaybeInputsSeen, OutputsUnknown, PayjoinProposal, ProvisionalProposal, ReceiveSession,
11
    Receiver, ReceiverBuilder, SessionHistory, UncheckedOriginalPayload, WantsFeeRange,
12
    WantsInputs, WantsOutputs,
13
};
14
use payjoin::send::v2::{
15
    replay_event_log as replay_sender_event_log, SendSession, Sender, SenderBuilder, V2GetContext,
16
    WithReplyKey,
17
};
18
use payjoin::{ImplementationError, PjParam, Uri};
19
use tokio::sync::watch;
20

21
use super::config::Config;
22
use super::wallet::BitcoindWallet;
23
use super::App as AppTrait;
24
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
25
use crate::app::{handle_interrupt, http_agent};
26
use crate::db::v2::{ReceiverPersister, SenderPersister, SessionId};
27
use crate::db::Database;
28

29
mod ohttp;
30

31
const W_ID: usize = 12;
32
const W_ROLE: usize = 25;
33
const W_STATUS: usize = 25;
34
const W_DONE: usize = 15;
35
const W_ERR: usize = 15;
36

37
#[derive(Clone)]
38
pub(crate) struct App {
39
    config: Config,
40
    db: Arc<Database>,
41
    wallet: BitcoindWallet,
42
    interrupt: watch::Receiver<()>,
43
    relay_manager: Arc<Mutex<RelayManager>>,
44
}
45

46
trait StatusText {
47
    fn status_text(&self) -> &'static str;
48
}
49

50
impl StatusText for SendSession {
NEW
51
    fn status_text(&self) -> &'static str {
×
NEW
52
        match self {
×
NEW
53
            SendSession::Uninitialized => "Waiting to send proposal",
×
NEW
54
            SendSession::WithReplyKey(_) | SendSession::V2GetContext(_) => "Waiting for proposal",
×
NEW
55
            SendSession::ProposalReceived(_) => "Proposal received",
×
NEW
56
            SendSession::TerminalFailure => "Session failure",
×
57
        }
NEW
58
    }
×
59
}
60

61
impl StatusText for ReceiveSession {
NEW
62
    fn status_text(&self) -> &'static str {
×
NEW
63
        match self {
×
NEW
64
            ReceiveSession::Uninitialized => "Uninitialized",
×
NEW
65
            ReceiveSession::Initialized(_) => "Waiting for original proposal",
×
66
            ReceiveSession::UncheckedOriginalPayload(_)
67
            | ReceiveSession::MaybeInputsOwned(_)
68
            | ReceiveSession::MaybeInputsSeen(_)
69
            | ReceiveSession::OutputsUnknown(_)
70
            | ReceiveSession::WantsOutputs(_)
71
            | ReceiveSession::WantsInputs(_)
72
            | ReceiveSession::WantsFeeRange(_)
NEW
73
            | ReceiveSession::ProvisionalProposal(_) => "Processing original proposal",
×
NEW
74
            ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent",
×
NEW
75
            ReceiveSession::TerminalFailure => "Session failure",
×
76
        }
NEW
77
    }
×
78
}
79

NEW
80
fn print_header() {
×
NEW
81
    println!(
×
NEW
82
        "{:<W_ID$} {:<W_ROLE$} {:<W_STATUS$} {:<W_DONE$} {:<W_ERR$}",
×
83
        "Session ID", "Sender/Receiver", "Status", "Completed At", "Error Message"
84
    );
NEW
85
}
×
86

87
enum Role {
88
    Sender,
89
    Receiver,
90
}
91
impl Role {
NEW
92
    fn as_str(&self) -> &'static str {
×
NEW
93
        match self {
×
NEW
94
            Role::Sender => "Sender",
×
NEW
95
            Role::Receiver => "Receiver",
×
96
        }
NEW
97
    }
×
98
}
99

100
struct SessionHistoryRow<Status> {
101
    session_id: SessionId,
102
    role: Role,
103
    status: Status,
104
    completed_at: Option<u64>,
105
    error_message: Option<String>,
106
}
107

108
impl<Status: StatusText> fmt::Display for SessionHistoryRow<Status> {
NEW
109
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
NEW
110
        write!(
×
NEW
111
            f,
×
NEW
112
            "{:<W_ID$} {:<W_ROLE$} {:<W_STATUS$} {:<W_DONE$} {:<W_ERR$}",
×
NEW
113
            self.session_id.to_string(),
×
NEW
114
            self.role.as_str(),
×
NEW
115
            self.status.status_text(),
×
NEW
116
            match self.completed_at {
×
NEW
117
                None => "Not Completed".to_string(),
×
NEW
118
                Some(secs) => {
×
119
                    // TODO: human readable time
NEW
120
                    secs.to_string()
×
121
                }
122
            },
NEW
123
            self.error_message.as_deref().unwrap_or("N/A")
×
124
        )
NEW
125
    }
×
126
}
127

128
#[async_trait::async_trait]
129
impl AppTrait for App {
130
    async fn new(config: Config) -> Result<Self> {
7✔
131
        let db = Arc::new(Database::create(&config.db_path)?);
132
        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
133
        let (interrupt_tx, interrupt_rx) = watch::channel(());
134
        tokio::spawn(handle_interrupt(interrupt_tx));
135
        let wallet = BitcoindWallet::new(&config.bitcoind).await?;
136
        let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager };
137
        app.wallet()
138
            .network()
139
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
140
        Ok(app)
141
    }
7✔
142

143
    fn wallet(&self) -> BitcoindWallet { self.wallet.clone() }
21✔
144

145
    #[allow(clippy::incompatible_msrv)]
146
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
3✔
147
        use payjoin::UriExt;
148
        let uri = Uri::try_from(bip21)
149
            .map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?
×
150
            .assume_checked()
151
            .check_pj_supported()
152
            .map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
153
        let address = uri.address;
154
        let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?;
×
155
        match uri.extras.pj_param() {
156
            #[cfg(feature = "v1")]
157
            PjParam::V1(pj_param) => {
158
                use std::str::FromStr;
159

160
                let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
161
                let (req, ctx) = payjoin::send::v1::SenderBuilder::from_parts(
162
                    psbt,
163
                    pj_param,
164
                    &address,
165
                    Some(amount),
166
                )
167
                .build_recommended(fee_rate)
168
                .with_context(|| "Failed to build payjoin request")?
169
                .create_v1_post_request();
170
                let http = http_agent(&self.config)?;
171
                let body = String::from_utf8(req.body.clone()).unwrap();
172
                println!("Sending fallback request to {}", &req.url);
173
                let response = http
174
                    .post(req.url)
175
                    .header("Content-Type", req.content_type)
176
                    .body(body.clone())
177
                    .send()
178
                    .await
179
                    .with_context(|| "HTTP request failed")?;
180
                let fallback_tx = payjoin::bitcoin::Psbt::from_str(&body)
181
                    .map_err(|e| anyhow!("Failed to load PSBT from base64: {}", e))?
×
182
                    .extract_tx()?;
183
                println!("Sent fallback transaction txid: {}", fallback_tx.compute_txid());
184
                println!(
185
                    "Sent fallback transaction hex: {:#}",
186
                    payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
187
                );
188
                let psbt = ctx.process_response(&response.bytes().await?).map_err(|e| {
×
189
                    tracing::debug!("Error processing response: {e:?}");
×
190
                    anyhow!("Failed to process response {e}")
×
191
                })?;
×
192

193
                self.process_pj_response(psbt)?;
194
                Ok(())
195
            }
196
            PjParam::V2(pj_param) => {
197
                let receiver_pubkey = pj_param.receiver_pubkey();
198
                let sender_state =
199
                    self.db.get_send_session_ids()?.into_iter().find_map(|session_id| {
1✔
200
                        let session_receiver_pubkey = self
1✔
201
                            .db
1✔
202
                            .get_send_session_receiver_pk(&session_id)
1✔
203
                            .expect("Receiver pubkey should exist if session id exists");
1✔
204
                        if session_receiver_pubkey == *receiver_pubkey {
1✔
205
                            let sender_persister =
1✔
206
                                SenderPersister::from_id(self.db.clone(), session_id).ok()?;
1✔
207
                            let (send_session, _) = replay_sender_event_log(&sender_persister)
1✔
208
                                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))
1✔
209
                                .ok()?;
1✔
210

211
                            Some((send_session, sender_persister))
1✔
212
                        } else {
213
                            None
×
214
                        }
215
                    });
1✔
216

217
                let (sender_state, persister) = match sender_state {
218
                    Some((sender_state, persister)) => (sender_state, persister),
219
                    None => {
220
                        let persister =
221
                            SenderPersister::new(self.db.clone(), receiver_pubkey.clone())?;
222
                        let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
223
                        let sender =
224
                            SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
225
                                .build_recommended(fee_rate)?
226
                                .save(&persister)?;
227

228
                        (SendSession::WithReplyKey(sender), persister)
229
                    }
230
                };
231
                let mut interrupt = self.interrupt.clone();
232
                tokio::select! {
233
                    _ = self.process_sender_session(sender_state, &persister) => return Ok(()),
234
                    _ = interrupt.changed() => {
235
                        println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
236
                        return Err(anyhow!("Interrupted"))
237
                    }
238
                }
239
            }
240
            _ => unimplemented!("Unrecognized payjoin version"),
241
        }
242
    }
3✔
243

244
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
1✔
245
        let address = self.wallet().get_new_address()?;
246
        let ohttp_keys =
247
            unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone())
248
                .await?
249
                .ohttp_keys;
250
        let persister = ReceiverPersister::new(self.db.clone())?;
251
        let session =
252
            ReceiverBuilder::new(address, self.config.v2()?.pj_directory.clone(), ohttp_keys)?
253
                .with_amount(amount)
254
                .with_max_fee_rate(self.config.max_fee_rate.unwrap_or(FeeRate::BROADCAST_MIN))
255
                .build()
256
                .save(&persister)?;
257

258
        println!("Receive session established");
259
        let pj_uri = session.pj_uri();
260
        println!("Request Payjoin by sharing this Payjoin Uri:");
261
        println!("{}", pj_uri);
262

263
        self.process_receiver_session(ReceiveSession::Initialized(session.clone()), &persister)
264
            .await?;
265
        Ok(())
266
    }
1✔
267

268
    #[allow(clippy::incompatible_msrv)]
269
    async fn resume_payjoins(&self) -> Result<()> {
3✔
270
        let recv_session_ids = self.db.get_recv_session_ids()?;
271
        let send_session_ids = self.db.get_send_session_ids()?;
272

273
        if recv_session_ids.is_empty() && send_session_ids.is_empty() {
274
            println!("No sessions to resume.");
275
            return Ok(());
276
        }
277

278
        let mut tasks = Vec::new();
279

280
        for session_id in recv_session_ids {
281
            let self_clone = self.clone();
282
            let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id)?;
283
            let receiver_state = replay_receiver_event_log(&recv_persister)
284
                .map_err(|e| anyhow!("Failed to replay receiver event log: {:?}", e))?
×
285
                .0;
286
            tasks.push(tokio::spawn(async move {
1✔
287
                self_clone.process_receiver_session(receiver_state, &recv_persister).await
1✔
288
            }));
1✔
289
        }
290

291
        for session_id in send_session_ids {
292
            let sender_persiter = SenderPersister::from_id(self.db.clone(), session_id)?;
293
            let sender_state = replay_sender_event_log(&sender_persiter)
294
                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))?
×
295
                .0;
296
            let self_clone = self.clone();
297
            tasks.push(tokio::spawn(async move {
×
298
                self_clone.process_sender_session(sender_state, &sender_persiter).await
×
299
            }));
×
300
        }
301

302
        let mut interrupt = self.interrupt.clone();
303
        tokio::select! {
304
            _ = async {
1✔
305
                for task in tasks {
2✔
306
                    let _ = task.await;
1✔
307
                }
308
            } => {
1✔
309
                println!("All resumed sessions completed.");
310
            }
311
            _ = interrupt.changed() => {
312
                println!("Resumed sessions were interrupted.");
313
            }
314
        }
315
        Ok(())
316
    }
3✔
317

318
    #[cfg(feature = "v2")]
NEW
319
    async fn history(&self) -> Result<()> {
×
320
        print_header();
321

NEW
322
        self.db.get_send_session_ids()?.into_iter().try_for_each(|session_id| {
×
NEW
323
            let persister = SenderPersister::from_id(self.db.clone(), session_id.clone())?;
×
NEW
324
            let (sender_state, _) = replay_sender_event_log(&persister)
×
NEW
325
                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))?;
×
NEW
326
            let row = SessionHistoryRow {
×
NEW
327
                session_id,
×
NEW
328
                role: Role::Sender,
×
NEW
329
                status: sender_state,
×
NEW
330
                completed_at: None,
×
NEW
331
                error_message: None,
×
NEW
332
            };
×
NEW
333
            println!("{}", row);
×
NEW
334
            Ok::<_, anyhow::Error>(())
×
NEW
335
        })?;
×
336

NEW
337
        self.db.get_recv_session_ids()?.into_iter().try_for_each(|session_id| {
×
NEW
338
            let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone())?;
×
NEW
339
            let (receiver_state, _) = replay_receiver_event_log(&persister)
×
NEW
340
                .map_err(|e| anyhow!("Failed to replay receiver event log: {:?}", e))?;
×
NEW
341
            let row = SessionHistoryRow {
×
NEW
342
                session_id,
×
NEW
343
                role: Role::Receiver,
×
NEW
344
                status: receiver_state,
×
NEW
345
                completed_at: None,
×
NEW
346
                error_message: None,
×
NEW
347
            };
×
NEW
348
            println!("{}", row);
×
NEW
349
            Ok::<_, anyhow::Error>(())
×
NEW
350
        })?;
×
351

352
        self.db.get_inactive_send_session_ids()?.into_iter().try_for_each(
NEW
353
            |(session_id, completed_at)| {
×
NEW
354
                let persister = SenderPersister::from_id(self.db.clone(), session_id.clone())?;
×
NEW
355
                let (sender_state, session_history) = replay_sender_event_log(&persister)
×
NEW
356
                    .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))?;
×
NEW
357
                let row = SessionHistoryRow {
×
NEW
358
                    session_id,
×
NEW
359
                    role: Role::Sender,
×
NEW
360
                    status: sender_state,
×
NEW
361
                    completed_at: Some(completed_at),
×
NEW
362
                    error_message: session_history.terminal_error(),
×
NEW
363
                };
×
NEW
364
                println!("{}", row);
×
NEW
365
                Ok::<_, anyhow::Error>(())
×
NEW
366
            },
×
367
        )?;
368

369
        self.db.get_inactive_recv_session_ids()?.into_iter().try_for_each(
NEW
370
            |(session_id, completed_at)| {
×
NEW
371
                let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone())?;
×
NEW
372
                let (receiver_state, session_history) = replay_receiver_event_log(&persister)
×
NEW
373
                    .map_err(|e| anyhow!("Failed to replay receiver event log: {:?}", e))?;
×
NEW
374
                let row = SessionHistoryRow {
×
NEW
375
                    session_id,
×
NEW
376
                    role: Role::Receiver,
×
NEW
377
                    status: receiver_state,
×
NEW
378
                    completed_at: Some(completed_at),
×
NEW
379
                    error_message: session_history.terminal_error().map(|e| e.0),
×
380
                };
NEW
381
                println!("{}", row);
×
NEW
382
                Ok::<_, anyhow::Error>(())
×
NEW
383
            },
×
384
        )?;
385
        Ok(())
NEW
386
    }
×
387
}
388

389
impl App {
390
    async fn process_sender_session(
2✔
391
        &self,
2✔
392
        session: SendSession,
2✔
393
        persister: &SenderPersister,
2✔
394
    ) -> Result<()> {
2✔
395
        match session {
2✔
396
            SendSession::WithReplyKey(context) =>
1✔
397
                self.post_original_proposal(context, persister).await?,
1✔
398
            SendSession::V2GetContext(context) =>
1✔
399
                self.get_proposed_payjoin_psbt(context, persister).await?,
1✔
UNCOV
400
            SendSession::ProposalReceived(proposal) => {
×
UNCOV
401
                self.process_pj_response(proposal)?;
×
UNCOV
402
                return Ok(());
×
403
            }
404
            _ => return Err(anyhow!("Unexpected sender state")),
×
405
        }
406
        Ok(())
1✔
407
    }
1✔
408

409
    async fn post_original_proposal(
1✔
410
        &self,
1✔
411
        sender: Sender<WithReplyKey>,
1✔
412
        persister: &SenderPersister,
1✔
413
    ) -> Result<()> {
1✔
414
        let (req, ctx) = sender.create_v2_post_request(
1✔
415
            self.unwrap_relay_or_else_fetch(Some(sender.endpoint().clone())).await?,
1✔
UNCOV
416
        )?;
×
417
        let response = self.post_request(req).await?;
1✔
418
        println!("Posted original proposal...");
1✔
419
        let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?;
1✔
420
        self.get_proposed_payjoin_psbt(sender, persister).await
1✔
UNCOV
421
    }
×
422

423
    async fn get_proposed_payjoin_psbt(
2✔
424
        &self,
2✔
425
        sender: Sender<V2GetContext>,
2✔
426
        persister: &SenderPersister,
2✔
427
    ) -> Result<()> {
2✔
428
        let mut session = sender.clone();
2✔
429
        // Long poll until we get a response
430
        loop {
431
            let (req, ctx) = session.create_poll_request(
3✔
432
                self.unwrap_relay_or_else_fetch(Some(session.endpoint().clone())).await?,
3✔
UNCOV
433
            )?;
×
434
            let response = self.post_request(req).await?;
3✔
435
            let res = session.process_response(&response.bytes().await?, ctx).save(persister);
2✔
436
            match res {
2✔
437
                Ok(OptionalTransitionOutcome::Progress(psbt)) => {
1✔
438
                    println!("Proposal received. Processing...");
1✔
439
                    self.process_pj_response(psbt)?;
1✔
440
                    return Ok(());
1✔
441
                }
442
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
1✔
443
                    println!("No response yet.");
1✔
444
                    session = current_state;
1✔
445
                    continue;
1✔
446
                }
UNCOV
447
                Err(re) => {
×
UNCOV
448
                    println!("{re}");
×
UNCOV
449
                    tracing::debug!("{re:?}");
×
450
                    return Err(anyhow!("Response error").context(re));
×
451
                }
452
            }
453
        }
454
    }
1✔
455

456
    async fn long_poll_fallback(
2✔
457
        &self,
2✔
458
        session: Receiver<Initialized>,
2✔
459
        persister: &ReceiverPersister,
2✔
460
    ) -> Result<Receiver<UncheckedOriginalPayload>> {
2✔
461
        let ohttp_relay = self
1✔
462
            .unwrap_relay_or_else_fetch(Some(session.pj_uri().extras.endpoint().clone()))
1✔
463
            .await?;
1✔
464

465
        let mut session = session;
1✔
466
        loop {
467
            let (req, context) = session.create_poll_request(&ohttp_relay)?;
1✔
468
            println!("Polling receive request...");
1✔
469
            let ohttp_response = self.post_request(req).await?;
1✔
470
            let state_transition = session
1✔
471
                .process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
472
                .save(persister);
1✔
473
            match state_transition {
1✔
474
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
1✔
475
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
476
                    return Ok(next_state);
1✔
477
                }
UNCOV
478
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
UNCOV
479
                    session = current_state;
×
UNCOV
480
                    continue;
×
481
                }
482
                Err(e) => return Err(e.into()),
×
483
            }
484
        }
485
    }
1✔
486

487
    async fn process_receiver_session(
2✔
488
        &self,
2✔
489
        session: ReceiveSession,
2✔
490
        persister: &ReceiverPersister,
2✔
491
    ) -> Result<()> {
2✔
492
        let res = {
2✔
493
            match session {
2✔
494
                ReceiveSession::Initialized(proposal) =>
2✔
495
                    self.read_from_directory(proposal, persister).await,
2✔
UNCOV
496
                ReceiveSession::UncheckedOriginalPayload(proposal) =>
×
UNCOV
497
                    self.check_proposal(proposal, persister).await,
×
UNCOV
498
                ReceiveSession::MaybeInputsOwned(proposal) =>
×
499
                    self.check_inputs_not_owned(proposal, persister).await,
×
500
                ReceiveSession::MaybeInputsSeen(proposal) =>
×
501
                    self.check_no_inputs_seen_before(proposal, persister).await,
×
502
                ReceiveSession::OutputsUnknown(proposal) =>
×
503
                    self.identify_receiver_outputs(proposal, persister).await,
×
504
                ReceiveSession::WantsOutputs(proposal) =>
×
505
                    self.commit_outputs(proposal, persister).await,
×
506
                ReceiveSession::WantsInputs(proposal) =>
×
507
                    self.contribute_inputs(proposal, persister).await,
×
508
                ReceiveSession::WantsFeeRange(proposal) =>
×
509
                    self.apply_fee_range(proposal, persister).await,
×
510
                ReceiveSession::ProvisionalProposal(proposal) =>
×
511
                    self.finalize_proposal(proposal, persister).await,
×
512
                ReceiveSession::PayjoinProposal(proposal) =>
×
513
                    self.send_payjoin_proposal(proposal, persister).await,
×
514
                ReceiveSession::Uninitialized =>
515
                    return Err(anyhow!("Uninitialized receiver session")),
×
516
                ReceiveSession::TerminalFailure =>
UNCOV
517
                    return Err(anyhow!("Terminal receiver session")),
×
518
            }
519
        };
520

521
        match res {
2✔
522
            Ok(_) => Ok(()),
1✔
523
            Err(e) => {
1✔
524
                let (_, session_history) = replay_receiver_event_log(persister)?;
1✔
525
                let pj_uri = match session_history.pj_uri() {
1✔
526
                    Some(uri) => Some(uri.extras.endpoint().clone()),
1✔
UNCOV
527
                    None => None,
×
528
                };
529
                let ohttp_relay = self.unwrap_relay_or_else_fetch(pj_uri).await?;
1✔
530
                self.handle_recoverable_error(&ohttp_relay, &session_history).await?;
1✔
531

532
                Err(e)
1✔
533
            }
534
        }
535
    }
2✔
536

537
    #[allow(clippy::incompatible_msrv)]
538
    async fn read_from_directory(
2✔
539
        &self,
2✔
540
        session: Receiver<Initialized>,
2✔
541
        persister: &ReceiverPersister,
2✔
542
    ) -> Result<()> {
2✔
543
        let mut interrupt = self.interrupt.clone();
2✔
544
        let receiver = tokio::select! {
2✔
545
            res = self.long_poll_fallback(session, persister) => res,
2✔
546
            _ = interrupt.changed() => {
2✔
547
                println!("Interrupted. Call the `resume` command to resume all sessions.");
1✔
548
                return Err(anyhow!("Interrupted"));
1✔
549
            }
UNCOV
550
        }?;
×
551
        self.check_proposal(receiver, persister).await
1✔
552
    }
2✔
553

554
    async fn check_proposal(
1✔
555
        &self,
1✔
556
        proposal: Receiver<UncheckedOriginalPayload>,
1✔
557
        persister: &ReceiverPersister,
1✔
558
    ) -> Result<()> {
1✔
559
        let wallet = self.wallet();
1✔
560
        let proposal = proposal
1✔
561
            .check_broadcast_suitability(None, |tx| {
1✔
562
                wallet
1✔
563
                    .can_broadcast(tx)
1✔
564
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
565
            })
1✔
566
            .save(persister)?;
1✔
567

568
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
569
        println!("{}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast()));
1✔
570
        self.check_inputs_not_owned(proposal, persister).await
1✔
571
    }
1✔
572

573
    async fn check_inputs_not_owned(
1✔
574
        &self,
1✔
575
        proposal: Receiver<MaybeInputsOwned>,
1✔
576
        persister: &ReceiverPersister,
1✔
577
    ) -> Result<()> {
1✔
578
        let wallet = self.wallet();
1✔
579
        let proposal = proposal
1✔
580
            .check_inputs_not_owned(&mut |input| {
1✔
581
                wallet
1✔
582
                    .is_mine(input)
1✔
583
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
584
            })
1✔
585
            .save(persister)?;
1✔
586
        self.check_no_inputs_seen_before(proposal, persister).await
1✔
587
    }
1✔
588

589
    async fn check_no_inputs_seen_before(
1✔
590
        &self,
1✔
591
        proposal: Receiver<MaybeInputsSeen>,
1✔
592
        persister: &ReceiverPersister,
1✔
593
    ) -> Result<()> {
1✔
594
        let proposal = proposal
1✔
595
            .check_no_inputs_seen_before(&mut |input| {
1✔
596
                Ok(self.db.insert_input_seen_before(*input)?)
1✔
597
            })
1✔
598
            .save(persister)?;
1✔
599
        self.identify_receiver_outputs(proposal, persister).await
1✔
600
    }
1✔
601

602
    async fn identify_receiver_outputs(
1✔
603
        &self,
1✔
604
        proposal: Receiver<OutputsUnknown>,
1✔
605
        persister: &ReceiverPersister,
1✔
606
    ) -> Result<()> {
1✔
607
        let wallet = self.wallet();
1✔
608
        let proposal = proposal
1✔
609
            .identify_receiver_outputs(&mut |output_script| {
2✔
610
                wallet
2✔
611
                    .is_mine(output_script)
2✔
612
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
2✔
613
            })
2✔
614
            .save(persister)?;
1✔
615
        self.commit_outputs(proposal, persister).await
1✔
616
    }
1✔
617

618
    async fn commit_outputs(
1✔
619
        &self,
1✔
620
        proposal: Receiver<WantsOutputs>,
1✔
621
        persister: &ReceiverPersister,
1✔
622
    ) -> Result<()> {
1✔
623
        let proposal = proposal.commit_outputs().save(persister)?;
1✔
624
        self.contribute_inputs(proposal, persister).await
1✔
625
    }
1✔
626

627
    async fn contribute_inputs(
1✔
628
        &self,
1✔
629
        proposal: Receiver<WantsInputs>,
1✔
630
        persister: &ReceiverPersister,
1✔
631
    ) -> Result<()> {
1✔
632
        let wallet = self.wallet();
1✔
633
        let candidate_inputs = wallet.list_unspent()?;
1✔
634

635
        let selected_input = proposal.try_preserving_privacy(candidate_inputs)?;
1✔
636
        let proposal =
1✔
637
            proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?;
1✔
638
        self.apply_fee_range(proposal, persister).await
1✔
639
    }
1✔
640

641
    async fn apply_fee_range(
1✔
642
        &self,
1✔
643
        proposal: Receiver<WantsFeeRange>,
1✔
644
        persister: &ReceiverPersister,
1✔
645
    ) -> Result<()> {
1✔
646
        let proposal = proposal.apply_fee_range(None, self.config.max_fee_rate).save(persister)?;
1✔
647
        self.finalize_proposal(proposal, persister).await
1✔
648
    }
1✔
649

650
    async fn finalize_proposal(
1✔
651
        &self,
1✔
652
        proposal: Receiver<ProvisionalProposal>,
1✔
653
        persister: &ReceiverPersister,
1✔
654
    ) -> Result<()> {
1✔
655
        let wallet = self.wallet();
1✔
656
        let proposal = proposal
1✔
657
            .finalize_proposal(|psbt| {
1✔
658
                wallet
1✔
659
                    .process_psbt(psbt)
1✔
660
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
661
            })
1✔
662
            .save(persister)?;
1✔
663
        self.send_payjoin_proposal(proposal, persister).await
1✔
664
    }
1✔
665

666
    async fn send_payjoin_proposal(
1✔
667
        &self,
1✔
668
        mut proposal: Receiver<PayjoinProposal>,
1✔
669
        persister: &ReceiverPersister,
1✔
670
    ) -> Result<()> {
1✔
671
        let (req, ohttp_ctx) = proposal
1✔
672
            .create_post_request(&self.unwrap_relay_or_else_fetch(None).await?)
1✔
673
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
674
        let res = self.post_request(req).await?;
1✔
675
        let payjoin_psbt = proposal.psbt().clone();
1✔
676
        proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister)?;
1✔
677
        println!(
1✔
678
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
1✔
679
            payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
1✔
680
        );
681
        Ok(())
1✔
682
    }
1✔
683

684
    async fn unwrap_relay_or_else_fetch(
7✔
685
        &self,
7✔
686
        directory: Option<payjoin::Url>,
7✔
687
    ) -> Result<payjoin::Url> {
7✔
688
        let selected_relay =
7✔
689
            self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay();
7✔
690
        let ohttp_relay = match selected_relay {
7✔
691
            Some(relay) => relay,
3✔
692
            None =>
693
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone())
4✔
694
                    .await?
4✔
695
                    .relay_url,
696
        };
697
        Ok(ohttp_relay)
7✔
698
    }
7✔
699

700
    /// Handle request error by sending an error response over the directory
701
    async fn handle_recoverable_error(
1✔
702
        &self,
1✔
703
        ohttp_relay: &payjoin::Url,
1✔
704
        session_history: &SessionHistory,
1✔
705
    ) -> Result<()> {
1✔
706
        let e = match session_history.terminal_error() {
1✔
UNCOV
707
            Some((_, Some(e))) => e,
×
708
            _ => return Ok(()),
1✔
709
        };
710
        let (err_req, err_ctx) = session_history
×
UNCOV
711
            .extract_err_req(ohttp_relay)?
×
UNCOV
712
            .expect("If JsonReply is Some, then err_req and err_ctx should be Some");
×
713
        let to_return = anyhow!("Replied with error: {}", e.to_json().to_string());
×
714

715
        let err_response = match self.post_request(err_req).await {
×
716
            Ok(response) => response,
×
UNCOV
717
            Err(e) => return Err(anyhow!("Failed to post error request: {}", e)),
×
718
        };
719

720
        let err_bytes = match err_response.bytes().await {
×
UNCOV
721
            Ok(bytes) => bytes,
×
UNCOV
722
            Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
×
723
        };
724

725
        if let Err(e) = process_err_res(&err_bytes, err_ctx) {
×
UNCOV
726
            return Err(anyhow!("Failed to process error response: {}", e));
×
UNCOV
727
        }
×
728

729
        Err(to_return)
×
730
    }
1✔
731

732
    async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {
6✔
733
        let http = http_agent(&self.config)?;
6✔
734
        http.post(req.url)
6✔
735
            .header("Content-Type", req.content_type)
6✔
736
            .body(req.body)
6✔
737
            .send()
6✔
738
            .await
6✔
739
            .map_err(map_reqwest_err)
5✔
740
    }
5✔
741
}
742

UNCOV
743
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
UNCOV
744
    match e.status() {
×
UNCOV
745
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
746
        None => anyhow!("No HTTP response: {}", e),
×
747
    }
748
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc