• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 17161369593

22 Aug 2025 05:05PM UTC coverage: 86.219% (+0.06%) from 86.156%
17161369593

Pull #981

github

web-flow
Merge beeb47186 into 87b0b695b
Pull Request #981: Use reply key for replyable errors to v2 senders

129 of 131 new or added lines in 6 files covered. (98.47%)

1 existing line in 1 file now uncovered.

7933 of 9201 relevant lines covered (86.22%)

508.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.41
/payjoin-cli/src/app/v2/mod.rs
1
use std::sync::{Arc, Mutex};
2

3
use anyhow::{anyhow, Context, Result};
4
use payjoin::bitcoin::consensus::encode::serialize_hex;
5
use payjoin::bitcoin::{Amount, FeeRate};
6
use payjoin::persist::OptionalTransitionOutcome;
7
use payjoin::receive::v2::{
8
    process_err_res, replay_event_log as replay_receiver_event_log, Initialized, MaybeInputsOwned,
9
    MaybeInputsSeen, OutputsUnknown, PayjoinProposal, ProvisionalProposal, ReceiveSession,
10
    Receiver, SessionHistory, UncheckedOriginalPsbt, WantsFeeRange, WantsInputs, WantsOutputs,
11
};
12
use payjoin::send::v2::{
13
    replay_event_log as replay_sender_event_log, SendSession, Sender, SenderBuilder, V2GetContext,
14
    WithReplyKey,
15
};
16
use payjoin::{ImplementationError, PjParam, Uri};
17
use tokio::sync::watch;
18

19
use super::config::Config;
20
use super::wallet::BitcoindWallet;
21
use super::App as AppTrait;
22
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
23
use crate::app::{handle_interrupt, http_agent};
24
use crate::db::v2::{ReceiverPersister, SenderPersister};
25
use crate::db::Database;
26

27
mod ohttp;
28

29
#[derive(Clone)]
30
pub(crate) struct App {
31
    config: Config,
32
    db: Arc<Database>,
33
    wallet: BitcoindWallet,
34
    interrupt: watch::Receiver<()>,
35
    relay_manager: Arc<Mutex<RelayManager>>,
36
}
37

38
#[async_trait::async_trait]
39
impl AppTrait for App {
40
    async fn new(config: Config) -> Result<Self> {
7✔
41
        let db = Arc::new(Database::create(&config.db_path)?);
42
        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
43
        let (interrupt_tx, interrupt_rx) = watch::channel(());
44
        tokio::spawn(handle_interrupt(interrupt_tx));
45
        let wallet = BitcoindWallet::new(&config.bitcoind).await?;
46
        let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager };
47
        app.wallet()
48
            .network()
49
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
50
        Ok(app)
51
    }
7✔
52

53
    fn wallet(&self) -> BitcoindWallet { self.wallet.clone() }
21✔
54

55
    #[allow(clippy::incompatible_msrv)]
56
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
3✔
57
        use payjoin::UriExt;
58
        let uri = Uri::try_from(bip21)
59
            .map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?
×
60
            .assume_checked()
61
            .check_pj_supported()
62
            .map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
63
        let address = uri.address;
64
        let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?;
×
65
        match uri.extras.pj_param() {
66
            #[cfg(feature = "v1")]
67
            PjParam::V1(pj_param) => {
68
                use std::str::FromStr;
69

70
                let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
71
                let (req, ctx) = payjoin::send::v1::SenderBuilder::from_parts(
72
                    psbt,
73
                    pj_param,
74
                    &address,
75
                    Some(amount),
76
                )
77
                .build_recommended(fee_rate)
78
                .with_context(|| "Failed to build payjoin request")?
79
                .create_v1_post_request();
80
                let http = http_agent(&self.config)?;
81
                let body = String::from_utf8(req.body.clone()).unwrap();
82
                println!("Sending fallback request to {}", &req.url);
83
                let response = http
84
                    .post(req.url)
85
                    .header("Content-Type", req.content_type)
86
                    .body(body.clone())
87
                    .send()
88
                    .await
89
                    .with_context(|| "HTTP request failed")?;
90
                let fallback_tx = payjoin::bitcoin::Psbt::from_str(&body)
91
                    .map_err(|e| anyhow!("Failed to load PSBT from base64: {}", e))?
×
92
                    .extract_tx()?;
93
                println!("Sent fallback transaction txid: {}", fallback_tx.compute_txid());
94
                println!(
95
                    "Sent fallback transaction hex: {:#}",
96
                    payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
97
                );
98
                let psbt = ctx.process_response(&response.bytes().await?).map_err(|e| {
×
99
                    log::debug!("Error processing response: {e:?}");
×
100
                    anyhow!("Failed to process response {e}")
×
101
                })?;
×
102

103
                self.process_pj_response(psbt)?;
104
                Ok(())
105
            }
106
            PjParam::V2(pj_param) => {
107
                // TODO: perhaps we should store pj uri in the session wrapper as to not replay the event log for each session
108
                let sender_state =
109
                    self.db.get_send_session_ids()?.into_iter().find_map(|session_id| {
1✔
110
                        let sender_persister =
1✔
111
                            SenderPersister::from_id(self.db.clone(), session_id).ok()?;
1✔
112
                        let (send_session, session_history) =
1✔
113
                            replay_sender_event_log(&sender_persister)
1✔
114
                                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))
1✔
115
                                .ok()?;
1✔
116

117
                        let pj_uri = session_history.pj_param().map(|pj_param| pj_param.endpoint());
1✔
118
                        let sender_state =
1✔
119
                            pj_uri.filter(|uri| uri == &pj_param.endpoint()).map(|_| send_session);
1✔
120
                        sender_state.map(|sender_state| (sender_state, sender_persister))
1✔
121
                    });
1✔
122

123
                let (sender_state, persister) = match sender_state {
124
                    Some((sender_state, persister)) => (sender_state, persister),
125
                    None => {
126
                        let persister = SenderPersister::new(self.db.clone())?;
127
                        let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
128
                        let sender =
129
                            SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
130
                                .build_recommended(fee_rate)?
131
                                .save(&persister)?;
132

133
                        (SendSession::WithReplyKey(sender), persister)
134
                    }
135
                };
136
                let mut interrupt = self.interrupt.clone();
137
                tokio::select! {
138
                    _ = self.process_sender_session(sender_state, &persister) => return Ok(()),
139
                    _ = interrupt.changed() => {
140
                        println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
141
                        return Err(anyhow!("Interrupted"))
142
                    }
143
                }
144
            }
145
            _ => unimplemented!("Unrecognized payjoin version"),
146
        }
147
    }
3✔
148

149
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
1✔
150
        let address = self.wallet().get_new_address()?;
151
        let ohttp_keys =
152
            unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone())
153
                .await?
154
                .ohttp_keys;
155
        let persister = ReceiverPersister::new(self.db.clone())?;
156
        let session = Receiver::create_session(
157
            address,
158
            self.config.v2()?.pj_directory.clone(),
159
            ohttp_keys,
160
            None,
161
            Some(amount),
162
        )?
163
        .save(&persister)?;
164
        println!("Receive session established");
165
        let pj_uri = session.pj_uri();
166
        println!("Request Payjoin by sharing this Payjoin Uri:");
167
        println!("{}", pj_uri);
168

169
        self.process_receiver_session(ReceiveSession::Initialized(session.clone()), &persister)
170
            .await?;
171
        Ok(())
172
    }
1✔
173

174
    #[allow(clippy::incompatible_msrv)]
175
    async fn resume_payjoins(&self) -> Result<()> {
3✔
176
        let recv_session_ids = self.db.get_recv_session_ids()?;
177
        let send_session_ids = self.db.get_send_session_ids()?;
178

179
        if recv_session_ids.is_empty() && send_session_ids.is_empty() {
180
            println!("No sessions to resume.");
181
            return Ok(());
182
        }
183

184
        let mut tasks = Vec::new();
185

186
        for session_id in recv_session_ids {
187
            let self_clone = self.clone();
188
            let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id)?;
189
            let receiver_state = replay_receiver_event_log(&recv_persister)
190
                .map_err(|e| anyhow!("Failed to replay receiver event log: {:?}", e))?
×
191
                .0;
192
            tasks.push(tokio::spawn(async move {
1✔
193
                self_clone.process_receiver_session(receiver_state, &recv_persister).await
1✔
194
            }));
1✔
195
        }
196

197
        for session_id in send_session_ids {
198
            let sender_persiter = SenderPersister::from_id(self.db.clone(), session_id)?;
199
            let sender_state = replay_sender_event_log(&sender_persiter)
200
                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))?
×
201
                .0;
202
            let self_clone = self.clone();
203
            tasks.push(tokio::spawn(async move {
×
204
                self_clone.process_sender_session(sender_state, &sender_persiter).await
×
205
            }));
×
206
        }
207

208
        let mut interrupt = self.interrupt.clone();
209
        tokio::select! {
210
            _ = async {
1✔
211
                for task in tasks {
2✔
212
                    let _ = task.await;
1✔
213
                }
214
            } => {
1✔
215
                println!("All resumed sessions completed.");
216
            }
217
            _ = interrupt.changed() => {
218
                println!("Resumed sessions were interrupted.");
219
            }
220
        }
221
        Ok(())
222
    }
3✔
223
}
224

225
impl App {
226
    async fn process_sender_session(
2✔
227
        &self,
2✔
228
        session: SendSession,
2✔
229
        persister: &SenderPersister,
2✔
230
    ) -> Result<()> {
2✔
231
        match session {
2✔
232
            SendSession::WithReplyKey(context) =>
1✔
233
                self.post_original_proposal(context, persister).await?,
1✔
234
            SendSession::V2GetContext(context) =>
1✔
235
                self.get_proposed_payjoin_psbt(context, persister).await?,
1✔
236
            SendSession::ProposalReceived(proposal) => {
×
237
                self.process_pj_response(proposal)?;
×
238
                return Ok(());
×
239
            }
240
            _ => return Err(anyhow!("Unexpected sender state")),
×
241
        }
242
        Ok(())
1✔
243
    }
1✔
244

245
    async fn post_original_proposal(
1✔
246
        &self,
1✔
247
        sender: Sender<WithReplyKey>,
1✔
248
        persister: &SenderPersister,
1✔
249
    ) -> Result<()> {
1✔
250
        let (req, ctx) = sender.create_v2_post_request(
1✔
251
            self.unwrap_relay_or_else_fetch(Some(sender.endpoint().clone())).await?,
1✔
252
        )?;
×
253
        let response = self.post_request(req).await?;
1✔
254
        println!("Posted original proposal...");
1✔
255
        let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?;
1✔
256
        self.get_proposed_payjoin_psbt(sender, persister).await
1✔
257
    }
×
258

259
    async fn get_proposed_payjoin_psbt(
2✔
260
        &self,
2✔
261
        sender: Sender<V2GetContext>,
2✔
262
        persister: &SenderPersister,
2✔
263
    ) -> Result<()> {
2✔
264
        let mut session = sender.clone();
2✔
265
        // Long poll until we get a response
266
        loop {
267
            let (req, ctx) = session.create_poll_request(
3✔
268
                self.unwrap_relay_or_else_fetch(Some(session.endpoint().clone())).await?,
3✔
269
            )?;
×
270
            let response = self.post_request(req).await?;
3✔
271
            let res = session.process_response(&response.bytes().await?, ctx).save(persister);
2✔
272
            match res {
2✔
273
                Ok(OptionalTransitionOutcome::Progress(psbt)) => {
1✔
274
                    println!("Proposal received. Processing...");
1✔
275
                    self.process_pj_response(psbt)?;
1✔
276
                    return Ok(());
1✔
277
                }
278
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
1✔
279
                    println!("No response yet.");
1✔
280
                    session = current_state;
1✔
281
                    continue;
1✔
282
                }
283
                Err(re) => {
×
284
                    println!("{re}");
×
285
                    log::debug!("{re:?}");
×
286
                    return Err(anyhow!("Response error").context(re));
×
287
                }
288
            }
289
        }
290
    }
1✔
291

292
    async fn long_poll_fallback(
2✔
293
        &self,
2✔
294
        session: Receiver<Initialized>,
2✔
295
        persister: &ReceiverPersister,
2✔
296
    ) -> Result<Receiver<UncheckedOriginalPsbt>> {
2✔
297
        let ohttp_relay = self
2✔
298
            .unwrap_relay_or_else_fetch(Some(session.pj_uri().extras.endpoint().clone()))
2✔
299
            .await?;
2✔
300

301
        let mut session = session;
2✔
302
        loop {
303
            let (req, context) = session.create_poll_request(&ohttp_relay)?;
2✔
304
            println!("Polling receive request...");
2✔
305
            let ohttp_response = self.post_request(req).await?;
2✔
306
            let state_transition = session
1✔
307
                .process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
308
                .save(persister);
1✔
309
            match state_transition {
1✔
310
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
1✔
311
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
312
                    return Ok(next_state);
1✔
313
                }
314
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
315
                    session = current_state;
×
316
                    continue;
×
317
                }
318
                Err(e) => return Err(e.into()),
×
319
            }
320
        }
321
    }
1✔
322

323
    async fn process_receiver_session(
2✔
324
        &self,
2✔
325
        session: ReceiveSession,
2✔
326
        persister: &ReceiverPersister,
2✔
327
    ) -> Result<()> {
2✔
328
        let res = {
2✔
329
            match session {
2✔
330
                ReceiveSession::Initialized(proposal) =>
2✔
331
                    self.read_from_directory(proposal, persister).await,
2✔
NEW
332
                ReceiveSession::UncheckedOriginalPsbt(proposal) =>
×
333
                    self.check_proposal(proposal, persister).await,
×
334
                ReceiveSession::MaybeInputsOwned(proposal) =>
×
335
                    self.check_inputs_not_owned(proposal, persister).await,
×
336
                ReceiveSession::MaybeInputsSeen(proposal) =>
×
337
                    self.check_no_inputs_seen_before(proposal, persister).await,
×
338
                ReceiveSession::OutputsUnknown(proposal) =>
×
339
                    self.identify_receiver_outputs(proposal, persister).await,
×
340
                ReceiveSession::WantsOutputs(proposal) =>
×
341
                    self.commit_outputs(proposal, persister).await,
×
342
                ReceiveSession::WantsInputs(proposal) =>
×
343
                    self.contribute_inputs(proposal, persister).await,
×
344
                ReceiveSession::WantsFeeRange(proposal) =>
×
345
                    self.apply_fee_range(proposal, persister).await,
×
346
                ReceiveSession::ProvisionalProposal(proposal) =>
×
347
                    self.finalize_proposal(proposal, persister).await,
×
348
                ReceiveSession::PayjoinProposal(proposal) =>
×
349
                    self.send_payjoin_proposal(proposal, persister).await,
×
350
                ReceiveSession::Uninitialized(_) =>
351
                    return Err(anyhow!("Uninitialized receiver session")),
×
352
                ReceiveSession::TerminalFailure =>
353
                    return Err(anyhow!("Terminal receiver session")),
×
354
            }
355
        };
356

357
        match res {
2✔
358
            Ok(_) => Ok(()),
1✔
359
            Err(e) => {
1✔
360
                let (_, session_history) = replay_receiver_event_log(persister)?;
1✔
361
                let pj_uri = match session_history.pj_uri() {
1✔
362
                    Some(uri) => Some(uri.extras.endpoint().clone()),
1✔
363
                    None => None,
×
364
                };
365
                let ohttp_relay = self.unwrap_relay_or_else_fetch(pj_uri).await?;
1✔
366
                self.handle_recoverable_error(&ohttp_relay, &session_history).await?;
1✔
367

368
                Err(e)
1✔
369
            }
370
        }
371
    }
2✔
372

373
    #[allow(clippy::incompatible_msrv)]
374
    async fn read_from_directory(
2✔
375
        &self,
2✔
376
        session: Receiver<Initialized>,
2✔
377
        persister: &ReceiverPersister,
2✔
378
    ) -> Result<()> {
2✔
379
        let mut interrupt = self.interrupt.clone();
2✔
380
        let receiver = tokio::select! {
2✔
381
            res = self.long_poll_fallback(session, persister) => res,
2✔
382
            _ = interrupt.changed() => {
2✔
383
                println!("Interrupted. Call the `resume` command to resume all sessions.");
1✔
384
                return Err(anyhow!("Interrupted"));
1✔
385
            }
386
        }?;
×
387
        self.check_proposal(receiver, persister).await
1✔
388
    }
2✔
389

390
    async fn check_proposal(
1✔
391
        &self,
1✔
392
        proposal: Receiver<UncheckedOriginalPsbt>,
1✔
393
        persister: &ReceiverPersister,
1✔
394
    ) -> Result<()> {
1✔
395
        let wallet = self.wallet();
1✔
396
        let proposal = proposal
1✔
397
            .check_broadcast_suitability(None, |tx| {
1✔
398
                wallet
1✔
399
                    .can_broadcast(tx)
1✔
400
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
401
            })
1✔
402
            .save(persister)?;
1✔
403

404
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
405
        println!("{}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast()));
1✔
406
        self.check_inputs_not_owned(proposal, persister).await
1✔
407
    }
1✔
408

409
    async fn check_inputs_not_owned(
1✔
410
        &self,
1✔
411
        proposal: Receiver<MaybeInputsOwned>,
1✔
412
        persister: &ReceiverPersister,
1✔
413
    ) -> Result<()> {
1✔
414
        let wallet = self.wallet();
1✔
415
        let proposal = proposal
1✔
416
            .check_inputs_not_owned(&mut |input| {
1✔
417
                wallet
1✔
418
                    .is_mine(input)
1✔
419
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
420
            })
1✔
421
            .save(persister)?;
1✔
422
        self.check_no_inputs_seen_before(proposal, persister).await
1✔
423
    }
1✔
424

425
    async fn check_no_inputs_seen_before(
1✔
426
        &self,
1✔
427
        proposal: Receiver<MaybeInputsSeen>,
1✔
428
        persister: &ReceiverPersister,
1✔
429
    ) -> Result<()> {
1✔
430
        let proposal = proposal
1✔
431
            .check_no_inputs_seen_before(&mut |input| {
1✔
432
                Ok(self.db.insert_input_seen_before(*input)?)
1✔
433
            })
1✔
434
            .save(persister)?;
1✔
435
        self.identify_receiver_outputs(proposal, persister).await
1✔
436
    }
1✔
437

438
    async fn identify_receiver_outputs(
1✔
439
        &self,
1✔
440
        proposal: Receiver<OutputsUnknown>,
1✔
441
        persister: &ReceiverPersister,
1✔
442
    ) -> Result<()> {
1✔
443
        let wallet = self.wallet();
1✔
444
        let proposal = proposal
1✔
445
            .identify_receiver_outputs(&mut |output_script| {
2✔
446
                wallet
2✔
447
                    .is_mine(output_script)
2✔
448
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
2✔
449
            })
2✔
450
            .save(persister)?;
1✔
451
        self.commit_outputs(proposal, persister).await
1✔
452
    }
1✔
453

454
    async fn commit_outputs(
1✔
455
        &self,
1✔
456
        proposal: Receiver<WantsOutputs>,
1✔
457
        persister: &ReceiverPersister,
1✔
458
    ) -> Result<()> {
1✔
459
        let proposal = proposal.commit_outputs().save(persister)?;
1✔
460
        self.contribute_inputs(proposal, persister).await
1✔
461
    }
1✔
462

463
    async fn contribute_inputs(
1✔
464
        &self,
1✔
465
        proposal: Receiver<WantsInputs>,
1✔
466
        persister: &ReceiverPersister,
1✔
467
    ) -> Result<()> {
1✔
468
        let wallet = self.wallet();
1✔
469
        let candidate_inputs = wallet.list_unspent()?;
1✔
470

471
        let selected_input = proposal.try_preserving_privacy(candidate_inputs)?;
1✔
472
        let proposal =
1✔
473
            proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?;
1✔
474
        self.apply_fee_range(proposal, persister).await
1✔
475
    }
1✔
476

477
    async fn apply_fee_range(
1✔
478
        &self,
1✔
479
        proposal: Receiver<WantsFeeRange>,
1✔
480
        persister: &ReceiverPersister,
1✔
481
    ) -> Result<()> {
1✔
482
        let proposal = proposal.apply_fee_range(None, self.config.max_fee_rate).save(persister)?;
1✔
483
        self.finalize_proposal(proposal, persister).await
1✔
484
    }
1✔
485

486
    async fn finalize_proposal(
1✔
487
        &self,
1✔
488
        proposal: Receiver<ProvisionalProposal>,
1✔
489
        persister: &ReceiverPersister,
1✔
490
    ) -> Result<()> {
1✔
491
        let wallet = self.wallet();
1✔
492
        let proposal = proposal
1✔
493
            .finalize_proposal(|psbt| {
1✔
494
                wallet
1✔
495
                    .process_psbt(psbt)
1✔
496
                    .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
1✔
497
            })
1✔
498
            .save(persister)?;
1✔
499
        self.send_payjoin_proposal(proposal, persister).await
1✔
500
    }
1✔
501

502
    async fn send_payjoin_proposal(
1✔
503
        &self,
1✔
504
        mut proposal: Receiver<PayjoinProposal>,
1✔
505
        persister: &ReceiverPersister,
1✔
506
    ) -> Result<()> {
1✔
507
        let (req, ohttp_ctx) = proposal
1✔
508
            .create_post_request(&self.unwrap_relay_or_else_fetch(None).await?)
1✔
509
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
510
        let res = self.post_request(req).await?;
1✔
511
        let payjoin_psbt = proposal.psbt().clone();
1✔
512
        proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister)?;
1✔
513
        println!(
1✔
514
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
1✔
515
            payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
1✔
516
        );
517
        Ok(())
1✔
518
    }
1✔
519

520
    async fn unwrap_relay_or_else_fetch(
8✔
521
        &self,
8✔
522
        directory: Option<payjoin::Url>,
8✔
523
    ) -> Result<payjoin::Url> {
8✔
524
        let selected_relay =
8✔
525
            self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay();
8✔
526
        let ohttp_relay = match selected_relay {
8✔
527
            Some(relay) => relay,
3✔
528
            None =>
529
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone())
5✔
530
                    .await?
5✔
531
                    .relay_url,
532
        };
533
        Ok(ohttp_relay)
8✔
534
    }
8✔
535

536
    /// Handle request error by sending an error response over the directory
537
    async fn handle_recoverable_error(
1✔
538
        &self,
1✔
539
        ohttp_relay: &payjoin::Url,
1✔
540
        session_history: &SessionHistory,
1✔
541
    ) -> Result<()> {
1✔
542
        let e = match session_history.terminal_error() {
1✔
543
            Some((_, Some(e))) => e,
×
544
            _ => return Ok(()),
1✔
545
        };
546
        let (err_req, err_ctx) = session_history
×
547
            .extract_err_req(ohttp_relay)?
×
548
            .expect("If JsonReply is Some, then err_req and err_ctx should be Some");
×
549
        let to_return = anyhow!("Replied with error: {}", e.to_json().to_string());
×
550

551
        let err_response = match self.post_request(err_req).await {
×
552
            Ok(response) => response,
×
553
            Err(e) => return Err(anyhow!("Failed to post error request: {}", e)),
×
554
        };
555

556
        let err_bytes = match err_response.bytes().await {
×
557
            Ok(bytes) => bytes,
×
558
            Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
×
559
        };
560

561
        if let Err(e) = process_err_res(&err_bytes, err_ctx) {
×
562
            return Err(anyhow!("Failed to process error response: {}", e));
×
563
        }
×
564

565
        Err(to_return)
×
566
    }
1✔
567

568
    async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {
7✔
569
        let http = http_agent(&self.config)?;
7✔
570
        http.post(req.url)
7✔
571
            .header("Content-Type", req.content_type)
7✔
572
            .body(req.body)
7✔
573
            .send()
7✔
574
            .await
7✔
575
            .map_err(map_reqwest_err)
5✔
576
    }
5✔
577
}
578

579
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
580
    match e.status() {
×
581
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
582
        None => anyhow!("No HTTP response: {}", e),
×
583
    }
584
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc