• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 16302587447

15 Jul 2025 07:32PM UTC coverage: 85.824% (+0.06%) from 85.766%
16302587447

Pull #881

github

web-flow
Merge 9861a58b2 into ba5ac460f
Pull Request #881: Separate out fee application into own typestate

113 of 119 new or added lines in 6 files covered. (94.96%)

32 existing lines in 3 files now uncovered.

7713 of 8987 relevant lines covered (85.82%)

521.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.83
/payjoin-cli/src/app/v2/mod.rs
1
use std::sync::{Arc, Mutex};
2

3
use anyhow::{anyhow, Context, Result};
4
use payjoin::bitcoin::consensus::encode::serialize_hex;
5
use payjoin::bitcoin::{Amount, FeeRate};
6
use payjoin::persist::OptionalTransitionOutcome;
7
use payjoin::receive::v2::{
8
    process_err_res, replay_event_log as replay_receiver_event_log, Initialized, MaybeInputsOwned,
9
    MaybeInputsSeen, OutputsUnknown, PayjoinProposal, ProvisionalProposal, ReceiveSession,
10
    Receiver, SessionHistory, UncheckedProposal, WantsFeeRange, WantsInputs, WantsOutputs,
11
};
12
use payjoin::send::v2::{
13
    replay_event_log as replay_sender_event_log, SendSession, Sender, SenderBuilder, V2GetContext,
14
    WithReplyKey,
15
};
16
use payjoin::Uri;
17
use tokio::sync::watch;
18

19
use super::config::Config;
20
use super::wallet::BitcoindWallet;
21
use super::App as AppTrait;
22
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
23
use crate::app::{handle_interrupt, http_agent};
24
use crate::db::v2::{ReceiverPersister, SenderPersister};
25
use crate::db::Database;
26

27
mod ohttp;
28

29
/// State held by the v2 application; cloned into each task spawned by
/// `resume_payjoins`.
#[derive(Clone)]
30
pub(crate) struct App {
31
    /// Configuration passed to `new`.
    config: Config,
32
    /// Shared database handle, created from `config.db_path` in `new`.
    db: Arc<Database>,
33
    /// Wallet built from `config.bitcoind` in `new`.
    wallet: BitcoindWallet,
34
    /// Receiving half of the watch channel whose sender is handed to
    /// `handle_interrupt` in `new`.
    interrupt: watch::Receiver<()>,
35
    /// Shared, mutex-guarded relay manager.
    relay_manager: Arc<Mutex<RelayManager>>,
36
}
37

38
#[async_trait::async_trait]
39
impl AppTrait for App {
40
    fn new(config: Config) -> Result<Self> {
6✔
41
        let db = Arc::new(Database::create(&config.db_path)?);
6✔
42
        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
6✔
43
        let (interrupt_tx, interrupt_rx) = watch::channel(());
6✔
44
        tokio::spawn(handle_interrupt(interrupt_tx));
6✔
45
        let wallet = BitcoindWallet::new(&config.bitcoind)?;
6✔
46
        let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager };
6✔
47
        app.wallet()
6✔
48
            .network()
6✔
49
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
6✔
50
        Ok(app)
6✔
51
    }
6✔
52

53
    /// Returns a clone of the app's wallet.
    fn wallet(&self) -> BitcoindWallet {
        Clone::clone(&self.wallet)
    }
16✔
54

55
    #[allow(clippy::incompatible_msrv)]
56
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
4✔
57
        use payjoin::UriExt;
58
        let uri =
2✔
59
            Uri::try_from(bip21).map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?;
2✔
60
        let uri = uri.assume_checked();
2✔
61
        let uri = uri.check_pj_supported().map_err(|_| anyhow!("URI does not support Payjoin"))?;
2✔
62
        let url = uri.extras.endpoint();
2✔
63
        // TODO: perhaps we should store pj uri in the session wrapper as to not replay the event log for each session
64
        let sender_state = self.db.get_send_session_ids()?.into_iter().find_map(|session_id| {
2✔
65
            let sender_persister = SenderPersister::from_id(self.db.clone(), session_id).ok()?;
1✔
66
            let replay_results = replay_sender_event_log(&sender_persister)
1✔
67
                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))
1✔
68
                .ok()?;
1✔
69

70
            let pj_uri = replay_results.1.endpoint();
1✔
71
            let sender_state = pj_uri.filter(|uri| uri == &url).map(|_| replay_results.0);
1✔
72
            sender_state.map(|sender_state| (sender_state, sender_persister))
1✔
73
        });
1✔
74

75
        let (sender_state, persister) = match sender_state {
2✔
76
            Some((sender_state, persister)) => (sender_state, persister),
1✔
77
            None => {
78
                let persister = SenderPersister::new(self.db.clone())?;
1✔
79
                let psbt = self.create_original_psbt(&uri, fee_rate)?;
1✔
80
                let sender = SenderBuilder::new(psbt, uri.clone())
1✔
81
                    .build_recommended(fee_rate)
1✔
82
                    .save(&persister)?;
1✔
83

84
                (SendSession::WithReplyKey(sender), persister)
1✔
85
            }
86
        };
87
        let mut interrupt = self.interrupt.clone();
2✔
88
        tokio::select! {
2✔
89
            _ = self.process_sender_session(sender_state, &persister) => return Ok(()),
2✔
90
            _ = interrupt.changed() => {
2✔
91
                println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
1✔
92
                return Err(anyhow!("Interrupted"))
1✔
93
            }
94
        }
95
    }
4✔
96

97
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
2✔
98
        let address = self.wallet().get_new_address()?;
1✔
99
        let ohttp_keys =
1✔
100
            unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone())
1✔
101
                .await?
1✔
102
                .ohttp_keys;
103
        let persister = ReceiverPersister::new(self.db.clone())?;
1✔
104
        let session = Receiver::create_session(
1✔
105
            address,
1✔
106
            self.config.v2()?.pj_directory.clone(),
1✔
107
            ohttp_keys,
1✔
108
            None,
1✔
109
        )
110
        .save(&persister)?;
1✔
111
        println!("Receive session established");
1✔
112
        let mut pj_uri = session.pj_uri();
1✔
113
        pj_uri.amount = Some(amount);
1✔
114
        println!("Request Payjoin by sharing this Payjoin Uri:");
1✔
115
        println!("{}", pj_uri);
1✔
116

117
        self.process_receiver_session(ReceiveSession::Initialized(session.clone()), &persister)
1✔
118
            .await?;
1✔
119
        Ok(())
×
120
    }
2✔
121

122
    #[allow(clippy::incompatible_msrv)]
123
    async fn resume_payjoins(&self) -> Result<()> {
6✔
124
        let recv_session_ids = self.db.get_recv_session_ids()?;
3✔
125
        let send_session_ids = self.db.get_send_session_ids()?;
3✔
126

127
        if recv_session_ids.is_empty() && send_session_ids.is_empty() {
3✔
128
            println!("No sessions to resume.");
2✔
129
            return Ok(());
2✔
130
        }
1✔
131

132
        let mut tasks = Vec::new();
1✔
133

134
        for session_id in recv_session_ids {
2✔
135
            let self_clone = self.clone();
1✔
136
            let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id)?;
1✔
137
            let receiver_state = replay_receiver_event_log(&recv_persister)
1✔
138
                .map_err(|e| anyhow!("Failed to replay receiver event log: {:?}", e))?
1✔
139
                .0;
140
            tasks.push(tokio::spawn(async move {
1✔
141
                self_clone.process_receiver_session(receiver_state, &recv_persister).await
1✔
142
            }));
1✔
143
        }
144

145
        for session_id in send_session_ids {
1✔
146
            let sender_persiter = SenderPersister::from_id(self.db.clone(), session_id)?;
×
147
            let sender_state = replay_sender_event_log(&sender_persiter)
×
148
                .map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))?
×
149
                .0;
150
            let self_clone = self.clone();
×
151
            tasks.push(tokio::spawn(async move {
×
152
                self_clone.process_sender_session(sender_state, &sender_persiter).await
×
153
            }));
×
154
        }
155

156
        let mut interrupt = self.interrupt.clone();
1✔
157
        tokio::select! {
1✔
158
            _ = async {
1✔
159
                for task in tasks {
2✔
160
                    let _ = task.await;
1✔
161
                }
162
            } => {
1✔
163
                println!("All resumed sessions completed.");
1✔
164
            }
1✔
165
            _ = interrupt.changed() => {
1✔
166
                println!("Resumed sessions were interrupted.");
×
167
            }
×
168
        }
169
        Ok(())
1✔
170
    }
6✔
171
}
172

173
impl App {
174
    async fn process_sender_session(
2✔
175
        &self,
2✔
176
        session: SendSession,
2✔
177
        persister: &SenderPersister,
2✔
178
    ) -> Result<()> {
2✔
179
        match session {
2✔
180
            SendSession::WithReplyKey(context) => {
1✔
181
                // TODO: can we handle the fall back case in `post_original_proposal`. That way we don't have to clone here
182
                match self.post_original_proposal(context.clone(), persister).await {
1✔
183
                    Ok(()) => (),
×
184
                    Err(_) => {
185
                        let (req, v1_ctx) = context.create_v1_post_request();
×
186
                        let response = post_request(req).await?;
×
187
                        let psbt = Arc::new(
×
188
                            v1_ctx.process_response(response.bytes().await?.to_vec().as_slice())?,
×
189
                        );
190
                        self.process_pj_response((*psbt).clone())?;
×
191
                    }
192
                }
193
                return Ok(());
×
194
            }
195
            SendSession::V2GetContext(context) =>
1✔
196
                self.get_proposed_payjoin_psbt(context, persister).await?,
1✔
197
            SendSession::ProposalReceived(proposal) => {
×
198
                self.process_pj_response(proposal)?;
×
199
                return Ok(());
×
200
            }
201
            _ => return Err(anyhow!("Unexpected sender state")),
×
202
        }
203
        Ok(())
1✔
204
    }
1✔
205

206
    async fn post_original_proposal(
1✔
207
        &self,
1✔
208
        sender: Sender<WithReplyKey>,
1✔
209
        persister: &SenderPersister,
1✔
210
    ) -> Result<()> {
1✔
211
        let (req, ctx) = sender.create_v2_post_request(
1✔
212
            self.unwrap_relay_or_else_fetch(Some(sender.endpoint().clone())).await?,
1✔
213
        )?;
×
214
        let response = post_request(req).await?;
1✔
215
        println!("Posted original proposal...");
1✔
216
        let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?;
1✔
217
        self.get_proposed_payjoin_psbt(sender, persister).await
1✔
218
    }
×
219

220
    async fn get_proposed_payjoin_psbt(
2✔
221
        &self,
2✔
222
        sender: Sender<V2GetContext>,
2✔
223
        persister: &SenderPersister,
2✔
224
    ) -> Result<()> {
2✔
225
        let mut session = sender.clone();
2✔
226
        // Long poll until we get a response
227
        loop {
228
            let (req, ctx) = session.create_poll_request(
3✔
229
                self.unwrap_relay_or_else_fetch(Some(session.endpoint().clone())).await?,
3✔
230
            )?;
×
231
            let response = post_request(req).await?;
3✔
232
            let res = session.process_response(&response.bytes().await?, ctx).save(persister);
2✔
233
            match res {
2✔
234
                Ok(OptionalTransitionOutcome::Progress(psbt)) => {
1✔
235
                    println!("Proposal received. Processing...");
1✔
236
                    self.process_pj_response(psbt)?;
1✔
237
                    return Ok(());
1✔
238
                }
239
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
1✔
240
                    println!("No response yet.");
1✔
241
                    session = current_state;
1✔
242
                    continue;
1✔
243
                }
244
                Err(re) => {
×
245
                    println!("{re}");
×
246
                    log::debug!("{re:?}");
×
247
                    return Err(anyhow!("Response error").context(re));
×
248
                }
249
            }
250
        }
251
    }
1✔
252

253
    async fn long_poll_fallback(
2✔
254
        &self,
2✔
255
        session: Receiver<Initialized>,
2✔
256
        persister: &ReceiverPersister,
2✔
257
    ) -> Result<Receiver<UncheckedProposal>> {
2✔
258
        let ohttp_relay = self
2✔
259
            .unwrap_relay_or_else_fetch(Some(session.pj_uri().extras.endpoint().clone()))
2✔
260
            .await?;
2✔
261

262
        let mut session = session;
2✔
263
        loop {
264
            let (req, context) = session.create_poll_request(&ohttp_relay)?;
2✔
265
            println!("Polling receive request...");
2✔
266
            let ohttp_response = post_request(req).await?;
2✔
267
            let state_transition = session
1✔
268
                .process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
269
                .save(persister);
1✔
270
            match state_transition {
1✔
271
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
1✔
272
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
273
                    return Ok(next_state);
1✔
274
                }
275
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
×
276
                    session = current_state;
×
277
                    continue;
×
278
                }
279
                Err(e) => return Err(e.into()),
×
280
            }
281
        }
282
    }
1✔
283

284
    async fn process_receiver_session(
2✔
285
        &self,
2✔
286
        session: ReceiveSession,
2✔
287
        persister: &ReceiverPersister,
2✔
288
    ) -> Result<()> {
2✔
289
        let res = {
2✔
290
            match session {
2✔
291
                ReceiveSession::Initialized(proposal) =>
2✔
292
                    self.read_from_directory(proposal, persister).await,
2✔
293
                ReceiveSession::UncheckedProposal(proposal) =>
×
294
                    self.check_proposal(proposal, persister).await,
×
295
                ReceiveSession::MaybeInputsOwned(proposal) =>
×
296
                    self.check_inputs_not_owned(proposal, persister).await,
×
297
                ReceiveSession::MaybeInputsSeen(proposal) =>
×
298
                    self.check_no_inputs_seen_before(proposal, persister).await,
×
299
                ReceiveSession::OutputsUnknown(proposal) =>
×
300
                    self.identify_receiver_outputs(proposal, persister).await,
×
301
                ReceiveSession::WantsOutputs(proposal) =>
×
302
                    self.commit_outputs(proposal, persister).await,
×
303
                ReceiveSession::WantsInputs(proposal) =>
×
304
                    self.contribute_inputs(proposal, persister).await,
×
NEW
305
                ReceiveSession::WantsFeeRange(proposal) =>
×
NEW
306
                    self.commit_fee_range(proposal, persister).await,
×
307
                ReceiveSession::ProvisionalProposal(proposal) =>
×
308
                    self.finalize_proposal(proposal, persister).await,
×
309
                ReceiveSession::PayjoinProposal(proposal) =>
×
310
                    self.send_payjoin_proposal(proposal, persister).await,
×
311
                ReceiveSession::Uninitialized(_) =>
312
                    return Err(anyhow!("Uninitialized receiver session")),
×
313
                ReceiveSession::TerminalFailure =>
314
                    return Err(anyhow!("Terminal receiver session")),
×
315
            }
316
        };
317

318
        match res {
2✔
319
            Ok(_) => Ok(()),
1✔
320
            Err(e) => {
1✔
321
                let (_, session_history) = replay_receiver_event_log(persister)?;
1✔
322
                let pj_uri = match session_history.pj_uri() {
1✔
323
                    Some(uri) => Some(uri.extras.endpoint().clone()),
1✔
324
                    None => None,
×
325
                };
326
                let ohttp_relay = self.unwrap_relay_or_else_fetch(pj_uri).await?;
1✔
327
                handle_recoverable_error(&ohttp_relay, &session_history).await?;
1✔
328

329
                Err(e)
1✔
330
            }
331
        }
332
    }
2✔
333

334
    #[allow(clippy::incompatible_msrv)]
335
    async fn read_from_directory(
2✔
336
        &self,
2✔
337
        session: Receiver<Initialized>,
2✔
338
        persister: &ReceiverPersister,
2✔
339
    ) -> Result<()> {
2✔
340
        let mut interrupt = self.interrupt.clone();
2✔
341
        let receiver = tokio::select! {
2✔
342
            res = self.long_poll_fallback(session, persister) => res,
2✔
343
            _ = interrupt.changed() => {
2✔
344
                println!("Interrupted. Call the `resume` command to resume all sessions.");
1✔
345
                return Err(anyhow!("Interrupted"));
1✔
346
            }
347
        }?;
×
348
        self.check_proposal(receiver, persister).await
1✔
349
    }
2✔
350

351
    async fn check_proposal(
1✔
352
        &self,
1✔
353
        proposal: Receiver<UncheckedProposal>,
1✔
354
        persister: &ReceiverPersister,
1✔
355
    ) -> Result<()> {
1✔
356
        let wallet = self.wallet();
1✔
357
        let proposal = proposal
1✔
358
            .check_broadcast_suitability(None, |tx| Ok(wallet.can_broadcast(tx)?))
1✔
359
            .save(persister)?;
1✔
360

361
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
362
        println!("{}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast()));
1✔
363
        self.check_inputs_not_owned(proposal, persister).await
1✔
364
    }
1✔
365

366
    async fn check_inputs_not_owned(
1✔
367
        &self,
1✔
368
        proposal: Receiver<MaybeInputsOwned>,
1✔
369
        persister: &ReceiverPersister,
1✔
370
    ) -> Result<()> {
1✔
371
        let wallet = self.wallet();
1✔
372
        let proposal =
1✔
373
            proposal.check_inputs_not_owned(|input| Ok(wallet.is_mine(input)?)).save(persister)?;
1✔
374
        self.check_no_inputs_seen_before(proposal, persister).await
1✔
375
    }
1✔
376

377
    async fn check_no_inputs_seen_before(
1✔
378
        &self,
1✔
379
        proposal: Receiver<MaybeInputsSeen>,
1✔
380
        persister: &ReceiverPersister,
1✔
381
    ) -> Result<()> {
1✔
382
        let proposal = proposal
1✔
383
            .check_no_inputs_seen_before(|input| Ok(self.db.insert_input_seen_before(*input)?))
1✔
384
            .save(persister)?;
1✔
385
        self.identify_receiver_outputs(proposal, persister).await
1✔
386
    }
1✔
387

388
    async fn identify_receiver_outputs(
1✔
389
        &self,
1✔
390
        proposal: Receiver<OutputsUnknown>,
1✔
391
        persister: &ReceiverPersister,
1✔
392
    ) -> Result<()> {
1✔
393
        let wallet = self.wallet();
1✔
394
        let proposal = proposal
1✔
395
            .identify_receiver_outputs(|output_script| Ok(wallet.is_mine(output_script)?))
2✔
396
            .save(persister)?;
1✔
397
        self.commit_outputs(proposal, persister).await
1✔
398
    }
1✔
399

400
    async fn commit_outputs(
1✔
401
        &self,
1✔
402
        proposal: Receiver<WantsOutputs>,
1✔
403
        persister: &ReceiverPersister,
1✔
404
    ) -> Result<()> {
1✔
405
        let proposal = proposal.commit_outputs().save(persister)?;
1✔
406
        self.contribute_inputs(proposal, persister).await
1✔
407
    }
1✔
408

409
    async fn contribute_inputs(
1✔
410
        &self,
1✔
411
        proposal: Receiver<WantsInputs>,
1✔
412
        persister: &ReceiverPersister,
1✔
413
    ) -> Result<()> {
1✔
414
        let wallet = self.wallet();
1✔
415
        let candidate_inputs = wallet.list_unspent()?;
1✔
416

417
        let selected_input = proposal.try_preserving_privacy(candidate_inputs)?;
1✔
418
        let proposal =
1✔
419
            proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?;
1✔
420
        self.commit_fee_range(proposal, persister).await
1✔
421
    }
1✔
422

423
    async fn commit_fee_range(
1✔
424
        &self,
1✔
425
        proposal: Receiver<WantsFeeRange>,
1✔
426
        persister: &ReceiverPersister,
1✔
427
    ) -> Result<()> {
1✔
428
        let proposal = proposal.commit_fee_range(None, self.config.max_fee_rate).save(persister)?;
1✔
429
        self.finalize_proposal(proposal, persister).await
1✔
430
    }
1✔
431

432
    async fn finalize_proposal(
1✔
433
        &self,
1✔
434
        proposal: Receiver<ProvisionalProposal>,
1✔
435
        persister: &ReceiverPersister,
1✔
436
    ) -> Result<()> {
1✔
437
        let wallet = self.wallet();
1✔
438
        let proposal =
1✔
439
            proposal.finalize_proposal(|psbt| Ok(wallet.process_psbt(psbt)?)).save(persister)?;
1✔
440
        self.send_payjoin_proposal(proposal, persister).await
1✔
441
    }
1✔
442

443
    async fn send_payjoin_proposal(
1✔
444
        &self,
1✔
445
        mut proposal: Receiver<PayjoinProposal>,
1✔
446
        persister: &ReceiverPersister,
1✔
447
    ) -> Result<()> {
1✔
448
        let (req, ohttp_ctx) = proposal
1✔
449
            .create_post_request(&self.unwrap_relay_or_else_fetch(None).await?)
1✔
450
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
451
        let res = post_request(req).await?;
1✔
452
        let payjoin_psbt = proposal.psbt().clone();
1✔
453
        proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister)?;
1✔
454
        println!(
1✔
455
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
1✔
456
            payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid()
1✔
457
        );
458
        Ok(())
1✔
459
    }
1✔
460

461
    async fn unwrap_relay_or_else_fetch(
8✔
462
        &self,
8✔
463
        directory: Option<payjoin::Url>,
8✔
464
    ) -> Result<payjoin::Url> {
8✔
465
        let selected_relay =
8✔
466
            self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay();
8✔
467
        let ohttp_relay = match selected_relay {
8✔
468
            Some(relay) => relay,
3✔
469
            None =>
470
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone())
5✔
471
                    .await?
5✔
472
                    .relay_url,
473
        };
474
        Ok(ohttp_relay)
8✔
475
    }
8✔
476
}
477

478
/// Handle request error by sending an error response over the directory
479
async fn handle_recoverable_error(
1✔
480
    ohttp_relay: &payjoin::Url,
1✔
481
    session_history: &SessionHistory,
1✔
482
) -> Result<()> {
1✔
483
    let e = match session_history.terminal_error() {
1✔
484
        Some((_, Some(e))) => e,
×
485
        _ => return Ok(()),
1✔
486
    };
487
    let (err_req, err_ctx) = session_history
×
488
        .extract_err_req(ohttp_relay)?
×
489
        .expect("If JsonReply is Some, then err_req and err_ctx should be Some");
×
490
    let to_return = anyhow!("Replied with error: {}", e.to_json().to_string());
×
491

492
    let err_response = match post_request(err_req).await {
×
493
        Ok(response) => response,
×
494
        Err(e) => return Err(anyhow!("Failed to post error request: {}", e)),
×
495
    };
496

497
    let err_bytes = match err_response.bytes().await {
×
498
        Ok(bytes) => bytes,
×
499
        Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
×
500
    };
501

502
    if let Err(e) = process_err_res(&err_bytes, err_ctx) {
×
503
        return Err(anyhow!("Failed to process error response: {}", e));
×
504
    }
×
505

506
    Err(to_return)
×
507
}
1✔
508

509
async fn post_request(req: payjoin::Request) -> Result<reqwest::Response> {
7✔
510
    let http = http_agent()?;
7✔
511
    http.post(req.url)
7✔
512
        .header("Content-Type", req.content_type)
7✔
513
        .body(req.body)
7✔
514
        .send()
7✔
515
        .await
7✔
516
        .map_err(map_reqwest_err)
5✔
517
}
5✔
518

519
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
520
    match e.status() {
×
521
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
522
        None => anyhow!("No HTTP response: {}", e),
×
523
    }
524
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc