• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 18230715366

03 Oct 2025 06:34PM UTC coverage: 84.18% (-0.06%) from 84.237%
18230715366

Pull #1141

github

web-flow
Merge 1460d24bb into 5335134f3
Pull Request #1141: Return String instead of Url for endpoint accessors

97 of 99 new or added lines in 8 files covered. (97.98%)

4 existing lines in 3 files now uncovered.

8934 of 10613 relevant lines covered (84.18%)

466.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.11
/payjoin-cli/src/app/v2/mod.rs
1
use std::fmt;
2
use std::sync::{Arc, Mutex};
3

4
use anyhow::{anyhow, Context, Result};
5
use payjoin::bitcoin::consensus::encode::serialize_hex;
6
use payjoin::bitcoin::{Amount, FeeRate};
7
use payjoin::persist::OptionalTransitionOutcome;
8
use payjoin::receive::v2::{
9
    replay_event_log as replay_receiver_event_log, HasReplyableError, Initialized,
10
    MaybeInputsOwned, MaybeInputsSeen, Monitor, OutputsUnknown, PayjoinProposal,
11
    ProvisionalProposal, ReceiveSession, Receiver, ReceiverBuilder,
12
    SessionOutcome as ReceiverSessionOutcome, UncheckedOriginalPayload, WantsFeeRange, WantsInputs,
13
    WantsOutputs,
14
};
15
use payjoin::send::v2::{
16
    replay_event_log as replay_sender_event_log, PollingForProposal, SendSession, Sender,
17
    SenderBuilder, SessionOutcome as SenderSessionOutcome, WithReplyKey,
18
};
19
use payjoin::{ImplementationError, PjParam, Uri};
20
use tokio::sync::watch;
21

22
use super::config::Config;
23
use super::wallet::BitcoindWallet;
24
use super::App as AppTrait;
25
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
26
use crate::app::{handle_interrupt, http_agent};
27
use crate::db::v2::{ReceiverPersister, SenderPersister, SessionId};
28
use crate::db::Database;
29

30
mod ohttp;
31

32
const W_ID: usize = 12;
33
const W_ROLE: usize = 25;
34
const W_DONE: usize = 15;
35
const W_STATUS: usize = 15;
36

37
#[derive(Clone)]
38
pub(crate) struct App {
39
    config: Config,
40
    db: Arc<Database>,
41
    wallet: BitcoindWallet,
42
    interrupt: watch::Receiver<()>,
43
    relay_manager: Arc<Mutex<RelayManager>>,
44
}
45

46
trait StatusText {
47
    fn status_text(&self) -> &'static str;
48
}
49

50
impl StatusText for SendSession {
51
    fn status_text(&self) -> &'static str {
×
52
        match self {
×
53
            SendSession::WithReplyKey(_) | SendSession::PollingForProposal(_) =>
54
                "Waiting for proposal",
×
55
            SendSession::ProposalReceived(_) => "Proposal received",
×
56
            SendSession::Closed(session_outcome) => match session_outcome {
×
57
                SenderSessionOutcome::Failure => "Session failure",
×
58
                SenderSessionOutcome::Success => "Session success",
×
59
                SenderSessionOutcome::Cancel => "Session cancelled",
×
60
            },
61
        }
62
    }
×
63
}
64

65
impl StatusText for ReceiveSession {
66
    fn status_text(&self) -> &'static str {
×
67
        match self {
×
68
            ReceiveSession::Initialized(_) => "Waiting for original proposal",
×
69
            ReceiveSession::UncheckedOriginalPayload(_)
70
            | ReceiveSession::MaybeInputsOwned(_)
71
            | ReceiveSession::MaybeInputsSeen(_)
72
            | ReceiveSession::OutputsUnknown(_)
73
            | ReceiveSession::WantsOutputs(_)
74
            | ReceiveSession::WantsInputs(_)
75
            | ReceiveSession::WantsFeeRange(_)
76
            | ReceiveSession::ProvisionalProposal(_) => "Processing original proposal",
×
77
            ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent",
×
78
            ReceiveSession::HasReplyableError(_) =>
79
                "Session failure, waiting to post error response",
×
80
            ReceiveSession::Monitor(_) => "Monitoring payjoin proposal",
×
81
            ReceiveSession::Closed(session_outcome) => match session_outcome {
×
82
                ReceiverSessionOutcome::Failure => "Session failure",
×
83
                ReceiverSessionOutcome::Success(_) => "Session success",
×
84
                ReceiverSessionOutcome::Cancel => "Session cancelled",
×
85
                ReceiverSessionOutcome::FallbackBroadcasted => "Fallback broadcasted",
×
86
            },
87
        }
88
    }
×
89
}
90

91
fn print_header() {
×
92
    println!(
×
93
        "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
×
94
        "Session ID", "Sender/Receiver", "Completed At", "Status"
95
    );
96
}
×
97

98
enum Role {
99
    Sender,
100
    Receiver,
101
}
102
impl Role {
103
    fn as_str(&self) -> &'static str {
×
104
        match self {
×
105
            Role::Sender => "Sender",
×
106
            Role::Receiver => "Receiver",
×
107
        }
108
    }
×
109
}
110

111
struct SessionHistoryRow<Status> {
112
    session_id: SessionId,
113
    role: Role,
114
    status: Status,
115
    completed_at: Option<u64>,
116
    error_message: Option<String>,
117
}
118

119
impl<Status: StatusText> fmt::Display for SessionHistoryRow<Status> {
120
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
121
        write!(
×
122
            f,
×
123
            "{:<W_ID$} {:<W_ROLE$} {:<W_DONE$} {:<W_STATUS$}",
×
124
            self.session_id.to_string(),
×
125
            self.role.as_str(),
×
126
            match self.completed_at {
×
127
                None => "Not Completed".to_string(),
×
128
                Some(secs) => {
×
129
                    // TODO: human readable time
130
                    secs.to_string()
×
131
                }
132
            },
133
            self.error_message.as_deref().unwrap_or(self.status.status_text())
×
134
        )
135
    }
×
136
}
137

138
#[async_trait::async_trait]
139
impl AppTrait for App {
140
    async fn new(config: Config) -> Result<Self> {
8✔
141
        let db = Arc::new(Database::create(&config.db_path)?);
142
        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
143
        let (interrupt_tx, interrupt_rx) = watch::channel(());
144
        tokio::spawn(handle_interrupt(interrupt_tx));
145
        let wallet = BitcoindWallet::new(&config.bitcoind).await?;
146
        let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager };
147
        app.wallet()
148
            .network()
149
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
150
        Ok(app)
151
    }
8✔
152

153
    fn wallet(&self) -> BitcoindWallet { self.wallet.clone() }
22✔
154

155
    #[allow(clippy::incompatible_msrv)]
156
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
3✔
157
        use payjoin::UriExt;
158
        let uri = Uri::try_from(bip21)
159
            .map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?
×
160
            .assume_checked()
161
            .check_pj_supported()
162
            .map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
163
        let address = uri.address;
164
        let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?;
×
165
        match uri.extras.pj_param() {
166
            #[cfg(feature = "v1")]
167
            PjParam::V1(pj_param) => {
168
                use std::str::FromStr;
169

170
                let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
171
                let (req, ctx) = payjoin::send::v1::SenderBuilder::from_parts(
172
                    psbt,
173
                    pj_param,
174
                    &address,
175
                    Some(amount),
176
                )
177
                .build_recommended(fee_rate)
178
                .with_context(|| "Failed to build payjoin request")?
179
                .create_v1_post_request();
180
                let http = http_agent(&self.config)?;
181
                let body = String::from_utf8(req.body.clone()).unwrap();
182
                println!("Sending fallback request to {}", &req.url);
183
                let response = http
184
                    .post(req.url)
185
                    .header("Content-Type", req.content_type)
186
                    .body(body.clone())
187
                    .send()
188
                    .await
189
                    .with_context(|| "HTTP request failed")?;
190
                let fallback_tx = payjoin::bitcoin::Psbt::from_str(&body)
191
                    .map_err(|e| anyhow!("Failed to load PSBT from base64: {}", e))?
×
192
                    .extract_tx()?;
193
                println!("Sent fallback transaction txid: {}", fallback_tx.compute_txid());
194
                println!(
195
                    "Sent fallback transaction hex: {:#}",
196
                    payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
197
                );
198
                let psbt = ctx.process_response(&response.bytes().await?).map_err(|e| {
×
199
                    tracing::debug!("Error processing response: {e:?}");
×
200
                    anyhow!("Failed to process response {e}")
×
201
                })?;
×
202

203
                self.process_pj_response(psbt)?;
204
                Ok(())
205
            }
206
            PjParam::V2(pj_param) => {
207
                let receiver_pubkey = pj_param.receiver_pubkey();
208
                let sender_state =
209
                    self.db.get_send_session_ids()?.into_iter().find_map(|session_id| {
1✔
210
                        let session_receiver_pubkey = self
1✔
211
                            .db
1✔
212
                            .get_send_session_receiver_pk(&session_id)
1✔
213
                            .expect("Receiver pubkey should exist if session id exists");
1✔
214
                        if session_receiver_pubkey == *receiver_pubkey {
1✔
215
                            let sender_persister =
1✔
216
                                SenderPersister::from_id(self.db.clone(), session_id);
1✔
217
                            let (send_session, _) = replay_sender_event_log(&sender_persister)
1✔
218
                                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))
1✔
219
                                .ok()?;
1✔
220

221
                            Some((send_session, sender_persister))
1✔
222
                        } else {
223
                            None
×
224
                        }
225
                    });
1✔
226

227
                let (sender_state, persister) = match sender_state {
228
                    Some((sender_state, persister)) => (sender_state, persister),
229
                    None => {
230
                        let persister =
231
                            SenderPersister::new(self.db.clone(), receiver_pubkey.clone())?;
232
                        let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
233
                        let sender =
234
                            SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
235
                                .build_recommended(fee_rate)?
236
                                .save(&persister)?;
237

238
                        (SendSession::WithReplyKey(sender), persister)
239
                    }
240
                };
241
                let mut interrupt = self.interrupt.clone();
242
                tokio::select! {
243
                    _ = self.process_sender_session(sender_state, &persister) => return Ok(()),
244
                    _ = interrupt.changed() => {
245
                        println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
246
                        return Err(anyhow!("Interrupted"))
247
                    }
248
                }
249
            }
250
            _ => unimplemented!("Unrecognized payjoin version"),
251
        }
252
    }
3✔
253

254
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
1✔
255
        let address = self.wallet().get_new_address()?;
256
        let ohttp_keys =
257
            unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone())
258
                .await?
259
                .ohttp_keys;
260
        let persister = ReceiverPersister::new(self.db.clone())?;
261
        let session =
262
            ReceiverBuilder::new(address, self.config.v2()?.pj_directory.as_str(), ohttp_keys)?
263
                .with_amount(amount)
264
                .with_max_fee_rate(self.config.max_fee_rate.unwrap_or(FeeRate::BROADCAST_MIN))
265
                .build()
266
                .save(&persister)?;
267

268
        println!("Receive session established");
269
        let pj_uri = session.pj_uri();
270
        println!("Request Payjoin by sharing this Payjoin Uri:");
271
        println!("{pj_uri}");
272

273
        self.process_receiver_session(ReceiveSession::Initialized(session.clone()), &persister)
274
            .await?;
275
        Ok(())
276
    }
1✔
277

278
    #[allow(clippy::incompatible_msrv)]
279
    async fn resume_payjoins(&self) -> Result<()> {
4✔
280
        let recv_session_ids = self.db.get_recv_session_ids()?;
281
        let send_session_ids = self.db.get_send_session_ids()?;
282

283
        if recv_session_ids.is_empty() && send_session_ids.is_empty() {
284
            println!("No sessions to resume.");
285
            return Ok(());
286
        }
287

288
        let mut tasks = Vec::new();
289

290
        for session_id in recv_session_ids {
291
            let self_clone = self.clone();
292
            let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id);
293
            let receiver_state = replay_receiver_event_log(&recv_persister)
294
                .map_err(|e| anyhow!("Failed to replay receiver event log: {:?}", e))?
×
295
                .0;
296
            tasks.push(tokio::spawn(async move {
2✔
297
                self_clone.process_receiver_session(receiver_state, &recv_persister).await
2✔
298
            }));
2✔
299
        }
300

301
        for session_id in send_session_ids {
302
            let sender_persiter = SenderPersister::from_id(self.db.clone(), session_id);
303
            let sender_state = replay_sender_event_log(&sender_persiter)
304
                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))?
×
305
                .0;
306
            let self_clone = self.clone();
307
            tasks.push(tokio::spawn(async move {
×
308
                self_clone.process_sender_session(sender_state, &sender_persiter).await
×
309
            }));
×
310
        }
311

312
        let mut interrupt = self.interrupt.clone();
313
        tokio::select! {
314
            _ = async {
2✔
315
                for task in tasks {
3✔
316
                    let _ = task.await;
2✔
317
                }
318
            } => {
1✔
319
                println!("All resumed sessions completed.");
320
            }
321
            _ = interrupt.changed() => {
322
                println!("Resumed sessions were interrupted.");
323
            }
324
        }
325
        Ok(())
326
    }
4✔
327

328
    #[cfg(feature = "v2")]
329
    async fn history(&self) -> Result<()> {
×
330
        print_header();
331
        let mut send_rows = vec![];
332
        let mut recv_rows = vec![];
333
        self.db.get_send_session_ids()?.into_iter().for_each(|session_id| {
×
334
            let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
×
335
            match replay_sender_event_log(&persister) {
×
336
                Ok((sender_state, _)) => {
×
337
                    let row = SessionHistoryRow {
×
338
                        session_id,
×
339
                        role: Role::Sender,
×
340
                        status: sender_state.clone(),
×
341
                        completed_at: None,
×
342
                        error_message: None,
×
343
                    };
×
344
                    send_rows.push(row);
×
345
                }
×
346
                Err(e) => {
×
347
                    let row = SessionHistoryRow {
×
348
                        session_id,
×
349
                        role: Role::Sender,
×
350
                        status: SendSession::Closed(SenderSessionOutcome::Failure),
×
351
                        completed_at: None,
×
352
                        error_message: Some(e.to_string()),
×
353
                    };
×
354
                    send_rows.push(row);
×
355
                }
×
356
            }
357
        });
×
358

359
        self.db.get_recv_session_ids()?.into_iter().for_each(|session_id| {
×
360
            let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
×
361
            match replay_receiver_event_log(&persister) {
×
362
                Ok((receiver_state, _)) => {
×
363
                    let row = SessionHistoryRow {
×
364
                        session_id,
×
365
                        role: Role::Receiver,
×
366
                        status: receiver_state.clone(),
×
367
                        completed_at: None,
×
368
                        error_message: None,
×
369
                    };
×
370
                    recv_rows.push(row);
×
371
                }
×
372
                Err(e) => {
×
373
                    let row = SessionHistoryRow {
×
374
                        session_id,
×
375
                        role: Role::Receiver,
×
376
                        status: ReceiveSession::Closed(ReceiverSessionOutcome::Failure),
×
377
                        completed_at: None,
×
378
                        error_message: Some(e.to_string()),
×
379
                    };
×
380
                    recv_rows.push(row);
×
381
                }
×
382
            }
383
        });
×
384

385
        self.db.get_inactive_send_session_ids()?.into_iter().for_each(
386
            |(session_id, completed_at)| {
×
387
                let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
×
388
                match replay_sender_event_log(&persister) {
×
389
                    Ok((sender_state, _)) => {
×
390
                        let row = SessionHistoryRow {
×
391
                            session_id,
×
392
                            role: Role::Sender,
×
393
                            status: sender_state.clone(),
×
394
                            completed_at: Some(completed_at),
×
395
                            error_message: None,
×
396
                        };
×
397
                        send_rows.push(row);
×
398
                    }
×
399
                    Err(e) => {
×
400
                        let row = SessionHistoryRow {
×
401
                            session_id,
×
402
                            role: Role::Sender,
×
403
                            status: SendSession::Closed(SenderSessionOutcome::Failure),
×
404
                            completed_at: Some(completed_at),
×
405
                            error_message: Some(e.to_string()),
×
406
                        };
×
407
                        send_rows.push(row);
×
408
                    }
×
409
                }
410
            },
×
411
        );
412

413
        self.db.get_inactive_recv_session_ids()?.into_iter().for_each(
414
            |(session_id, completed_at)| {
×
415
                let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
×
416
                match replay_receiver_event_log(&persister) {
×
417
                    Ok((receiver_state, _)) => {
×
418
                        let row = SessionHistoryRow {
×
419
                            session_id,
×
420
                            role: Role::Receiver,
×
421
                            status: receiver_state.clone(),
×
422
                            completed_at: Some(completed_at),
×
423
                            error_message: None,
×
424
                        };
×
425
                        recv_rows.push(row);
×
426
                    }
×
427
                    Err(e) => {
×
428
                        let row = SessionHistoryRow {
×
429
                            session_id,
×
430
                            role: Role::Receiver,
×
431
                            status: ReceiveSession::Closed(ReceiverSessionOutcome::Failure),
×
432
                            completed_at: Some(completed_at),
×
433
                            error_message: Some(e.to_string()),
×
434
                        };
×
435
                        recv_rows.push(row);
×
436
                    }
×
437
                }
438
            },
×
439
        );
440

441
        // Print receiver and sender rows separately
442
        for row in send_rows {
443
            println!("{row}");
444
        }
445
        for row in recv_rows {
446
            println!("{row}");
447
        }
448

449
        Ok(())
450
    }
×
451
}
452

453
impl App {
454
    async fn process_sender_session(
2✔
455
        &self,
2✔
456
        session: SendSession,
2✔
457
        persister: &SenderPersister,
2✔
458
    ) -> Result<()> {
2✔
459
        match session {
2✔
460
            SendSession::WithReplyKey(context) =>
1✔
461
                self.post_original_proposal(context, persister).await?,
1✔
462
            SendSession::PollingForProposal(context) =>
1✔
463
                self.get_proposed_payjoin_psbt(context, persister).await?,
1✔
464
            SendSession::ProposalReceived(proposal) => {
×
465
                self.process_pj_response(proposal)?;
×
466
                return Ok(());
×
467
            }
468
            _ => return Err(anyhow!("Unexpected sender state")),
×
469
        }
470
        Ok(())
1✔
471
    }
1✔
472

473
    async fn post_original_proposal(
1✔
474
        &self,
1✔
475
        sender: Sender<WithReplyKey>,
1✔
476
        persister: &SenderPersister,
1✔
477
    ) -> Result<()> {
1✔
478
        let (req, ctx) = sender.create_v2_post_request(
1✔
479
            self.unwrap_relay_or_else_fetch(Some(
1✔
480
                url::Url::parse(&sender.endpoint()).expect("Could not parse url"),
1✔
481
            ))
1✔
482
            .await?
1✔
483
            .as_str(),
1✔
UNCOV
484
        )?;
×
485
        let response = self.post_request(req).await?;
1✔
486
        println!("Posted original proposal...");
1✔
487
        let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?;
1✔
488
        self.get_proposed_payjoin_psbt(sender, persister).await
1✔
489
    }
×
490

491
    async fn get_proposed_payjoin_psbt(
2✔
492
        &self,
2✔
493
        sender: Sender<PollingForProposal>,
2✔
494
        persister: &SenderPersister,
2✔
495
    ) -> Result<()> {
2✔
496
        let mut session = sender.clone();
2✔
497
        // Long poll until we get a response
498
        loop {
499
            let (req, ctx) = session.create_poll_request(
3✔
500
                self.unwrap_relay_or_else_fetch(Some(
3✔
501
                    url::Url::parse(&session.endpoint()).expect("Could not parse url"),
3✔
502
                ))
3✔
503
                .await?
3✔
504
                .as_str(),
3✔
UNCOV
505
            )?;
×
506
            let response = self.post_request(req).await?;
3✔
507
            let res = session.process_response(&response.bytes().await?, ctx).save(persister);
2✔
508
            match res {
2✔
509
                Ok(OptionalTransitionOutcome::Progress(psbt)) => {
1✔
510
                    println!("Proposal received. Processing...");
1✔
511
                    self.process_pj_response(psbt)?;
1✔
512
                    return Ok(());
1✔
513
                }
514
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
1✔
515
                    println!("No response yet.");
1✔
516
                    session = current_state;
1✔
517
                    continue;
1✔
518
                }
519
                Err(re) => {
×
520
                    println!("{re}");
×
521
                    tracing::debug!("{re:?}");
×
522
                    return Err(anyhow!("Response error").context(re));
×
523
                }
524
            }
525
        }
526
    }
1✔
527

528
    async fn long_poll_fallback(
2✔
529
        &self,
2✔
530
        session: Receiver<Initialized>,
2✔
531
        persister: &ReceiverPersister,
2✔
532
    ) -> Result<Receiver<UncheckedOriginalPayload>> {
2✔
533
        let ohttp_relay = self
2✔
534
            .unwrap_relay_or_else_fetch(Some(
2✔
535
                url::Url::parse(&session.pj_uri().extras.endpoint()).expect("Could not parse url"),
2✔
536
            ))
2✔
537
            .await?;
2✔
538

539
        let mut session = session;
2✔
540
        loop {
541
            let (req, context) = session.create_poll_request(ohttp_relay.as_str())?;
2✔
542
            println!("Polling receive request...");
2✔
543
            let ohttp_response = self.post_request(req).await?;
2✔
544
            let state_transition = session
1✔
545
                .process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
546
                .save(persister);
1✔
547
            match state_transition {
1✔
548
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
1✔
549
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
550
                    return Ok(next_state);
1✔
551
                }
552
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
553
                    session = current_state;
×
554
                    continue;
×
555
                }
556
                Err(e) => return Err(e.into()),
×
557
            }
558
        }
559
    }
1✔
560

561
    async fn process_receiver_session(
3✔
562
        &self,
3✔
563
        session: ReceiveSession,
3✔
564
        persister: &ReceiverPersister,
3✔
565
    ) -> Result<()> {
3✔
566
        let res = {
2✔
567
            match session {
3✔
568
                ReceiveSession::Initialized(proposal) =>
2✔
569
                    self.read_from_directory(proposal, persister).await,
2✔
570
                ReceiveSession::UncheckedOriginalPayload(proposal) =>
×
571
                    self.check_proposal(proposal, persister).await,
×
572
                ReceiveSession::MaybeInputsOwned(proposal) =>
×
573
                    self.check_inputs_not_owned(proposal, persister).await,
×
574
                ReceiveSession::MaybeInputsSeen(proposal) =>
×
575
                    self.check_no_inputs_seen_before(proposal, persister).await,
×
576
                ReceiveSession::OutputsUnknown(proposal) =>
×
577
                    self.identify_receiver_outputs(proposal, persister).await,
×
578
                ReceiveSession::WantsOutputs(proposal) =>
×
579
                    self.commit_outputs(proposal, persister).await,
×
580
                ReceiveSession::WantsInputs(proposal) =>
×
581
                    self.contribute_inputs(proposal, persister).await,
×
582
                ReceiveSession::WantsFeeRange(proposal) =>
×
583
                    self.apply_fee_range(proposal, persister).await,
×
584
                ReceiveSession::ProvisionalProposal(proposal) =>
×
585
                    self.finalize_proposal(proposal, persister).await,
×
586
                ReceiveSession::PayjoinProposal(proposal) =>
×
587
                    self.send_payjoin_proposal(proposal, persister).await,
×
588
                ReceiveSession::HasReplyableError(error) =>
×
589
                    self.handle_error(error, persister).await,
×
590
                ReceiveSession::Monitor(proposal) =>
1✔
591
                    self.monitor_payjoin_proposal(proposal, persister).await,
1✔
592
                ReceiveSession::Closed(_) => return Err(anyhow!("Session closed")),
×
593
            }
594
        };
595
        res
2✔
596
    }
2✔
597

598
    #[allow(clippy::incompatible_msrv)]
599
    async fn read_from_directory(
2✔
600
        &self,
2✔
601
        session: Receiver<Initialized>,
2✔
602
        persister: &ReceiverPersister,
2✔
603
    ) -> Result<()> {
2✔
604
        let mut interrupt = self.interrupt.clone();
2✔
605
        let receiver = tokio::select! {
2✔
606
            res = self.long_poll_fallback(session, persister) => res,
2✔
607
            _ = interrupt.changed() => {
2✔
608
                println!("Interrupted. Call the `resume` command to resume all sessions.");
1✔
609
                return Err(anyhow!("Interrupted"));
1✔
610
            }
611
        }?;
×
612
        self.check_proposal(receiver, persister).await
1✔
613
    }
1✔
614

615
    async fn check_proposal(
1✔
616
        &self,
1✔
617
        proposal: Receiver<UncheckedOriginalPayload>,
1✔
618
        persister: &ReceiverPersister,
1✔
619
    ) -> Result<()> {
1✔
620
        let wallet = self.wallet();
1✔
621
        let proposal = proposal
1✔
622
            .check_broadcast_suitability(None, |tx| {
1✔
623
                wallet
1✔
624
                    .can_broadcast(tx)
1✔
625
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
626
            })
1✔
627
            .save(persister)?;
1✔
628

629
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
630
        println!("{}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast()));
1✔
631
        self.check_inputs_not_owned(proposal, persister).await
1✔
632
    }
×
633

634
    async fn check_inputs_not_owned(
1✔
635
        &self,
1✔
636
        proposal: Receiver<MaybeInputsOwned>,
1✔
637
        persister: &ReceiverPersister,
1✔
638
    ) -> Result<()> {
1✔
639
        let wallet = self.wallet();
1✔
640
        let proposal = proposal
1✔
641
            .check_inputs_not_owned(&mut |input| {
1✔
642
                wallet
1✔
643
                    .is_mine(input)
1✔
644
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
645
            })
1✔
646
            .save(persister)?;
1✔
647
        self.check_no_inputs_seen_before(proposal, persister).await
1✔
648
    }
×
649

650
    async fn check_no_inputs_seen_before(
1✔
651
        &self,
1✔
652
        proposal: Receiver<MaybeInputsSeen>,
1✔
653
        persister: &ReceiverPersister,
1✔
654
    ) -> Result<()> {
1✔
655
        let proposal = proposal
1✔
656
            .check_no_inputs_seen_before(&mut |input| {
1✔
657
                Ok(self.db.insert_input_seen_before(*input)?)
1✔
658
            })
1✔
659
            .save(persister)?;
1✔
660
        self.identify_receiver_outputs(proposal, persister).await
1✔
661
    }
×
662

663
    async fn identify_receiver_outputs(
1✔
664
        &self,
1✔
665
        proposal: Receiver<OutputsUnknown>,
1✔
666
        persister: &ReceiverPersister,
1✔
667
    ) -> Result<()> {
1✔
668
        let wallet = self.wallet();
1✔
669
        let proposal = proposal
1✔
670
            .identify_receiver_outputs(&mut |output_script| {
2✔
671
                wallet
2✔
672
                    .is_mine(output_script)
2✔
673
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
2✔
674
            })
2✔
675
            .save(persister)?;
1✔
676
        self.commit_outputs(proposal, persister).await
1✔
677
    }
×
678

679
    async fn commit_outputs(
1✔
680
        &self,
1✔
681
        proposal: Receiver<WantsOutputs>,
1✔
682
        persister: &ReceiverPersister,
1✔
683
    ) -> Result<()> {
1✔
684
        let proposal = proposal.commit_outputs().save(persister)?;
1✔
685
        self.contribute_inputs(proposal, persister).await
1✔
686
    }
×
687

688
    async fn contribute_inputs(
1✔
689
        &self,
1✔
690
        proposal: Receiver<WantsInputs>,
1✔
691
        persister: &ReceiverPersister,
1✔
692
    ) -> Result<()> {
1✔
693
        let wallet = self.wallet();
1✔
694
        let candidate_inputs = wallet.list_unspent()?;
1✔
695

696
        let selected_input = proposal.try_preserving_privacy(candidate_inputs)?;
1✔
697
        let proposal =
1✔
698
            proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?;
1✔
699
        self.apply_fee_range(proposal, persister).await
1✔
700
    }
×
701

702
    async fn apply_fee_range(
1✔
703
        &self,
1✔
704
        proposal: Receiver<WantsFeeRange>,
1✔
705
        persister: &ReceiverPersister,
1✔
706
    ) -> Result<()> {
1✔
707
        let proposal = proposal.apply_fee_range(None, self.config.max_fee_rate).save(persister)?;
1✔
708
        self.finalize_proposal(proposal, persister).await
1✔
709
    }
×
710

711
    async fn finalize_proposal(
1✔
712
        &self,
1✔
713
        proposal: Receiver<ProvisionalProposal>,
1✔
714
        persister: &ReceiverPersister,
1✔
715
    ) -> Result<()> {
1✔
716
        let wallet = self.wallet();
1✔
717
        let proposal = proposal
1✔
718
            .finalize_proposal(|psbt| {
1✔
719
                wallet
1✔
720
                    .process_psbt(psbt)
1✔
721
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
722
            })
1✔
723
            .save(persister)?;
1✔
724
        self.send_payjoin_proposal(proposal, persister).await
1✔
725
    }
×
726

727
    async fn send_payjoin_proposal(
1✔
728
        &self,
1✔
729
        proposal: Receiver<PayjoinProposal>,
1✔
730
        persister: &ReceiverPersister,
1✔
731
    ) -> Result<()> {
1✔
732
        let (req, ohttp_ctx) = proposal
1✔
733
            .create_post_request(self.unwrap_relay_or_else_fetch(None).await?.as_str())
1✔
734
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
735
        let res = self.post_request(req).await?;
1✔
736
        let payjoin_psbt = proposal.psbt().clone();
1✔
737
        let session = proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister)?;
1✔
738
        println!(
1✔
739
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
1✔
740
            payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
1✔
741
        );
742

743
        return self.monitor_payjoin_proposal(session, persister).await;
1✔
744
    }
×
745

746
    async fn monitor_payjoin_proposal(
2✔
747
        &self,
2✔
748
        proposal: Receiver<Monitor>,
2✔
749
        persister: &ReceiverPersister,
2✔
750
    ) -> Result<()> {
2✔
751
        // On a session resumption, the receiver will resume again in this state.
752
        let _ = proposal
2✔
753
            .check_payment(
2✔
754
                |txid| {
2✔
755
                    self.wallet()
2✔
756
                        .get_raw_transaction(&txid)
2✔
757
                        .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
2✔
758
                },
2✔
759
                |outpoint| {
×
760
                    self.wallet()
×
761
                        .is_outpoint_spent(&outpoint)
×
762
                        .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
×
763
                },
×
764
            )
765
            .save(persister)?;
2✔
766
        Ok(())
2✔
767
    }
2✔
768

769
    async fn unwrap_relay_or_else_fetch(&self, directory: Option<url::Url>) -> Result<url::Url> {
7✔
770
        let selected_relay =
7✔
771
            self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay();
7✔
772
        let ohttp_relay = match selected_relay {
7✔
773
            Some(relay) => relay,
3✔
774
            None =>
775
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone())
4✔
776
                    .await?
4✔
777
                    .relay_url,
778
        };
779
        Ok(ohttp_relay)
7✔
780
    }
7✔
781

782
    /// Handle error by attempting to send an error response over the directory
783
    async fn handle_error(
×
784
        &self,
×
785
        session: Receiver<HasReplyableError>,
×
786
        persister: &ReceiverPersister,
×
787
    ) -> Result<()> {
×
788
        let (err_req, err_ctx) =
×
789
            session.create_error_request(self.unwrap_relay_or_else_fetch(None).await?.as_str())?;
×
790

791
        let err_response = match self.post_request(err_req).await {
×
792
            Ok(response) => response,
×
793
            Err(e) => return Err(anyhow!("Failed to post error request: {}", e)),
×
794
        };
795

796
        let err_bytes = match err_response.bytes().await {
×
797
            Ok(bytes) => bytes,
×
798
            Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
×
799
        };
800

801
        if let Err(e) = session.process_error_response(&err_bytes, err_ctx).save(persister) {
×
802
            return Err(anyhow!("Failed to process error response: {}", e));
×
803
        }
×
804

805
        Ok(())
×
806
    }
×
807

808
    async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {
7✔
809
        let http = http_agent(&self.config)?;
7✔
810
        http.post(req.url)
7✔
811
            .header("Content-Type", req.content_type)
7✔
812
            .body(req.body)
7✔
813
            .send()
7✔
814
            .await
7✔
815
            .map_err(map_reqwest_err)
5✔
816
    }
5✔
817
}
818

819
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
820
    match e.status() {
×
821
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
822
        None => anyhow!("No HTTP response: {}", e),
×
823
    }
824
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc