• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 17649311628

11 Sep 2025 03:22PM UTC coverage: 85.971% (-0.05%) from 86.019%
17649311628

Pull #1057

github

web-flow
Merge b53dc45c4 into 52cfeef1a
Pull Request #1057: Remove use of payjoin::Url and url::Url in public Api

150 of 161 new or added lines in 14 files covered. (93.17%)

8 existing lines in 3 files now uncovered.

8230 of 9573 relevant lines covered (85.97%)

488.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.23
/payjoin-cli/src/app/v2/mod.rs
1
use std::str::FromStr;
2
use std::sync::{Arc, Mutex};
3

4
use anyhow::{anyhow, Context, Result};
5
use payjoin::bitcoin::consensus::encode::serialize_hex;
6
use payjoin::bitcoin::{Amount, FeeRate};
7
use payjoin::persist::OptionalTransitionOutcome;
8
use payjoin::receive::v2::{
9
    process_err_res, replay_event_log as replay_receiver_event_log, Initialized, MaybeInputsOwned,
10
    MaybeInputsSeen, OutputsUnknown, PayjoinProposal, ProvisionalProposal, ReceiveSession,
11
    Receiver, ReceiverBuilder, SessionHistory, UncheckedOriginalPayload, WantsFeeRange,
12
    WantsInputs, WantsOutputs,
13
};
14
use payjoin::send::v2::{
15
    replay_event_log as replay_sender_event_log, SendSession, Sender, SenderBuilder, V2GetContext,
16
    WithReplyKey,
17
};
18
use payjoin::{ImplementationError, PjParam, Uri};
19
use tokio::sync::watch;
20
use url::Url;
21

22
use super::config::Config;
23
use super::wallet::BitcoindWallet;
24
use super::App as AppTrait;
25
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
26
use crate::app::{handle_interrupt, http_agent};
27
use crate::db::v2::{ReceiverPersister, SenderPersister};
28
use crate::db::Database;
29

30
mod ohttp;
31

32
/// Application state backing the payjoin v2 CLI commands.
///
/// The struct is `Clone`; `resume_payjoins` clones it into each spawned session task.
#[derive(Clone)]
33
pub(crate) struct App {
34
    /// CLI configuration (read for the database path, bitcoind settings, v2 settings
    /// and the maximum fee rate).
    config: Config,
35
    /// Session database, shared with the sender and receiver persisters.
    db: Arc<Database>,
36
    /// Wallet handle built from `config.bitcoind`.
    wallet: BitcoindWallet,
37
    /// Receiving half of the watch channel whose sending half is given to
    /// `handle_interrupt`.
    interrupt: watch::Receiver<()>,
38
    /// Shared relay manager, consulted for the currently selected relay.
    relay_manager: Arc<Mutex<RelayManager>>,
39
}
40

41
#[async_trait::async_trait]
42
impl AppTrait for App {
43
    async fn new(config: Config) -> Result<Self> {
7✔
44
        let db = Arc::new(Database::create(&config.db_path)?);
45
        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
46
        let (interrupt_tx, interrupt_rx) = watch::channel(());
47
        tokio::spawn(handle_interrupt(interrupt_tx));
48
        let wallet = BitcoindWallet::new(&config.bitcoind).await?;
49
        let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager };
50
        app.wallet()
51
            .network()
52
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
53
        Ok(app)
54
    }
7✔
55

56
    fn wallet(&self) -> BitcoindWallet { self.wallet.clone() }
21✔
57

58
    #[allow(clippy::incompatible_msrv)]
59
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
3✔
60
        use payjoin::UriExt;
61
        let uri = Uri::try_from(bip21)
62
            .map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?
×
63
            .assume_checked()
64
            .check_pj_supported()
65
            .map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
66
        let address = uri.address;
67
        let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?;
×
68
        match uri.extras.pj_param() {
69
            #[cfg(feature = "v1")]
70
            PjParam::V1(pj_param) => {
71
                use std::str::FromStr;
72

73
                let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
74
                let (req, ctx) = payjoin::send::v1::SenderBuilder::from_parts(
75
                    psbt,
76
                    pj_param,
77
                    &address,
78
                    Some(amount),
79
                )
80
                .build_recommended(fee_rate)
81
                .with_context(|| "Failed to build payjoin request")?
82
                .create_v1_post_request();
83
                let http = http_agent(&self.config)?;
84
                let body = String::from_utf8(req.body.clone()).unwrap();
85
                println!("Sending fallback request to {}", &req.url);
86
                let response = http
87
                    .post(req.url)
88
                    .header("Content-Type", req.content_type)
89
                    .body(body.clone())
90
                    .send()
91
                    .await
92
                    .with_context(|| "HTTP request failed")?;
93
                let fallback_tx = payjoin::bitcoin::Psbt::from_str(&body)
94
                    .map_err(|e| anyhow!("Failed to load PSBT from base64: {}", e))?
×
95
                    .extract_tx()?;
96
                println!("Sent fallback transaction txid: {}", fallback_tx.compute_txid());
97
                println!(
98
                    "Sent fallback transaction hex: {:#}",
99
                    payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
100
                );
101
                let psbt = ctx.process_response(&response.bytes().await?).map_err(|e| {
×
102
                    tracing::debug!("Error processing response: {e:?}");
×
103
                    anyhow!("Failed to process response {e}")
×
104
                })?;
×
105

106
                self.process_pj_response(psbt)?;
107
                Ok(())
108
            }
109
            PjParam::V2(pj_param) => {
110
                let receiver_pubkey = pj_param.receiver_pubkey();
111
                let sender_state =
112
                    self.db.get_send_session_ids()?.into_iter().find_map(|session_id| {
1✔
113
                        let session_receiver_pubkey = self
1✔
114
                            .db
1✔
115
                            .get_send_session_receiver_pk(&session_id)
1✔
116
                            .expect("Receiver pubkey should exist if session id exists");
1✔
117
                        if session_receiver_pubkey == *receiver_pubkey {
1✔
118
                            let sender_persister =
1✔
119
                                SenderPersister::from_id(self.db.clone(), session_id);
1✔
120
                            let (send_session, _) = replay_sender_event_log(&sender_persister)
1✔
121
                                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))
1✔
122
                                .ok()?;
1✔
123

124
                            Some((send_session, sender_persister))
1✔
125
                        } else {
126
                            None
×
127
                        }
128
                    });
1✔
129

130
                let (sender_state, persister) = match sender_state {
131
                    Some((sender_state, persister)) => (sender_state, persister),
132
                    None => {
133
                        let persister =
134
                            SenderPersister::new(self.db.clone(), receiver_pubkey.clone())?;
135
                        let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
136
                        let sender =
137
                            SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
138
                                .build_recommended(fee_rate)?
139
                                .save(&persister)?;
140

141
                        (SendSession::WithReplyKey(sender), persister)
142
                    }
143
                };
144
                let mut interrupt = self.interrupt.clone();
145
                tokio::select! {
146
                    _ = self.process_sender_session(sender_state, &persister) => return Ok(()),
147
                    _ = interrupt.changed() => {
148
                        println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
149
                        return Err(anyhow!("Interrupted"))
150
                    }
151
                }
152
            }
153
            _ => unimplemented!("Unrecognized payjoin version"),
154
        }
155
    }
3✔
156

157
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
1✔
158
        let address = self.wallet().get_new_address()?;
159
        let ohttp_keys = unwrap_ohttp_keys_or_else_fetch(
160
            &self.config,
161
            None::<String>,
162
            self.relay_manager.clone(),
163
        )
164
        .await?
165
        .ohttp_keys;
166
        let persister = ReceiverPersister::new(self.db.clone())?;
167
        let session =
168
            ReceiverBuilder::new(address, self.config.v2()?.pj_directory.clone(), ohttp_keys)?
169
                .with_amount(amount)
170
                .with_max_fee_rate(self.config.max_fee_rate.unwrap_or(FeeRate::BROADCAST_MIN))
171
                .build()
172
                .save(&persister)?;
173

174
        println!("Receive session established");
175
        let pj_uri = session.pj_uri();
176
        println!("Request Payjoin by sharing this Payjoin Uri:");
177
        println!("{}", pj_uri);
178

179
        self.process_receiver_session(ReceiveSession::Initialized(session.clone()), &persister)
180
            .await?;
181
        Ok(())
182
    }
1✔
183

184
    #[allow(clippy::incompatible_msrv)]
185
    async fn resume_payjoins(&self) -> Result<()> {
3✔
186
        let recv_session_ids = self.db.get_recv_session_ids()?;
187
        let send_session_ids = self.db.get_send_session_ids()?;
188

189
        if recv_session_ids.is_empty() && send_session_ids.is_empty() {
190
            println!("No sessions to resume.");
191
            return Ok(());
192
        }
193

194
        let mut tasks = Vec::new();
195

196
        for session_id in recv_session_ids {
197
            let self_clone = self.clone();
198
            let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id);
199
            let receiver_state = replay_receiver_event_log(&recv_persister)
200
                .map_err(|e| anyhow!("Failed to replay receiver event log: {:?}", e))?
×
201
                .0;
202
            tasks.push(tokio::spawn(async move {
1✔
203
                self_clone.process_receiver_session(receiver_state, &recv_persister).await
1✔
204
            }));
1✔
205
        }
206

207
        for session_id in send_session_ids {
208
            let sender_persiter = SenderPersister::from_id(self.db.clone(), session_id);
209
            let sender_state = replay_sender_event_log(&sender_persiter)
210
                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))?
×
211
                .0;
212
            let self_clone = self.clone();
213
            tasks.push(tokio::spawn(async move {
×
214
                self_clone.process_sender_session(sender_state, &sender_persiter).await
×
215
            }));
×
216
        }
217

218
        let mut interrupt = self.interrupt.clone();
219
        tokio::select! {
220
            _ = async {
1✔
221
                for task in tasks {
2✔
222
                    let _ = task.await;
1✔
223
                }
224
            } => {
1✔
225
                println!("All resumed sessions completed.");
226
            }
227
            _ = interrupt.changed() => {
228
                println!("Resumed sessions were interrupted.");
229
            }
230
        }
231
        Ok(())
232
    }
3✔
233
}
234

235
impl App {
236
    async fn process_sender_session(
2✔
237
        &self,
2✔
238
        session: SendSession,
2✔
239
        persister: &SenderPersister,
2✔
240
    ) -> Result<()> {
2✔
241
        match session {
2✔
242
            SendSession::WithReplyKey(context) =>
1✔
243
                self.post_original_proposal(context, persister).await?,
1✔
244
            SendSession::V2GetContext(context) =>
1✔
245
                self.get_proposed_payjoin_psbt(context, persister).await?,
1✔
246
            SendSession::ProposalReceived(proposal) => {
×
247
                self.process_pj_response(proposal)?;
×
248
                return Ok(());
×
249
            }
250
            _ => return Err(anyhow!("Unexpected sender state")),
×
251
        }
252
        Ok(())
1✔
253
    }
1✔
254

255
    async fn post_original_proposal(
1✔
256
        &self,
1✔
257
        sender: Sender<WithReplyKey>,
1✔
258
        persister: &SenderPersister,
1✔
259
    ) -> Result<()> {
1✔
260
        let (req, ctx) = sender.create_v2_post_request(
1✔
261
            &self.unwrap_relay_or_else_fetch(Some(sender.endpoint())).await?,
1✔
262
        )?;
×
263
        let response = self.post_request(req).await?;
1✔
264
        let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?;
1✔
265
        self.get_proposed_payjoin_psbt(sender, persister).await
1✔
266
    }
×
267

268
    async fn get_proposed_payjoin_psbt(
2✔
269
        &self,
2✔
270
        sender: Sender<V2GetContext>,
2✔
271
        persister: &SenderPersister,
2✔
272
    ) -> Result<()> {
2✔
273
        let mut session = sender.clone();
2✔
274
        // Long poll until we get a response
275
        loop {
276
            let (req, ctx) = session.create_poll_request(
3✔
277
                &self.unwrap_relay_or_else_fetch(Some(session.endpoint())).await?,
3✔
278
            )?;
×
279
            let response = self.post_request(req).await?;
3✔
280
            let res = session.process_response(&response.bytes().await?, ctx).save(persister);
2✔
281
            match res {
2✔
282
                Ok(OptionalTransitionOutcome::Progress(psbt)) => {
1✔
283
                    println!("Proposal received. Processing...");
1✔
284
                    self.process_pj_response(psbt)?;
1✔
285
                    return Ok(());
1✔
286
                }
287
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
1✔
288
                    println!("No response yet.");
1✔
289
                    session = current_state;
1✔
290
                    continue;
1✔
291
                }
292
                Err(re) => {
×
293
                    println!("{re}");
×
294
                    tracing::debug!("{re:?}");
×
295
                    return Err(anyhow!("Response error").context(re));
×
296
                }
297
            }
298
        }
299
    }
1✔
300

301
    async fn long_poll_fallback(
2✔
302
        &self,
2✔
303
        session: Receiver<Initialized>,
2✔
304
        persister: &ReceiverPersister,
2✔
305
    ) -> Result<Receiver<UncheckedOriginalPayload>> {
2✔
306
        let ohttp_relay = self
2✔
307
            .unwrap_relay_or_else_fetch(Some(session.pj_uri().extras.endpoint().to_string()))
2✔
308
            .await?;
2✔
309

310
        let mut session = session;
2✔
311
        loop {
312
            let (req, context) = session.create_poll_request(&ohttp_relay)?;
2✔
313
            println!("Polling receive request...");
2✔
314
            let ohttp_response = self.post_request(req).await?;
2✔
315
            let state_transition = session
1✔
316
                .process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
317
                .save(persister);
1✔
318
            match state_transition {
1✔
319
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
1✔
320
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
321
                    return Ok(next_state);
1✔
322
                }
323
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
324
                    session = current_state;
×
325
                    continue;
×
326
                }
327
                Err(e) => return Err(e.into()),
×
328
            }
329
        }
330
    }
1✔
331

332
    async fn process_receiver_session(
2✔
333
        &self,
2✔
334
        session: ReceiveSession,
2✔
335
        persister: &ReceiverPersister,
2✔
336
    ) -> Result<()> {
2✔
337
        let res = {
2✔
338
            match session {
2✔
339
                ReceiveSession::Initialized(proposal) =>
2✔
340
                    self.read_from_directory(proposal, persister).await,
2✔
341
                ReceiveSession::UncheckedOriginalPayload(proposal) =>
×
342
                    self.check_proposal(proposal, persister).await,
×
343
                ReceiveSession::MaybeInputsOwned(proposal) =>
×
344
                    self.check_inputs_not_owned(proposal, persister).await,
×
345
                ReceiveSession::MaybeInputsSeen(proposal) =>
×
346
                    self.check_no_inputs_seen_before(proposal, persister).await,
×
347
                ReceiveSession::OutputsUnknown(proposal) =>
×
348
                    self.identify_receiver_outputs(proposal, persister).await,
×
349
                ReceiveSession::WantsOutputs(proposal) =>
×
350
                    self.commit_outputs(proposal, persister).await,
×
351
                ReceiveSession::WantsInputs(proposal) =>
×
352
                    self.contribute_inputs(proposal, persister).await,
×
353
                ReceiveSession::WantsFeeRange(proposal) =>
×
354
                    self.apply_fee_range(proposal, persister).await,
×
355
                ReceiveSession::ProvisionalProposal(proposal) =>
×
356
                    self.finalize_proposal(proposal, persister).await,
×
357
                ReceiveSession::PayjoinProposal(proposal) =>
×
358
                    self.send_payjoin_proposal(proposal, persister).await,
×
359
                ReceiveSession::TerminalFailure =>
360
                    return Err(anyhow!("Terminal receiver session")),
×
361
            }
362
        };
363

364
        match res {
2✔
365
            Ok(_) => Ok(()),
1✔
366
            Err(e) => {
1✔
367
                let (_, session_history) = replay_receiver_event_log(persister)?;
1✔
368
                let pj_uri = match session_history.pj_uri() {
1✔
369
                    Some(uri) => Some(uri.extras.endpoint()),
1✔
370
                    None => None,
×
371
                };
372
                let ohttp_relay = self.unwrap_relay_or_else_fetch(pj_uri).await?;
1✔
373
                self.handle_recoverable_error(&ohttp_relay, &session_history).await?;
1✔
374

375
                Err(e)
1✔
376
            }
377
        }
378
    }
2✔
379

380
    #[allow(clippy::incompatible_msrv)]
381
    async fn read_from_directory(
2✔
382
        &self,
2✔
383
        session: Receiver<Initialized>,
2✔
384
        persister: &ReceiverPersister,
2✔
385
    ) -> Result<()> {
2✔
386
        let mut interrupt = self.interrupt.clone();
2✔
387
        let receiver = tokio::select! {
2✔
388
            res = self.long_poll_fallback(session, persister) => res,
2✔
389
            _ = interrupt.changed() => {
2✔
390
                println!("Interrupted. Call the `resume` command to resume all sessions.");
1✔
391
                return Err(anyhow!("Interrupted"));
1✔
392
            }
393
        }?;
×
394
        self.check_proposal(receiver, persister).await
1✔
395
    }
2✔
396

397
    async fn check_proposal(
1✔
398
        &self,
1✔
399
        proposal: Receiver<UncheckedOriginalPayload>,
1✔
400
        persister: &ReceiverPersister,
1✔
401
    ) -> Result<()> {
1✔
402
        let wallet = self.wallet();
1✔
403
        let proposal = proposal
1✔
404
            .check_broadcast_suitability(None, |tx| {
1✔
405
                wallet
1✔
406
                    .can_broadcast(tx)
1✔
407
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
408
            })
1✔
409
            .save(persister)?;
1✔
410

411
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
412
        println!("{}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast()));
1✔
413
        self.check_inputs_not_owned(proposal, persister).await
1✔
414
    }
1✔
415

416
    async fn check_inputs_not_owned(
1✔
417
        &self,
1✔
418
        proposal: Receiver<MaybeInputsOwned>,
1✔
419
        persister: &ReceiverPersister,
1✔
420
    ) -> Result<()> {
1✔
421
        let wallet = self.wallet();
1✔
422
        let proposal = proposal
1✔
423
            .check_inputs_not_owned(&mut |input| {
1✔
424
                wallet
1✔
425
                    .is_mine(input)
1✔
426
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
427
            })
1✔
428
            .save(persister)?;
1✔
429
        self.check_no_inputs_seen_before(proposal, persister).await
1✔
430
    }
1✔
431

432
    async fn check_no_inputs_seen_before(
1✔
433
        &self,
1✔
434
        proposal: Receiver<MaybeInputsSeen>,
1✔
435
        persister: &ReceiverPersister,
1✔
436
    ) -> Result<()> {
1✔
437
        let proposal = proposal
1✔
438
            .check_no_inputs_seen_before(&mut |input| {
1✔
439
                Ok(self.db.insert_input_seen_before(*input)?)
1✔
440
            })
1✔
441
            .save(persister)?;
1✔
442
        self.identify_receiver_outputs(proposal, persister).await
1✔
443
    }
1✔
444

445
    async fn identify_receiver_outputs(
1✔
446
        &self,
1✔
447
        proposal: Receiver<OutputsUnknown>,
1✔
448
        persister: &ReceiverPersister,
1✔
449
    ) -> Result<()> {
1✔
450
        let wallet = self.wallet();
1✔
451
        let proposal = proposal
1✔
452
            .identify_receiver_outputs(&mut |output_script| {
2✔
453
                wallet
2✔
454
                    .is_mine(output_script)
2✔
455
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
2✔
456
            })
2✔
457
            .save(persister)?;
1✔
458
        self.commit_outputs(proposal, persister).await
1✔
459
    }
1✔
460

461
    async fn commit_outputs(
1✔
462
        &self,
1✔
463
        proposal: Receiver<WantsOutputs>,
1✔
464
        persister: &ReceiverPersister,
1✔
465
    ) -> Result<()> {
1✔
466
        let proposal = proposal.commit_outputs().save(persister)?;
1✔
467
        self.contribute_inputs(proposal, persister).await
1✔
468
    }
1✔
469

470
    async fn contribute_inputs(
1✔
471
        &self,
1✔
472
        proposal: Receiver<WantsInputs>,
1✔
473
        persister: &ReceiverPersister,
1✔
474
    ) -> Result<()> {
1✔
475
        let wallet = self.wallet();
1✔
476
        let candidate_inputs = wallet.list_unspent()?;
1✔
477

478
        let selected_input = proposal.try_preserving_privacy(candidate_inputs)?;
1✔
479
        let proposal =
1✔
480
            proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?;
1✔
481
        self.apply_fee_range(proposal, persister).await
1✔
482
    }
1✔
483

484
    async fn apply_fee_range(
1✔
485
        &self,
1✔
486
        proposal: Receiver<WantsFeeRange>,
1✔
487
        persister: &ReceiverPersister,
1✔
488
    ) -> Result<()> {
1✔
489
        let proposal = proposal.apply_fee_range(None, self.config.max_fee_rate).save(persister)?;
1✔
490
        self.finalize_proposal(proposal, persister).await
1✔
491
    }
1✔
492

493
    async fn finalize_proposal(
1✔
494
        &self,
1✔
495
        proposal: Receiver<ProvisionalProposal>,
1✔
496
        persister: &ReceiverPersister,
1✔
497
    ) -> Result<()> {
1✔
498
        let wallet = self.wallet();
1✔
499
        let proposal = proposal
1✔
500
            .finalize_proposal(|psbt| {
1✔
501
                wallet
1✔
502
                    .process_psbt(psbt)
1✔
503
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
504
            })
1✔
505
            .save(persister)?;
1✔
506
        self.send_payjoin_proposal(proposal, persister).await
1✔
507
    }
1✔
508

509
    async fn send_payjoin_proposal(
1✔
510
        &self,
1✔
511
        mut proposal: Receiver<PayjoinProposal>,
1✔
512
        persister: &ReceiverPersister,
1✔
513
    ) -> Result<()> {
1✔
514
        let (req, ohttp_ctx) = proposal
1✔
515
            .create_post_request(&self.unwrap_relay_or_else_fetch(None::<String>).await?)
1✔
516
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
517
        let res = self.post_request(req).await?;
1✔
518
        let payjoin_psbt = proposal.psbt().clone();
1✔
519
        proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister)?;
1✔
520
        println!(
1✔
521
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
1✔
522
            payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
1✔
523
        );
524
        Ok(())
1✔
525
    }
1✔
526

527
    async fn unwrap_relay_or_else_fetch(&self, directory: Option<String>) -> Result<String> {
8✔
528
        let selected_relay =
8✔
529
            self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay();
8✔
530
        let ohttp_relay = match selected_relay {
8✔
531
            Some(relay) => relay,
3✔
532
            None =>
533
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone())
5✔
534
                    .await?
5✔
535
                    .relay_url,
536
        };
537
        Ok(ohttp_relay)
8✔
538
    }
8✔
539

540
    /// Handle request error by sending an error response over the directory
541
    async fn handle_recoverable_error(
1✔
542
        &self,
1✔
543
        ohttp_relay: impl payjoin::IntoUrl,
1✔
544
        session_history: &SessionHistory,
1✔
545
    ) -> Result<()> {
1✔
546
        let e = match session_history.terminal_error() {
1✔
547
            Some((_, Some(e))) => e,
×
548
            _ => return Ok(()),
1✔
549
        };
550
        let (err_req, err_ctx) = session_history
×
NEW
551
            .extract_err_req(ohttp_relay.as_str())?
×
552
            .expect("If JsonReply is Some, then err_req and err_ctx should be Some");
×
553
        let to_return = anyhow!("Replied with error: {}", e.to_json().to_string());
×
554

555
        let err_response = match self.post_request(err_req).await {
×
556
            Ok(response) => response,
×
557
            Err(e) => return Err(anyhow!("Failed to post error request: {}", e)),
×
558
        };
559

560
        let err_bytes = match err_response.bytes().await {
×
561
            Ok(bytes) => bytes,
×
562
            Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
×
563
        };
564

565
        if let Err(e) = process_err_res(&err_bytes, err_ctx) {
×
566
            return Err(anyhow!("Failed to process error response: {}", e));
×
567
        }
×
568

569
        Err(to_return)
×
570
    }
1✔
571

572
    async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {
7✔
573
        let http = http_agent(&self.config)?;
7✔
574
        http.post(Url::from_str(&req.url).expect("invalid URL"))
7✔
575
            .header("Content-Type", req.content_type)
7✔
576
            .body(req.body)
7✔
577
            .send()
7✔
578
            .await
7✔
579
            .map_err(map_reqwest_err)
5✔
580
    }
5✔
581
}
582

583
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
584
    match e.status() {
×
585
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
586
        None => anyhow!("No HTTP response: {}", e),
×
587
    }
588
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc