• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

qubit-ltd / rs-http / 5fb6d033-8551-4bbf-82b0-80b1bfd2cd99

28 Apr 2026 07:15AM UTC coverage: 99.694% (+5.2%) from 94.53%
5fb6d033-8551-4bbf-82b0-80b1bfd2cd99

push

circleci

Haixing-Hu
chore(ci): sync shared Rust CI scripts

3259 of 3269 relevant lines covered (99.69%)

64.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.69
/src/client/http_client.rs
1
/*******************************************************************************
2
 *
3
 *    Copyright (c) 2025 - 2026.
4
 *    Haixing Hu, Qubit Co. Ltd.
5
 *
6
 *    All rights reserved.
7
 *
8
 ******************************************************************************/
9
//! HTTP client: builds requests, applies defaults and interceptors, executes
10
//! them with optional retry, and exposes SSE helpers with reconnect.
11
//!
12
//! Single-shot execution is [`HttpClient::execute`] / [`HttpClient::execute_once`];
13
//! retry policy comes from [`crate::HttpClientOptions::retry`] unless overridden
14
//! per request.
15
//!
16
//! # Author
17
//!
18
//! Haixing Hu
19

20
use std::time::{Duration, Instant};
21

22
use qubit_retry::{
23
    AttemptFailure, AttemptFailureDecision, Retry, RetryContext, RetryError, RetryErrorReason,
24
};
25

26
use crate::sse::SseReconnectRunner;
27
use crate::{
28
    response::HttpResponseOptions,
29
    sse::{SseEventStream, SseReconnectOptions},
30
    AsyncHttpHeaderInjector, HttpClientOptions, HttpError, HttpHeaderInjector, HttpLogger,
31
    HttpRequest, HttpRequestBuilder, HttpRequestInterceptor, HttpRequestInterceptors, HttpResponse,
32
    HttpResponseInterceptor, HttpResponseInterceptors, HttpResponseMeta, HttpResult,
33
    HttpRetryOptions,
34
};
35

36
/// High-level HTTP client: default headers, injectors, interceptors, logging,
37
/// timeouts, and optional per-request retry.
38
///
39
/// [`Clone`] is shallow and cheap enough for typical use (including passing into
40
/// retry closures); cloning does not duplicate the underlying connection pool
41
/// beyond what [`reqwest::Client`] already shares.
42
#[derive(Clone)]
43
pub struct HttpClient {
44
    /// Pluggable low-level HTTP stack used to send requests (currently reqwest).
45
    pub(super) backend: reqwest::Client,
46
    /// Timeouts, proxy, logging, default headers, and related settings.
47
    pub(super) options: HttpClientOptions,
48
    /// Header injectors applied to every outgoing request after default
49
    /// headers.
50
    pub(super) injectors: Vec<HttpHeaderInjector>,
51
    /// Async header injectors applied after sync injectors and before request-level headers.
52
    pub(super) async_injectors: Vec<AsyncHttpHeaderInjector>,
53
    /// Request interceptors applied before request send for each attempt.
54
    request_interceptors: HttpRequestInterceptors,
55
    /// Response interceptors applied on successful responses before return.
56
    response_interceptors: HttpResponseInterceptors,
57
}
58

59
impl HttpClient {
60
    /// Wraps a built [`reqwest::Client`] with the given options and an empty
61
    /// injector list.
62
    ///
63
    /// # Parameters
64
    /// - `backend`: Configured low-level HTTP client used for I/O.
65
    /// - `options`: Client-wide timeouts, headers, proxy, logging, etc.
66
    ///
67
    /// # Returns
68
    /// A new [`HttpClient`] with no injectors until
69
    /// [`HttpClient::add_header_injector`] is called.
70
    pub(crate) fn new(backend: reqwest::Client, options: HttpClientOptions) -> Self {
198✔
71
        Self {
198✔
72
            backend,
198✔
73
            options,
198✔
74
            injectors: Vec::new(),
198✔
75
            async_injectors: Vec::new(),
198✔
76
            request_interceptors: HttpRequestInterceptors::new(),
198✔
77
            response_interceptors: HttpResponseInterceptors::new(),
198✔
78
        }
198✔
79
    }
198✔
80

81
    /// Returns a reference to the client-wide options (timeouts, proxy, logging,
82
    /// default headers, retry defaults, etc.).
83
    ///
84
    /// # Returns
85
    /// Immutable borrow of [`HttpClientOptions`]. Never `None`; always the
86
    /// options installed on this client.
87
    pub fn options(&self) -> &HttpClientOptions {
216✔
88
        &self.options
216✔
89
    }
216✔
90

91
    /// Appends a [`HttpHeaderInjector`] so its mutation function runs on every
92
    /// request. Mutates `self` in place.
93
    ///
94
    /// # Parameters
95
    /// - `injector`: Injector to append (order is preserved).
96
    pub fn add_header_injector(&mut self, injector: HttpHeaderInjector) {
9✔
97
        self.injectors.push(injector);
9✔
98
    }
9✔
99

100
    /// Appends an async header injector whose mutation runs after sync injectors.
101
    /// Mutates `self` in place.
102
    ///
103
    /// # Parameters
104
    /// - `injector`: Async injector to append (order is preserved).
105
    pub fn add_async_header_injector(&mut self, injector: AsyncHttpHeaderInjector) {
4✔
106
        self.async_injectors.push(injector);
4✔
107
    }
4✔
108

109
    /// Appends a request interceptor run before each send attempt (including
110
    /// each retry attempt). Mutates `self` in place.
111
    ///
112
    /// # Parameters
113
    /// - `interceptor`: Request interceptor to append (order is preserved).
114
    pub fn add_request_interceptor(&mut self, interceptor: HttpRequestInterceptor) {
17✔
115
        self.request_interceptors.push(interceptor);
17✔
116
    }
17✔
117

118
    /// Appends a response interceptor run only after a successful HTTP status
119
    /// (after the internal `execute_once` step) and before response body logging.
120
    /// Mutates `self` in place.
121
    ///
122
    /// # Parameters
123
    /// - `interceptor`: Response interceptor to append (order is preserved).
124
    pub fn add_response_interceptor(&mut self, interceptor: HttpResponseInterceptor) {
6✔
125
        self.response_interceptors.push(interceptor);
6✔
126
    }
6✔
127

128
    /// Validates and adds one client-level default header.
129
    ///
130
    /// The header is applied to every request before header injectors and
131
    /// request-level headers.
132
    ///
133
    /// # Parameters
134
    /// - `name`: Header name.
135
    /// - `value`: Header value.
136
    ///
137
    /// # Returns
138
    /// `Ok(self)` after the header is stored.
139
    ///
140
    /// # Errors
141
    /// Returns [`HttpError`] when the header name or value is invalid.
142
    pub fn add_header(&mut self, name: &str, value: &str) -> HttpResult<&mut Self> {
6✔
143
        self.options.add_header(name, value)?;
6✔
144
        Ok(self)
5✔
145
    }
6✔
146

147
    /// Validates and adds many client-level default headers atomically.
148
    ///
149
    /// If any input pair is invalid, no header from this batch is applied.
150
    ///
151
    /// # Parameters
152
    /// - `headers`: Iterator of `(name, value)` pairs.
153
    ///
154
    /// # Returns
155
    /// `Ok(self)` after all headers are stored.
156
    ///
157
    /// # Errors
158
    /// Returns [`HttpError`] when any name/value pair is invalid (nothing from
159
    /// this call is applied).
160
    pub fn add_headers(&mut self, headers: &[(&str, &str)]) -> HttpResult<&mut Self> {
2✔
161
        self.options.add_headers(headers)?;
2✔
162
        Ok(self)
1✔
163
    }
2✔
164

165
    /// Clears all synchronous header injectors. Mutates `self` in place.
166
    pub fn clear_header_injectors(&mut self) {
1✔
167
        self.injectors.clear();
1✔
168
    }
1✔
169

170
    /// Clears all async header injectors. Mutates `self` in place.
171
    pub fn clear_async_header_injectors(&mut self) {
1✔
172
        self.async_injectors.clear();
1✔
173
    }
1✔
174

175
    /// Clears all request interceptors. Mutates `self` in place.
176
    pub fn clear_request_interceptors(&mut self) {
1✔
177
        self.request_interceptors.clear();
1✔
178
    }
1✔
179

180
    /// Clears all response interceptors. Mutates `self` in place.
181
    pub fn clear_response_interceptors(&mut self) {
1✔
182
        self.response_interceptors.clear();
1✔
183
    }
1✔
184

185
    /// Starts building an [`HttpRequest`] with the given method and path
186
    /// (relative or absolute URL string).
187
    ///
188
    /// # Parameters
189
    /// - `method`: HTTP verb (GET, POST, …).
190
    /// - `path`: Path relative to [`HttpClientOptions::base_url`] or a full URL
191
    ///   string.
192
    ///
193
    /// # Returns
194
    /// A new [`HttpRequestBuilder`] borrowing this client for defaults; it is
195
    /// not sent until built and passed to [`HttpClient::execute`] (or related
196
    /// APIs).
197
    pub fn request(&self, method: http::Method, path: &str) -> HttpRequestBuilder {
193✔
198
        HttpRequestBuilder::new(method, path, self)
193✔
199
    }
193✔
200

201
    /// Returns a clone of the client-level default header map.
202
    ///
203
    /// Used when constructing a built [`HttpRequest`] so the snapshot reflects
204
    /// headers at build time.
205
    ///
206
    /// # Returns
207
    /// Owned [`http::HeaderMap`] copy of [`HttpClientOptions`] default headers.
208
    pub(crate) fn headers_snapshot(&self) -> http::HeaderMap {
193✔
209
        self.options.default_headers.clone()
193✔
210
    }
193✔
211

212
    /// Returns a clone of the registered synchronous header injectors list.
213
    ///
214
    /// # Returns
215
    /// New [`Vec`] with the same injectors and order as on this client.
216
    pub(crate) fn injectors_snapshot(&self) -> Vec<HttpHeaderInjector> {
193✔
217
        self.injectors.clone()
193✔
218
    }
193✔
219

220
    /// Returns a clone of the registered async header injectors list.
221
    ///
222
    /// # Returns
223
    /// New [`Vec`] with the same injectors and order as on this client.
224
    pub(crate) fn async_injectors_snapshot(&self) -> Vec<AsyncHttpHeaderInjector> {
193✔
225
        self.async_injectors.clone()
193✔
226
    }
193✔
227

228
    /// Sends the request and returns a unified [`HttpResponse`].
229
    ///
230
    /// Chooses retry vs single attempt from resolved [`HttpRetryOptions`] for
231
    /// this request. Performs network I/O and may await the internal
232
    /// `execute_once` path
233
    /// multiple times with backoff between attempts when retry is enabled.
234
    ///
235
    /// # Parameters
236
    /// - `request`: Built request (URL resolved against `base_url` if path is
237
    ///   not absolute).
238
    ///
239
    /// # Returns
240
    /// - `Ok(HttpResponse)` when the HTTP status is success
241
    ///   ([`http::StatusCode::is_success`]).
242
    /// - `Err(HttpError)` when any attempt fails for URL/header validation,
243
    ///   cancellation, interceptor failure, transport/timeout, non-success
244
    ///   status, or when the retry executor aborts or exceeds limits.
245
    pub async fn execute(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
115✔
246
        let retry_options = self.options.retry.resolve(&request);
115✔
247
        if retry_options.should_retry(&request) {
115✔
248
            self.execute_with_retry(request, retry_options).await
21✔
249
        } else {
250
            self.execute_once(request).await
94✔
251
        }
252
    }
115✔
253

254
    /// Performs one non-retrying execution: pre-send cancellation check,
255
    /// request interceptors, resolve URL, merge headers, log the request, send
256
    /// with configured timeouts, map non-success status to an error, then
257
    /// response interceptors and response logging. The returned body is read
258
    /// lazily according to [`HttpResponse`].
259
    ///
260
    /// # Parameters
261
    /// - `request`: Built request to send (same fields as for
262
    ///   [`HttpClient::execute`]).
263
    ///
264
    /// # Returns
265
    /// - `Ok(HttpResponse)` on success status and after interceptors/logging
266
    ///   steps succeed.
267
    /// - `Err(HttpError)` from request/response interceptors, cancellation,
268
    ///   send/transport errors, status mapping, URL resolution for the response
269
    ///   wrapper, or response logging failures.
270
    ///
271
    /// # Side effects
272
    /// Network I/O, optional logging, and user-provided interceptor callbacks.
273
    pub(crate) async fn execute_once(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
167✔
274
        let mut request = request;
167✔
275
        if let Some(error) = request.cancelled_error_if_needed("Request cancelled before sending") {
167✔
276
            return Err(error);
2✔
277
        }
165✔
278
        self.request_interceptors.apply(&mut request)?;
165✔
279
        let response = self
158✔
280
            .prepare_and_send_once(request, "Request cancelled before sending")
158✔
281
            .await?;
158✔
282
        let mut response = response
137✔
283
            .into_success_or_status_error("HTTP request failed")
137✔
284
            .await?;
137✔
285
        self.response_interceptors.apply(&mut response.meta)?;
99✔
286
        let logger = HttpLogger::new(&self.options);
98✔
287
        logger.log_response(&mut response).await?;
98✔
288
        Ok(response)
98✔
289
    }
167✔
290

291
    /// Single low-level send: cancellation check, request logging, one backend
292
    /// round-trip, then wraps the backend response as [`HttpResponse`].
293
    ///
294
    /// Does not run response interceptors or success-status enforcement; those
295
    /// happen in [`HttpClient::execute_once`] after this returns.
296
    ///
297
    /// # Parameters
298
    /// - `request`: Request to send (may be mutated for logging/send path).
299
    /// - `cancellation_message`: Message embedded if the request is already
300
    ///   cancelled when this runs.
301
    ///
302
    /// # Returns
303
    /// - `Ok(HttpResponse)` with lazy body and metadata.
304
    /// - `Err(HttpError)` if cancelled before send, send fails, or
305
    ///   [`HttpRequest::resolved_url`] fails when building the wrapper.
306
    ///
307
    /// # Side effects
308
    /// Async network I/O and request logging via [`HttpLogger`].
309
    async fn prepare_and_send_once(
159✔
310
        &self,
159✔
311
        request: HttpRequest,
159✔
312
        cancellation_message: &str,
159✔
313
    ) -> HttpResult<HttpResponse> {
159✔
314
        let mut request = request;
159✔
315
        if let Some(error) = request.cancelled_error_if_needed(cancellation_message) {
159✔
316
            return Err(error);
1✔
317
        }
158✔
318
        let logger = HttpLogger::new(&self.options);
158✔
319
        let backend_response = request.send_impl(&self.backend, &logger).await?;
158✔
320
        let meta = HttpResponseMeta::new(
137✔
321
            backend_response.status(),
137✔
322
            backend_response.headers().clone(),
137✔
323
            backend_response.url().clone(),
137✔
324
            request.method().clone(),
137✔
325
        );
326
        let response_options = HttpResponseOptions::new(
137✔
327
            self.options.error_response_preview_limit,
137✔
328
            self.options.sse_json_mode,
137✔
329
            self.options.sse_max_line_bytes,
137✔
330
            self.options.sse_max_frame_bytes,
137✔
331
            self.options.sse_done_marker_policy.clone(),
137✔
332
        );
333
        Ok(HttpResponse::from_backend(
137✔
334
            meta,
137✔
335
            backend_response,
137✔
336
            request.read_timeout(),
137✔
337
            request.cancellation_token().cloned(),
137✔
338
            request.resolved_url_with_query()?,
137✔
339
            response_options,
137✔
340
        ))
341
    }
159✔
342

343
    /// Runs [`HttpClient::execute_once`] under the given retry policy.
344
    ///
345
    /// Between attempts waits according to the resolved retry delay, optionally
346
    /// honoring `Retry-After` by extending the next sleep. Each attempt clones
347
    /// the request so request bodies can be rebuilt when supported.
348
    ///
349
    /// # Parameters
350
    /// - `request`: Built request passed to each [`HttpClient::execute_once`]
351
    ///   attempt (cloned per retry closure).
352
    /// - `options`: Effective retry options for this request (from resolution
353
    ///   in [`HttpClient::execute`]).
354
    ///
355
    /// # Returns
356
    /// - `Ok(HttpResponse)` when an attempt completes with success status.
357
    /// - `Err(HttpError)` from retry option validation, from any
358
    ///   [`HttpClient::execute_once`] failure that is non-retryable, or from
359
    ///   retry exhaustion/max-duration enforcement.
360
    ///
361
    /// # Side effects
362
    /// Multiple async HTTP attempts and optional sleeps.
363
    async fn execute_with_retry(
21✔
364
        &self,
21✔
365
        request: HttpRequest,
21✔
366
        options: HttpRetryOptions,
21✔
367
    ) -> HttpResult<HttpResponse> {
21✔
368
        let honor_retry_after = request.retry_override().should_honor_retry_after();
21✔
369
        let retry_options = options.to_executor_options()?;
21✔
370
        let started_at = Instant::now();
21✔
371

372
        let retry_policy_options = options.clone();
21✔
373
        let retry_delay_options = retry_options.clone();
21✔
374
        let retry_policy = match Retry::<HttpError>::builder()
21✔
375
            .options(retry_options)
21✔
376
            .retry_after_from_error(move |error| {
22✔
377
                honor_retry_after.then_some(error.retry_after).flatten()
22✔
378
            })
22✔
379
            .on_failure(
21✔
380
                move |failure: &AttemptFailure<HttpError>, context: &RetryContext| {
22✔
381
                    Self::retry_failure_decision(
22✔
382
                        failure,
22✔
383
                        context,
22✔
384
                        &retry_policy_options,
22✔
385
                        &retry_delay_options,
22✔
386
                    )
387
                },
22✔
388
            )
389
            .build()
21✔
390
        {
391
            Ok(retry_policy) => retry_policy,
21✔
392
            Err(error) => {
×
393
                return Err(HttpError::other(format!(
×
394
                    "Invalid HTTP retry executor: {error}"
×
395
                )))
×
396
            }
397
        };
398

399
        let cancellation_token = request.cancellation_token().cloned();
21✔
400
        let request_method = request.method().clone();
21✔
401
        let request_url = request.resolved_url_with_query().ok();
21✔
402
        let retry_request = request.clone();
21✔
403
        let retry_future = retry_policy.run_async(|| {
35✔
404
            let attempt_request = retry_request.clone();
35✔
405
            async move { self.execute_once(attempt_request).await }
35✔
406
        });
35✔
407

408
        let retry_result = if let Some(token) = cancellation_token.as_ref() {
21✔
409
            tokio::select! {
1✔
410
                _ = token.cancelled() => {
1✔
411
                    return Err(Self::retry_cancelled_error(
1✔
412
                        "HTTP retry cancelled while waiting before next attempt",
1✔
413
                        &request_method,
1✔
414
                        request_url.as_ref(),
1✔
415
                    ));
1✔
416
                }
417
                result = retry_future => result,
1✔
418
            }
419
        } else {
420
            retry_future.await
20✔
421
        };
422

423
        match retry_result {
20✔
424
            Ok(response) => Ok(response),
13✔
425
            Err(error) => Err(Self::map_retry_error(
7✔
426
                error,
7✔
427
                started_at,
7✔
428
                options.max_duration,
7✔
429
                options.max_attempts,
7✔
430
            )),
7✔
431
        }
432
    }
21✔
433

434
    /// Returns whether `error` is retryable under `options`.
435
    ///
436
    /// # Parameters
437
    /// - `error`: Error produced by a single HTTP attempt.
438
    /// - `options`: Effective retry options for the request.
439
    ///
440
    /// # Returns
441
    /// `true` if another attempt may be scheduled.
442
    fn is_retryable_error(error: &HttpError, options: &HttpRetryOptions) -> bool {
22✔
443
        if error.kind == crate::HttpErrorKind::Status {
22✔
444
            error
19✔
445
                .status
19✔
446
                .is_some_and(|status| options.is_retryable_status(status))
19✔
447
        } else {
448
            options.is_retryable_error_kind(error.kind)
3✔
449
        }
450
    }
22✔
451

452
    /// Picks how long to wait before the next retry attempt.
    ///
    /// # Parameters
    /// - `base_delay`: Delay selected by the retry policy.
    /// - `retry_after_hint`: `Retry-After` delay, when one was extracted.
    ///
    /// # Returns
    /// The larger of `base_delay` and the hint when a hint is present,
    /// otherwise `base_delay`.
    fn retry_sleep_delay(base_delay: Duration, retry_after_hint: Option<Duration>) -> Duration {
        match retry_after_hint {
            Some(retry_after) => retry_after.max(base_delay),
            None => base_delay,
        }
    }
18✔
465

466
    /// Decides how HTTP retry should handle one failed attempt.
467
    ///
468
    /// # Parameters
469
    /// - `failure`: Failed attempt reported by `qubit-retry`.
470
    /// - `context`: Retry context for the failed attempt.
471
    /// - `policy_options`: HTTP retry allowlists and method policy.
472
    /// - `delay_options`: Retry executor options used to calculate base delay.
473
    ///
474
    /// # Returns
475
    /// Decision for `qubit-retry`: abort non-retryable HTTP errors, otherwise
476
    /// retry after the larger base delay / Retry-After hint. Non-HTTP runtime
477
    /// failures fall back to the retry executor default.
478
    fn retry_failure_decision(
23✔
479
        failure: &AttemptFailure<HttpError>,
23✔
480
        context: &RetryContext,
23✔
481
        policy_options: &HttpRetryOptions,
23✔
482
        delay_options: &qubit_retry::RetryOptions,
23✔
483
    ) -> AttemptFailureDecision {
23✔
484
        let Some(error) = failure.as_error() else {
23✔
485
            return AttemptFailureDecision::UseDefault;
1✔
486
        };
487
        if !Self::is_retryable_error(error, policy_options) {
22✔
488
            return AttemptFailureDecision::Abort;
4✔
489
        }
18✔
490

491
        let base_delay = delay_options.delay_for_attempt(context.attempt());
18✔
492
        let sleep_delay = Self::retry_sleep_delay(base_delay, context.retry_after_hint());
18✔
493
        AttemptFailureDecision::RetryAfter(sleep_delay)
18✔
494
    }
23✔
495

496
    /// Coverage-only entry point that forwards to
    /// [`Self::retry_failure_decision`].
    ///
    /// Compiled only under `cfg(coverage)` and hidden from the docs.
    ///
    /// # Parameters
    /// - `failure`: Failed attempt to inspect.
    /// - `context`: Retry context for the failed attempt.
    /// - `policy_options`: HTTP retry policy options.
    /// - `delay_options`: Retry executor options.
    ///
    /// # Returns
    /// Whatever [`Self::retry_failure_decision`] returns for the same
    /// arguments.
    #[cfg(coverage)]
    #[doc(hidden)]
    pub(crate) fn coverage_retry_failure_decision(
        failure: &AttemptFailure<HttpError>,
        context: &RetryContext,
        policy_options: &HttpRetryOptions,
        delay_options: &qubit_retry::RetryOptions,
    ) -> AttemptFailureDecision {
        let decision =
            Self::retry_failure_decision(failure, context, policy_options, delay_options);
        decision
    }
1✔
516

517
    /// Adds retry-attempt exhaustion context to the last attempt error.
518
    ///
519
    /// # Parameters
520
    /// - `error`: Last retryable attempt error.
521
    /// - `attempts`: Number of attempts already made.
522
    /// - `max_attempts`: Configured maximum attempts.
523
    ///
524
    /// # Returns
525
    /// The same error with retry exhaustion details appended to its message.
526
    fn map_retry_attempts_exhausted(
1✔
527
        mut error: HttpError,
1✔
528
        attempts: u32,
1✔
529
        max_attempts: u32,
1✔
530
    ) -> HttpError {
1✔
531
        error.message = format!(
1✔
532
            "{} (retry attempts exhausted: {attempts}/{max_attempts})",
533
            error.message
534
        );
535
        error
1✔
536
    }
1✔
537

538
    /// Builds the error returned when retry policy stops early.
539
    ///
540
    /// # Parameters
541
    /// - `error`: Attempt error that the retry policy chose not to retry.
542
    /// - `attempts`: Number of attempts already made.
543
    /// - `started_at`: Start instant of the retry flow.
544
    ///
545
    /// # Returns
546
    /// [`HttpError::retry_aborted`] with the original [`HttpError`] chained as
547
    /// source for callers that need the underlying status or transport error.
548
    fn map_retry_aborted(error: HttpError, attempts: u32, started_at: Instant) -> HttpError {
4✔
549
        let elapsed = started_at.elapsed();
4✔
550
        let summary = error.message.clone();
4✔
551
        HttpError::retry_aborted(format!(
4✔
552
            "HTTP retry aborted after {attempts} attempt(s) in {elapsed:?}: {summary}"
553
        ))
554
        .with_source(error)
4✔
555
    }
4✔
556

557
    /// Builds the error when retry max-duration is exhausted.
558
    ///
559
    /// # Parameters
560
    /// - `started_at`: Start instant of the retry flow.
561
    /// - `max_duration`: Configured max-duration budget.
562
    /// - `last_error`: Last captured retryable attempt error, if any.
563
    ///
564
    /// # Returns
565
    /// Augments the last failure when present, otherwise a dedicated
566
    /// max-duration error with no underlying attempt error.
567
    fn map_retry_max_duration_exceeded(
4✔
568
        started_at: Instant,
4✔
569
        max_duration: Option<Duration>,
4✔
570
        last_error: Option<HttpError>,
4✔
571
    ) -> HttpError {
4✔
572
        let elapsed = started_at.elapsed();
4✔
573
        let max_duration_text = max_duration
4✔
574
            .map(|duration| format!("{duration:?}"))
4✔
575
            .unwrap_or_else(|| "unbounded".to_string());
4✔
576
        match last_error {
4✔
577
            Some(mut error) => {
1✔
578
                error.message = format!(
1✔
579
                    "{} (retry max duration exceeded: {elapsed:?}/{max_duration_text})",
580
                    error.message
581
                );
582
                error
1✔
583
            }
584
            None => HttpError::retry_max_elapsed_exceeded(format!(
3✔
585
                "HTTP retry max duration exceeded before a retryable error was captured: {elapsed:?}/{max_duration_text}"
586
            )),
587
        }
588
    }
4✔
589

590
    /// Maps a [`qubit_retry::RetryError`] into this crate's HTTP error model.
591
    ///
592
    /// # Parameters
593
    /// - `error`: Terminal retry error from `qubit-retry`.
594
    /// - `started_at`: Monotonic start instant of the HTTP retry flow.
595
    /// - `max_duration`: Optional HTTP total retry budget.
596
    /// - `max_attempts`: Configured maximum HTTP attempts.
597
    ///
598
    /// # Returns
599
    /// A rich [`HttpError`] preserving the last attempt error when available.
600
    fn map_retry_error(
12✔
601
        error: RetryError<HttpError>,
12✔
602
        started_at: Instant,
12✔
603
        max_duration: Option<Duration>,
12✔
604
        max_attempts: u32,
12✔
605
    ) -> HttpError {
12✔
606
        let attempts = error.attempts();
12✔
607
        let reason = error.reason();
12✔
608
        let (_, last_failure, _) = error.into_parts();
12✔
609
        let last_error = last_failure.and_then(AttemptFailure::into_error);
12✔
610

611
        match reason {
12✔
612
            RetryErrorReason::AttemptsExceeded => last_error
2✔
613
                .map(|error| Self::map_retry_attempts_exhausted(error, attempts, max_attempts))
2✔
614
                .unwrap_or_else(|| {
2✔
615
                    HttpError::retry_aborted(format!(
1✔
616
                        "HTTP retry attempts exhausted without a captured HTTP error: {attempts}/{max_attempts}"
617
                    ))
618
                }),
1✔
619
            RetryErrorReason::MaxOperationElapsedExceeded
620
            | RetryErrorReason::MaxTotalElapsedExceeded => {
621
                Self::map_retry_max_duration_exceeded(started_at, max_duration, last_error)
4✔
622
            }
623
            RetryErrorReason::Aborted => match last_error {
4✔
624
                Some(error) if error.kind == crate::HttpErrorKind::Cancelled => error,
4✔
625
                Some(error) => Self::map_retry_aborted(error, attempts, started_at),
4✔
626
                None => HttpError::retry_aborted(format!(
1✔
627
                    "HTTP retry aborted after {attempts} attempt(s) without a captured HTTP error"
628
                )),
629
            },
630
            RetryErrorReason::UnsupportedOperation | RetryErrorReason::WorkerStillRunning => {
631
                HttpError::other(format!(
1✔
632
                    "HTTP retry executor failed after {attempts} attempt(s): {reason:?}"
633
                ))
634
            }
635
        }
636
    }
12✔
637

638
    /// Coverage-only entry point that forwards to [`Self::map_retry_error`].
    ///
    /// Compiled only under `cfg(coverage)` and hidden from the docs.
    ///
    /// # Parameters
    /// - `error`: Terminal retry error to map.
    /// - `started_at`: Retry-flow start instant.
    /// - `max_duration`: Optional max-duration budget.
    /// - `max_attempts`: Configured maximum attempts.
    ///
    /// # Returns
    /// Whatever [`Self::map_retry_error`] returns for the same arguments.
    #[cfg(coverage)]
    #[doc(hidden)]
    pub(crate) fn coverage_map_retry_error(
        error: RetryError<HttpError>,
        started_at: Instant,
        max_duration: Option<Duration>,
        max_attempts: u32,
    ) -> HttpError {
        let mapped = Self::map_retry_error(error, started_at, max_duration, max_attempts);
        mapped
    }
5✔
658

659
    /// Coverage-only helper that drives the pre-send cancellation check in
    /// [`Self::prepare_and_send_once`].
    ///
    /// Builds a default client, attaches an already-cancelled token to a GET
    /// request, and calls the low-level send with it.
    ///
    /// # Returns
    /// The kind of the error produced by that call.
    ///
    /// # Panics
    /// Panics if the default client cannot be built or if the send
    /// unexpectedly succeeds.
    #[cfg(coverage)]
    #[doc(hidden)]
    pub(crate) async fn coverage_prepare_cancelled_error() -> crate::HttpErrorKind {
        let client = crate::HttpClientFactory::new()
            .create_default()
            .expect("coverage HTTP client should build");

        // Cancel up front so the request is already cancelled when sent.
        let token = tokio_util::sync::CancellationToken::new();
        token.cancel();

        let builder = client.request(
            http::Method::GET,
            "https://example.com/cancelled-before-send",
        );
        let mut request = builder.build();
        request.set_cancellation_token(token);

        let outcome = client
            .prepare_and_send_once(request, "coverage request cancelled before send")
            .await;
        let error = outcome.expect_err("pre-cancelled request should fail before send");
        error.kind
    }
1✔
685

686
    /// Builds a cancellation error for retry wait cancellation.
687
    ///
688
    /// # Parameters
689
    /// - `message`: Human-readable cancellation reason.
690
    /// - `method`: Request method to attach.
691
    /// - `url`: Optional resolved request URL to attach.
692
    ///
693
    /// # Returns
694
    /// [`HttpErrorKind::Cancelled`](crate::HttpErrorKind::Cancelled) with request
695
    /// context.
696
    fn retry_cancelled_error(
1✔
697
        message: &str,
1✔
698
        method: &http::Method,
1✔
699
        url: Option<&url::Url>,
1✔
700
    ) -> HttpError {
1✔
701
        let mut error = HttpError::cancelled(message).with_method(method);
1✔
702
        if let Some(url) = url {
1✔
703
            error = error.with_url(url);
1✔
704
        }
1✔
705
        error
1✔
706
    }
1✔
707

708
    /// Opens an SSE stream and reconnects automatically on retryable stream
709
    /// failures.
710
    ///
711
    /// Reconnect behavior:
712
    /// - retryable transport/read failures trigger reconnects;
713
    /// - optional reconnect on clean EOF (`reconnect_on_eof`);
714
    /// - `Last-Event-ID` is set from the latest parsed SSE `id:` field;
715
    /// - optional use of SSE `retry:` as next reconnect delay.
716
    ///
717
    /// # Parameters
718
    /// - `request`: SSE request template reused on reconnect.
719
    /// - `options`: Reconnect limits and delay policy.
720
    ///
721
    /// # Returns
722
    /// SSE event stream yielding events from one or more reconnect sessions.
723
    ///
724
    /// # Errors
725
    /// Stream items are `Result`; `Err` covers per-item failures such as:
726
    /// - initial stream-open failures when not reconnectable or retries exhausted;
727
    /// - SSE protocol errors (non-reconnectable by default);
728
    /// - transport/read errors after reconnect budget is exhausted.
729
    ///
730
    /// # Side effects
731
    /// Performs repeated HTTP requests and reads on reconnect; may sleep between
732
    /// attempts according to reconnect options.
733
    pub fn execute_sse_with_reconnect(
19✔
734
        &self,
19✔
735
        request: HttpRequest,
19✔
736
        options: SseReconnectOptions,
19✔
737
    ) -> SseEventStream {
19✔
738
        SseReconnectRunner::new(self.clone(), request, options).run()
19✔
739
    }
19✔
740
}
741

742
impl std::fmt::Debug for HttpClient {
743
    /// Formats the client for debugging (exposes options and injectors; omits
744
    /// the backend client).
745
    ///
746
    /// # Parameters
747
    /// - `f`: Destination formatter.
748
    ///
749
    /// # Returns
750
    /// `fmt::Result` from writing the debug struct.
751
    ///
752
    /// # Errors
753
    /// Returns an error if formatting to `f` fails.
754
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1✔
755
        f.debug_struct("HttpClient")
1✔
756
            .field("options", &self.options)
1✔
757
            .field("injectors", &self.injectors)
1✔
758
            .field("async_injectors", &self.async_injectors)
1✔
759
            .field("request_interceptors", &self.request_interceptors)
1✔
760
            .field("response_interceptors", &self.response_interceptors)
1✔
761
            .finish_non_exhaustive()
1✔
762
    }
1✔
763
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc