qubit-ltd / rs-http / a568d438-eff9-4360-ac8e-77b909cf0b39

14 Apr 2026 06:45AM UTC coverage: 98.121% (+3.0%) from 95.088%
a568d438-eff9-4360-ac8e-77b909cf0b39

push

circleci

Haixing-Hu
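feat(sse): make the SSE JSON mode and line/frame size limits configurable and carry them through streaming responses

- Add sse_json_mode, sse_max_line_bytes, and sse_max_frame_bytes to HttpClientOptions
- HttpStreamResponse carries the default SSE decoding parameters; SseEvent gains decode_json_with_mode
- Update the Chinese and English READMEs and the related unit and integration tests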

106 of 109 new or added lines in 4 files covered. (97.25%)

21 existing lines in 4 files now uncovered.

1775 of 1809 relevant lines covered (98.12%)

38.06 hits per line

Source File

97.35
/src/http_client.rs
/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026.
 *    Haixing Hu, Qubit Co. Ltd.
 *
 *    All rights reserved.
 *
 ******************************************************************************/
//! # HTTP Client
//!
//! Implements request execution and stream execution with unified behavior.
//!
//! # Author
//!
//! Haixing Hu

use std::time::Duration;

use async_stream::stream;
use bytes::Bytes;
use futures_util::StreamExt;
use http::header::RETRY_AFTER;
use http::{HeaderMap, StatusCode};
use qubit_function::MutatingFunction;
use qubit_retry::{
    AttemptFailure, Jitter, RetryDecision, RetryError, RetryExecutor, RetryOptions, RetryResult,
};
use reqwest::Response;
use tokio_util::sync::CancellationToken;
use url::Host;
use url::Url;

use crate::{
    AsyncHeaderInjector, HeaderInjector, HttpClientOptions, HttpError, HttpErrorKind, HttpLogger,
    HttpRequest, HttpRequestBody, HttpRequestBuilder, HttpResponse, HttpResult, HttpRetryOptions,
    HttpStreamResponse, RetryHint,
};

/// High-level HTTP client that applies options, header injection, logging, and
/// timeouts.
#[derive(Clone)]
pub struct HttpClient {
    /// Low-level HTTP client used to send requests.
    client: reqwest::Client,
    /// Timeouts, proxy, logging, default headers, and related settings.
    options: HttpClientOptions,
    /// Header injectors applied to every outgoing request after default
    /// headers.
    injectors: Vec<HeaderInjector>,
    /// Async header injectors applied after sync injectors and before request-level headers.
    async_injectors: Vec<AsyncHeaderInjector>,
}

impl std::fmt::Debug for HttpClient {
    /// Formats the client for debugging (exposes options and injectors; omits
    /// the reqwest client).
    ///
    /// # Parameters
    /// - `f`: Destination formatter.
    ///
    /// # Returns
    /// `fmt::Result` from writing the debug struct.
    ///
    /// # Errors
    /// Returns an error if formatting to `f` fails.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HttpClient")
            .field("options", &self.options)
            .field("injectors", &self.injectors)
            .field("async_injectors", &self.async_injectors)
            .finish_non_exhaustive()
    }
}

impl HttpClient {
    /// Wraps a built [`reqwest::Client`] with the given options and an empty
    /// injector list.
    ///
    /// # Parameters
    /// - `client`: Configured reqwest client used for I/O.
    /// - `options`: Client-wide timeouts, headers, proxy, logging, etc.
    ///
    /// # Returns
    /// A new [`HttpClient`] with no injectors until
    /// [`HttpClient::add_header_injector`] is called.
    pub(crate) fn new(client: reqwest::Client, options: HttpClientOptions) -> Self {
        Self {
            client,
            options,
            injectors: Vec::new(),
            async_injectors: Vec::new(),
        }
    }

    /// Returns a reference to the client options (timeouts, proxy, logging,
    /// etc.).
    ///
    /// # Returns
    /// Immutable borrow of [`HttpClientOptions`].
    pub fn options(&self) -> &HttpClientOptions {
        &self.options
    }

    /// Appends a [`HeaderInjector`] so its mutation function runs on every
    /// request.
    ///
    /// # Parameters
    /// - `injector`: Injector to append (order is preserved).
    ///
    /// # Returns
    /// Nothing.
    pub fn add_header_injector(&mut self, injector: HeaderInjector) {
        self.injectors.push(injector);
    }

    /// Appends an async header injector whose mutation runs after sync injectors.
    ///
    /// # Parameters
    /// - `injector`: Async injector to append (order is preserved).
    ///
    /// # Returns
    /// Nothing.
    pub fn add_async_header_injector(&mut self, injector: AsyncHeaderInjector) {
        self.async_injectors.push(injector);
    }

    /// Validates and adds one client-level default header.
    ///
    /// The header is applied to every request before header injectors and
    /// request-level headers.
    ///
    /// # Parameters
    /// - `name`: Header name.
    /// - `value`: Header value.
    ///
    /// # Returns
    /// `Ok(self)` after the header is stored.
    ///
    /// # Errors
    /// Returns [`HttpError`] when the header name or value is invalid.
    pub fn add_header(&mut self, name: &str, value: &str) -> HttpResult<&mut Self> {
        self.options.add_header(name, value)?;
        Ok(self)
    }

    /// Validates and adds many client-level default headers atomically.
    ///
    /// If any input pair is invalid, no header from this batch is applied.
    ///
    /// # Parameters
    /// - `headers`: Iterator of `(name, value)` pairs.
    ///
    /// # Returns
    /// `Ok(self)` after all headers are stored.
    ///
    /// # Errors
    /// Returns [`HttpError`] when any name/value pair is invalid (nothing from
    /// this call is applied).
    pub fn add_headers<'a, I>(&mut self, headers: I) -> HttpResult<&mut Self>
    where
        I: IntoIterator<Item = (&'a str, &'a str)>,
    {
        self.options.add_headers(headers)?;
        Ok(self)
    }

    /// Removes all registered header injectors.
    ///
    /// # Returns
    /// Nothing.
    pub fn clear_header_injectors(&mut self) {
        self.injectors.clear();
    }

    /// Removes all registered async header injectors.
    ///
    /// # Returns
    /// Nothing.
    pub fn clear_async_header_injectors(&mut self) {
        self.async_injectors.clear();
    }

    /// Starts building an [`HttpRequest`] with the given method and path
    /// (relative or absolute URL string).
    ///
    /// # Parameters
    /// - `method`: HTTP verb (GET, POST, …).
    /// - `path`: Path relative to [`HttpClientOptions::base_url`] or a full URL
    ///   string.
    ///
    /// # Returns
    /// A fresh [`HttpRequestBuilder`] not yet tied to this client until
    /// [`HttpRequestBuilder::build`] and [`HttpClient::execute`].
    pub fn request(&self, method: http::Method, path: &str) -> HttpRequestBuilder {
        HttpRequestBuilder::new(method, path)
    }

198
    /// Sends the request, reads the full response body, logs per options, and
199
    /// returns a buffered [`HttpResponse`].
200
    ///
201
    /// # Parameters
202
    /// - `request`: Built request (URL resolved against `base_url` if path is
203
    ///   not absolute).
204
    ///
205
    /// # Returns
206
    /// - `Ok(HttpResponse)` when the HTTP status is success
207
    ///   ([`http::StatusCode::is_success`]).
208
    /// - `Err(HttpError)` on URL/header errors, transport failure, timeout, or
209
    ///   non-success status.
210
    pub async fn execute(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
57✔
211
        let retry_options = self.resolve_retry_options(&request);
57✔
212
        let honor_retry_after = request.retry_override.should_honor_retry_after();
57✔
213
        if !self.should_retry_request(&request, &retry_options) {
57✔
214
            return self.execute_once(request).await;
47✔
215
        }
10✔
216
        self.execute_with_retry(request, retry_options, honor_retry_after)
10✔
217
            .await
10✔
218
    }
57✔
219

    /// Performs one non-retrying execution: resolve URL, merge headers, log the
    /// request, send with write timeout, reject non-success status, read the
    /// full body with read timeout, then log the response.
    ///
    /// # Parameters
    /// - `request`: Built request to send (same fields as for
    ///   [`HttpClient::execute`]).
    ///
    /// # Returns
    /// Buffered [`HttpResponse`] or [`HttpError`].
    async fn execute_once(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
        let url = self.resolve_url(&request)?;
        let method = request.method.clone();
        let cancellation_token = request.cancellation_token.clone();
        if let Some(error) = cancelled_request_error_if_needed(
            cancellation_token.as_ref(),
            &method,
            &url,
            "Request cancelled before sending",
        ) {
            return Err(error);
        }
        let headers = self.build_headers(&request).await?;

        let body_for_log = clone_request_body_for_log(&request.body);

        let logger = HttpLogger::new(&self.options.logging, &self.options.sensitive_headers);
        logger.log_request(&method, &url, &headers, body_for_log.as_ref());

        let mut builder = self.client.request(method.clone(), url.clone());
        builder = builder.headers(headers);

        if !request.query.is_empty() {
            builder = builder.query(&request.query);
        }

        if let Some(timeout) = request.request_timeout {
            builder = builder.timeout(timeout);
        }

        builder = apply_request_body(builder, request.body);

        let response = self
            .send_with_write_timeout(
                builder,
                method.clone(),
                url.clone(),
                cancellation_token.as_ref(),
            )
            .await?;

        if !response.status().is_success() {
            let status = response.status();
            let retry_after = parse_retry_after(status, response.headers());
            // NOTE: reqwest reports a status error only for 4xx/5xx responses,
            // so a 1xx/3xx status reaching this branch would make `expect_err`
            // panic.
            let error = response.error_for_status_ref().expect_err(
                "non-success HTTP status must produce reqwest status error via error_for_status_ref",
            );
            let body_preview = self.read_error_response_preview(response).await;
            let message = format!(
                "HTTP request failed with status {} for {} {}; response body preview: {}",
                status, method, url, body_preview
            );
            let mut mapped = map_reqwest_error(
                error,
                HttpErrorKind::Status,
                Some(method.clone()),
                Some(url.clone()),
            )
            .with_status(status)
            .with_response_body_preview(body_preview);
            if let Some(retry_after) = retry_after {
                mapped = mapped.with_retry_after(retry_after);
            }
            mapped.message = message;
            return Err(mapped);
        }

        let status = response.status();
        let response_url = response.url().clone();
        let response_headers = response.headers().clone();

        let body = self
            .read_body_with_timeout(
                response,
                method.clone(),
                response_url.clone(),
                cancellation_token.as_ref(),
            )
            .await?;

        logger.log_response(status, &response_url, &response_headers, &body);

        Ok(HttpResponse::new(
            status,
            response_headers,
            body,
            response_url,
        ))
    }

    /// Sends the request and returns headers plus a byte stream without
    /// buffering the full body.
    ///
    /// # Parameters
    /// - `request`: Same as [`HttpClient::execute`].
    ///
    /// # Returns
    /// - `Ok(HttpStreamResponse)` with a stream that applies read timeout per
    ///   options.
    /// - `Err(HttpError)` before the stream starts (same cases as
    ///   [`HttpClient::execute`] for the initial response).
    pub async fn execute_stream(&self, request: HttpRequest) -> HttpResult<HttpStreamResponse> {
        let retry_options = self.resolve_retry_options(&request);
        let honor_retry_after = request.retry_override.should_honor_retry_after();
        if !self.should_retry_request(&request, &retry_options) {
            return self.execute_stream_once(request).await;
        }
        self.execute_stream_with_retry(request, retry_options, honor_retry_after)
            .await
    }

    /// Performs one non-retrying streaming execution: same setup as
    /// [`HttpClient::execute_once`], but on success wraps the body in a stream
    /// with per-chunk read timeouts instead of buffering the full body.
    ///
    /// The returned [`HttpStreamResponse`] also carries the client's SSE
    /// defaults (`sse_json_mode`, `sse_max_line_bytes`, `sse_max_frame_bytes`).
    ///
    /// # Parameters
    /// - `request`: Built request to send (same fields as for
    ///   [`HttpClient::execute_stream`]).
    ///
    /// # Returns
    /// [`HttpStreamResponse`] or [`HttpError`].
    async fn execute_stream_once(&self, request: HttpRequest) -> HttpResult<HttpStreamResponse> {
        let url = self.resolve_url(&request)?;
        let method = request.method.clone();
        let cancellation_token = request.cancellation_token.clone();
        if let Some(error) = cancelled_request_error_if_needed(
            cancellation_token.as_ref(),
            &method,
            &url,
            "Streaming request cancelled before sending",
        ) {
            return Err(error);
        }
        let headers = self.build_headers(&request).await?;

        let body_for_log = clone_request_body_for_log(&request.body);

        let logger = HttpLogger::new(&self.options.logging, &self.options.sensitive_headers);
        logger.log_request(&method, &url, &headers, body_for_log.as_ref());

        let mut builder = self.client.request(method.clone(), url.clone());
        builder = builder.headers(headers);

        if !request.query.is_empty() {
            builder = builder.query(&request.query);
        }

        if let Some(timeout) = request.request_timeout {
            builder = builder.timeout(timeout);
        }

        builder = apply_request_body(builder, request.body);

        let response = self
            .send_with_write_timeout(
                builder,
                method.clone(),
                url.clone(),
                cancellation_token.as_ref(),
            )
            .await?;

        if !response.status().is_success() {
            let status = response.status();
            let retry_after = parse_retry_after(status, response.headers());
            // NOTE: reqwest reports a status error only for 4xx/5xx responses,
            // so a 1xx/3xx status reaching this branch would make `expect_err`
            // panic.
            let error = response.error_for_status_ref().expect_err(
                "non-success HTTP status must produce reqwest status error via error_for_status_ref",
            );
            let body_preview = self.read_error_response_preview(response).await;
            let message = format!(
                "HTTP streaming request failed with status {} for {} {}; response body preview: {}",
                status, method, url, body_preview
            );
            let mut mapped = map_reqwest_error(
                error,
                HttpErrorKind::Status,
                Some(method.clone()),
                Some(url.clone()),
            )
            .with_status(status)
            .with_response_body_preview(body_preview);
            if let Some(retry_after) = retry_after {
                mapped = mapped.with_retry_after(retry_after);
            }
            mapped.message = message;
            return Err(mapped);
        }

        let status = response.status();
        let response_url = response.url().clone();
        let response_headers = response.headers().clone();

        logger.log_stream_response_headers(status, &response_url, &response_headers);

        let read_timeout = self.options.timeouts.read_timeout;
        let method_for_err = method.clone();
        let url_for_err = response_url.clone();
        let cancellation_token_for_stream = cancellation_token.clone();

        let mut stream = response.bytes_stream();
        let wrapped = stream! {
            loop {
                let next = if let Some(token) = &cancellation_token_for_stream {
                    tokio::select! {
                        _ = token.cancelled() => {
                            yield Err(HttpError::cancelled("Streaming response cancelled while reading body")
                                .with_method(method_for_err.clone())
                                .with_url(url_for_err.clone()));
                            break;
                        }
                        item = tokio::time::timeout(read_timeout, stream.next()) => item,
                    }
                } else {
                    tokio::time::timeout(read_timeout, stream.next()).await
                };
                match next {
                    Ok(Some(Ok(bytes))) => yield Ok(bytes),
                    Ok(Some(Err(error))) => {
                        let mapped = map_reqwest_error(
                            error,
                            HttpErrorKind::Transport,
                            Some(method_for_err.clone()),
                            Some(url_for_err.clone()),
                        );
                        yield Err(mapped);
                        break;
                    }
                    Ok(None) => break,
                    Err(_) => {
                        let error = HttpError::read_timeout(format!(
                            "Read timeout after {:?} while streaming response",
                            read_timeout
                        ))
                        .with_method(method_for_err.clone())
                        .with_url(url_for_err.clone());
                        yield Err(error);
                        break;
                    }
                }
            }
        };

        Ok(HttpStreamResponse::new_with_sse_options(
            status,
            response_headers,
            response_url,
            Box::pin(wrapped),
            self.options.sse_json_mode,
            self.options.sse_max_line_bytes,
            self.options.sse_max_frame_bytes,
        ))
    }
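The streaming loop applies the read timeout to each chunk rather than to the whole body, and it ends the stream after the first error. The sketch below is a blocking, standard-library analogue of that idea, not the crate's implementation: it swaps `tokio::time::timeout` over `stream.next()` for `Receiver::recv_timeout`, and `ChunkError` and `drain_with_chunk_timeout` are hypothetical names.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical error type standing in for the crate's `HttpError`.
#[derive(Debug, PartialEq)]
pub enum ChunkError {
    /// No chunk arrived within the per-chunk timeout.
    ReadTimeout(Duration),
}

/// Collects chunks until every sender has hung up, applying `read_timeout`
/// to each individual chunk. The first timeout is recorded and ends the
/// loop, mirroring the `yield Err(..); break;` arms of the async version.
pub fn drain_with_chunk_timeout(
    rx: &Receiver<Vec<u8>>,
    read_timeout: Duration,
) -> Vec<Result<Vec<u8>, ChunkError>> {
    let mut items = Vec::new();
    loop {
        match rx.recv_timeout(read_timeout) {
            Ok(chunk) => items.push(Ok(chunk)),
            Err(RecvTimeoutError::Timeout) => {
                items.push(Err(ChunkError::ReadTimeout(read_timeout)));
                break;
            }
            // All senders dropped: the body is complete.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    items
}
```

Because the timer restarts for every chunk, a slow but steady body is never cut off; only a gap longer than the timeout between two chunks ends the stream.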

    /// Returns whether the client should run the retry policy for this request.
    ///
    /// Retries are enabled when `max_attempts` is greater than one and the
    /// request method is allowed by [`HttpClientOptions`] retry settings.
    ///
    /// # Parameters
    /// - `request`: Request whose HTTP method is checked against the configured
    ///   retry policy.
    /// - `retry_options`: Effective retry options after applying request-level overrides.
    ///
    /// # Returns
    /// `true` when `max_attempts` is greater than one and the method is
    /// allowed; `false` otherwise.
    fn should_retry_request(
        &self,
        request: &HttpRequest,
        retry_options: &HttpRetryOptions,
    ) -> bool {
        retry_options.max_attempts > 1 && retry_options.allows_method(&request.method)
    }
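The gate is a plain conjunction: more than one attempt configured, and a method the policy allows. A minimal sketch of that decision follows. `RetryPolicy` and its `retryable_methods` list are hypothetical stand-ins; how `HttpRetryOptions::allows_method` really decides is not shown in this file.

```rust
/// Hypothetical stand-in for the crate's `HttpRetryOptions`.
pub struct RetryPolicy {
    pub max_attempts: u32,
    /// Methods the policy allows to be retried (assumed representation).
    pub retryable_methods: Vec<&'static str>,
}

impl RetryPolicy {
    /// Assumed rule: a method is retryable if it appears in the list.
    pub fn allows_method(&self, method: &str) -> bool {
        self.retryable_methods
            .iter()
            .any(|allowed| allowed.eq_ignore_ascii_case(method))
    }
}

/// Retry only when more than one attempt is configured and the method is allowed.
pub fn should_retry_request(method: &str, policy: &RetryPolicy) -> bool {
    policy.max_attempts > 1 && policy.allows_method(method)
}
```

With `max_attempts` set to 1 the policy is skipped for every method, which is why `execute` can fall straight through to a single attempt.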

    /// Resolves request-level retry override against client-level retry options.
    ///
    /// # Parameters
    /// - `request`: Request whose override is applied.
    ///
    /// # Returns
    /// Effective retry options for this request.
    fn resolve_retry_options(&self, request: &HttpRequest) -> HttpRetryOptions {
        let mut options = self.options.retry.clone();
        options.enabled = request.retry_override.resolve_enabled(options.enabled);
        options.method_policy = request
            .retry_override
            .resolve_method_policy(options.method_policy);
        options
    }

    /// Builds a [`RetryExecutor`] from effective retry options and classifies
    /// [`HttpError`] values using [`RetryHint`].
    ///
    /// # Parameters
    /// - `retry_options`: Effective retry options for this request.
    /// - `honor_retry_after`: Whether to honor `Retry-After` on `429`.
    ///
    /// # Returns
    /// Configured executor or [`HttpError`] if retry options or executor
    /// configuration is invalid.
    fn build_retry_executor(
        &self,
        retry_options: &HttpRetryOptions,
        honor_retry_after: bool,
    ) -> HttpResult<RetryExecutor<HttpError>> {
        let options = RetryOptions::new(
            retry_options.max_attempts,
            retry_options.max_duration,
            retry_options.delay_strategy.clone(),
            Jitter::factor(retry_options.jitter_factor),
        )
        .map_err(|error| HttpError::other(format!("Invalid HTTP retry options: {error}")))?;

        let mut builder = RetryExecutor::<HttpError>::builder()
            .options(options)
            .classify_error(|error: &HttpError, _| {
                if matches!(error.retry_hint(), RetryHint::Retryable) {
                    RetryDecision::Retry
                } else {
                    RetryDecision::Abort
                }
            });
        if honor_retry_after {
            builder = builder.on_retry(|context, failure| {
                let AttemptFailure::Error(error) = failure else {
                    return;
                };
                let Some(retry_after) = error.retry_after else {
                    return;
                };
                // Wait only for the part of `Retry-After` that the upcoming
                // backoff delay does not already cover.
                // NOTE: `std::thread::sleep` blocks the calling thread; if this
                // hook runs on an async runtime worker, that worker is stalled
                // for the extra wait.
                if retry_after > context.next_delay {
                    std::thread::sleep(retry_after - context.next_delay);
                }
            });
        }
        builder
            .build()
            .map_err(|error| HttpError::other(format!("Invalid HTTP retry executor: {error}")))
    }
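The `on_retry` hook does not replace the backoff delay with `Retry-After`; it sleeps only for the difference when the server asks for more than the backoff would give. A small sketch of that arithmetic, with `retry_after_top_up` as a hypothetical helper name:

```rust
use std::time::Duration;

/// Extra time to wait on top of the backoff delay so that the total wait is
/// at least the server's `Retry-After` value.
pub fn retry_after_top_up(retry_after: Option<Duration>, next_delay: Duration) -> Duration {
    match retry_after {
        // `saturating_sub` yields zero when the backoff already covers it.
        Some(requested) => requested.saturating_sub(next_delay),
        None => Duration::ZERO,
    }
}
```

For example, a `Retry-After` of 5 s with a 2 s backoff delay gives a 3 s top-up, while a 1 s `Retry-After` with the same backoff adds nothing.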

    /// Runs [`HttpClient::execute_once`] under the configured retry policy.
    ///
    /// # Parameters
    /// - `request`: Built request passed to each [`HttpClient::execute_once`]
    ///   attempt.
    /// - `retry_options`: Effective retry options for this request.
    /// - `honor_retry_after`: Whether to honor `Retry-After` on `429`.
    ///
    /// # Returns
    /// Same as a successful single attempt, or a mapped [`HttpError`] when
    /// retries abort or limits are exceeded.
    async fn execute_with_retry(
        &self,
        request: HttpRequest,
        retry_options: HttpRetryOptions,
        honor_retry_after: bool,
    ) -> HttpResult<HttpResponse> {
        let policy = self.build_retry_executor(&retry_options, honor_retry_after)?;
        let client = self.clone();
        let result = policy
            .run_async(move || {
                let client = client.clone();
                let request = request.clone();
                async move { client.execute_once(request).await }
            })
            .await;
        map_retry_result(result)
    }

    /// Runs [`HttpClient::execute_stream_once`] under the configured retry
    /// policy.
    ///
    /// # Parameters
    /// - `request`: Built request passed to each
    ///   [`HttpClient::execute_stream_once`] attempt.
    /// - `retry_options`: Effective retry options for this request.
    /// - `honor_retry_after`: Whether to honor `Retry-After` on `429`.
    ///
    /// # Returns
    /// Same as a successful single streaming attempt, or a mapped [`HttpError`]
    /// when retries abort or limits are exceeded.
    async fn execute_stream_with_retry(
        &self,
        request: HttpRequest,
        retry_options: HttpRetryOptions,
        honor_retry_after: bool,
    ) -> HttpResult<HttpStreamResponse> {
        let policy = self.build_retry_executor(&retry_options, honor_retry_after)?;
        let client = self.clone();
        let result = policy
            .run_async(move || {
                let client = client.clone();
                let request = request.clone();
                async move { client.execute_stream_once(request).await }
            })
            .await;
        map_retry_result(result)
    }

    /// Parses `request.path` as a URL or joins it to `base_url` when relative.
    ///
    /// # Parameters
    /// - `request`: Request whose `path` and implied base are used.
    ///
    /// # Returns
    /// Resolved [`Url`] or [`HttpError::invalid_url`] if resolution fails.
    fn resolve_url(&self, request: &HttpRequest) -> HttpResult<Url> {
        if let Ok(url) = Url::parse(&request.path) {
            self.validate_resolved_url_host(&url)?;
            return Ok(url);
        }

        let base = self.options.base_url.as_ref().ok_or_else(|| {
            HttpError::invalid_url(format!(
                "Cannot resolve relative path '{}' without base_url",
                request.path
            ))
        })?;

        let url = base.join(&request.path).map_err(|error| {
            HttpError::invalid_url(format!(
                "Failed to resolve path '{}' against base URL '{}': {}",
                request.path, base, error
            ))
        })?;
        self.validate_resolved_url_host(&url)?;
        Ok(url)
    }

    /// Validates host constraints for a resolved URL under current client options.
    ///
    /// # Parameters
    /// - `url`: Fully resolved request URL.
    ///
    /// # Returns
    /// `Ok(())` when host is allowed by options.
    ///
    /// # Errors
    /// Returns [`HttpError::invalid_url`] when `ipv4_only=true` and `url` uses an IPv6 literal host.
    fn validate_resolved_url_host(&self, url: &Url) -> HttpResult<()> {
        if self.options.ipv4_only && matches!(url.host(), Some(Host::Ipv6(_))) {
            return Err(HttpError::invalid_url(format!(
                "IPv6 literal host is not allowed when ipv4_only=true: {}",
                url
            )));
        }
        Ok(())
    }
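The host check rejects only IPv6 literals; a DNS name passes regardless of what it later resolves to. The sketch below reproduces that rule with the standard library alone. It is an illustration under assumptions: `is_ipv6_literal` and `validate_host` are hypothetical helpers that take the host as a string, whereas the method above matches on `url::Host::Ipv6`.

```rust
use std::net::Ipv6Addr;

/// Returns true when `host` is a bracketed IPv6 literal such as `[::1]`.
pub fn is_ipv6_literal(host: &str) -> bool {
    host.strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'))
        .map_or(false, |inner| inner.parse::<Ipv6Addr>().is_ok())
}

/// Rejects IPv6 literal hosts only when `ipv4_only` is set.
pub fn validate_host(host: &str, ipv4_only: bool) -> Result<(), String> {
    if ipv4_only && is_ipv6_literal(host) {
        return Err(format!(
            "IPv6 literal host is not allowed when ipv4_only=true: {host}"
        ));
    }
    Ok(())
}
```

Running the check on the resolved URL, as `resolve_url` does on both of its paths, means an absolute URL passed as the request path cannot bypass the restriction.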

    /// Merges default headers, injector output, and per-request headers (later
    /// wins on duplicates).
    ///
    /// # Parameters
    /// - `request`: Request supplying extra headers.
    ///
    /// # Returns
    /// Final [`HeaderMap`] or error if an injector fails.
    async fn build_headers(&self, request: &HttpRequest) -> HttpResult<HeaderMap> {
        let mut headers = self.options.default_headers.clone();

        for injector in &self.injectors {
            injector.apply(&mut headers)?;
        }
        for injector in &self.async_injectors {
            injector.apply(&mut headers).await?;
        }

        headers.extend(request.headers.clone());
        Ok(headers)
    }
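The merge order is: client defaults, then sync injectors, then async injectors, then request-level headers, with later layers overriding earlier ones. The sketch below models that layering with a plain `BTreeMap`. It is a simplification under stated assumptions: `Headers` and `merge_headers` are hypothetical, header names are treated as case-sensitive single-valued keys, and injectors are infallible synchronous closures, unlike the fallible and async injectors in the method above.

```rust
use std::collections::BTreeMap;

/// Simplified header map: one value per (case-sensitive) name.
pub type Headers = BTreeMap<String, String>;

/// Applies the layers in order: defaults, then each injector, then
/// request-level headers. A later layer overwrites an earlier value.
pub fn merge_headers(
    defaults: &Headers,
    injectors: &[&dyn Fn(&mut Headers)],
    request_headers: &Headers,
) -> Headers {
    let mut headers = defaults.clone();
    for inject in injectors {
        inject(&mut headers);
    }
    // Request-level headers are applied last, so they win on duplicates.
    headers.extend(
        request_headers
            .iter()
            .map(|(name, value)| (name.clone(), value.clone())),
    );
    headers
}
```

Putting request headers last lets a single call override a client-wide default or an injected value without reconfiguring the client.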

    /// Sends the built request with a write-phase timeout.
    ///
    /// The timeout wraps `builder.send()`, so it covers the time until the
    /// response headers arrive, not only the time spent writing the request.
    ///
    /// # Parameters
    /// - `builder`: Reqwest request builder (method, URL, headers, body already
    ///   set).
    /// - `method`: Method for error context.
    /// - `url`: URL for error context.
    /// - `cancellation_token`: Optional token; if it is cancelled before the
    ///   send completes, the call returns a cancelled [`HttpError`].
    ///
    /// # Returns
    /// Raw [`reqwest::Response`] or [`HttpError`] (transport, write timeout,
    /// cancellation, etc.).
    async fn send_with_write_timeout(
        &self,
        builder: reqwest::RequestBuilder,
        method: http::Method,
        url: Url,
        cancellation_token: Option<&CancellationToken>,
    ) -> HttpResult<Response> {
        let timeout = self.options.timeouts.write_timeout;
        let send_future = tokio::time::timeout(timeout, builder.send());
        let next = if let Some(token) = cancellation_token {
            tokio::select! {
                _ = token.cancelled() => {
                    return Err(HttpError::cancelled("Request cancelled while sending")
                        .with_method(method)
                        .with_url(url));
                }
                send_result = send_future => send_result,
            }
        } else {
            send_future.await
        };
        match next {
            Ok(Ok(response)) => Ok(response),
            Ok(Err(error)) => Err(map_reqwest_error(
                error,
                HttpErrorKind::Transport,
                Some(method),
                Some(url),
            )),
            Err(_) => Err(HttpError::write_timeout(format!(
                "Write timeout after {:?} while sending request",
                timeout
            ))
            .with_method(method)
            .with_url(url)),
        }
    }
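Wrapping a fallible future in a timeout yields a nested result: the outer layer says whether the timer fired, the inner layer says whether the operation itself failed. The final `match` flattens the two into one error space. A standard-library sketch of that flattening follows; `Elapsed`, `SendError`, and `flatten_timeout` are hypothetical stand-ins for `tokio::time::error::Elapsed`, `HttpError`, and the inline `match`.

```rust
use std::time::Duration;

/// Hypothetical marker for "the timer fired first".
#[derive(Debug, PartialEq)]
pub struct Elapsed;

/// Hypothetical flattened error type.
#[derive(Debug, PartialEq)]
pub enum SendError {
    /// The operation finished in time but failed.
    Transport(String),
    /// The timer fired before the operation finished.
    WriteTimeout(Duration),
}

/// Flattens `Result<Result<T, String>, Elapsed>` into `Result<T, SendError>`.
pub fn flatten_timeout<T>(
    outcome: Result<Result<T, String>, Elapsed>,
    timeout: Duration,
) -> Result<T, SendError> {
    match outcome {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(cause)) => Err(SendError::Transport(cause)),
        Err(Elapsed) => Err(SendError::WriteTimeout(timeout)),
    }
}
```

Keeping the timeout as its own variant lets callers, and the retry classifier, tell a slow peer apart from a broken connection.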

    /// Reads the entire response body with a read timeout.
    ///
    /// # Parameters
    /// - `response`: Successful response whose body will be consumed.
    /// - `method`: Method for error context.
    /// - `url`: URL for error context.
    /// - `cancellation_token`: Optional token; if it is cancelled before the
    ///   body is fully read, the call returns a cancelled [`HttpError`].
    ///
    /// # Returns
    /// Body as [`Bytes`] or [`HttpError`] (decode, read timeout, or
    /// cancellation).
    async fn read_body_with_timeout(
        &self,
        response: Response,
        method: http::Method,
        url: Url,
        cancellation_token: Option<&CancellationToken>,
    ) -> HttpResult<Bytes> {
        let timeout = self.options.timeouts.read_timeout;
        let read_future = tokio::time::timeout(timeout, response.bytes());
        let next = if let Some(token) = cancellation_token {
            tokio::select! {
                _ = token.cancelled() => {
                    return Err(HttpError::cancelled("Request cancelled while reading response body")
                        .with_method(method)
                        .with_url(url));
                }
                read_result = read_future => read_result,
            }
        } else {
            read_future.await
        };
        match next {
            Ok(Ok(body)) => Ok(body),
            Ok(Err(error)) => Err(map_reqwest_error(
                error,
                HttpErrorKind::Decode,
                Some(method),
                Some(url),
            )),
            Err(_) => Err(HttpError::read_timeout(format!(
                "Read timeout after {:?} while reading response body",
                timeout
            ))
            .with_method(method)
            .with_url(url)),
        }
    }

    /// Reads and renders a bounded preview for a non-success response body.
    ///
    /// # Parameters
    /// - `response`: Non-success response whose body will be consumed.
    ///
    /// # Returns
    /// Rendered preview text. On preview read failure, returns a descriptive placeholder.
    async fn read_error_response_preview(&self, mut response: Response) -> String {
        let read_timeout = self.options.timeouts.read_timeout;
        let max_bytes = self.options.logging.body_size_limit.max(1);
        let mut preview = Vec::new();
        let mut truncated = false;

        loop {
            let next = tokio::time::timeout(read_timeout, response.chunk()).await;
            match next {
                Ok(Ok(Some(chunk))) => {
                    if preview.len() >= max_bytes {
                        truncated = true;
                        break;
                    }
                    let remaining = max_bytes - preview.len();
                    if chunk.len() > remaining {
                        preview.extend_from_slice(&chunk[..remaining]);
                        truncated = true;
                        break;
                    }
                    preview.extend_from_slice(&chunk);
                }
                Ok(Ok(None)) => break,
                Ok(Err(error)) => {
                    return format!(
                        "<error body unavailable: failed to read response body: {}>",
                        error
                    );
                }
                Err(_) => {
                    return format!(
                        "<error body unavailable: read timeout after {:?}>",
                        read_timeout
                    );
                }
            }
        }

        render_error_body_preview(&preview, truncated)
    }
}
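
// Illustration only, not part of the crate: a minimal standalone sketch of the
// bounded accumulation that `read_error_response_preview` performs, assuming the
// chunks are already in memory rather than arriving from `response.chunk()`.
// The name `collect_preview` is hypothetical.

```rust
/// Appends chunks to a preview buffer until `max_bytes` is reached.
///
/// Returns the collected bytes and a flag telling whether input was cut off.
/// A limit of zero is raised to one byte, mirroring the `.max(1)` clamp above.
fn collect_preview<'a, I>(chunks: I, max_bytes: usize) -> (Vec<u8>, bool)
where
    I: IntoIterator<Item = &'a [u8]>,
{
    let max_bytes = max_bytes.max(1);
    let mut preview = Vec::new();
    for chunk in chunks {
        // The buffer is already full: any further chunk means truncation.
        if preview.len() >= max_bytes {
            return (preview, true);
        }
        let remaining = max_bytes - preview.len();
        // The chunk does not fit: keep only the part that does.
        if chunk.len() > remaining {
            preview.extend_from_slice(&chunk[..remaining]);
            return (preview, true);
        }
        preview.extend_from_slice(chunk);
    }
    (preview, false)
}
```

// With the chunks b"hello " and b"world" and a limit of 8 bytes, the sketch
// returns b"hello wo" with the truncation flag set; with a limit of 64 bytes it
// returns the full b"hello world" and the flag stays false.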

/// Builds a cancelled error when `token` is already cancelled.
///
/// # Parameters
/// - `token`: Optional cancellation token for this request.
/// - `method`: Request method for error context.
/// - `url`: Request URL for error context.
/// - `message`: Cancellation message.
///
/// # Returns
/// `Some(HttpError)` when cancelled, otherwise `None`.
fn cancelled_request_error_if_needed(
    token: Option<&CancellationToken>,
    method: &http::Method,
    url: &Url,
    message: &str,
) -> Option<HttpError> {
    if token.is_some_and(CancellationToken::is_cancelled) {
        Some(
            HttpError::cancelled(message.to_string())
                .with_method(method.clone())
                .with_url(url.clone()),
        )
    } else {
        None
    }
}

/// Clones request body content for request logging.
///
/// # Parameters
/// - `body`: Request body variant.
///
/// # Returns
/// Optional byte payload for logger previewing.
fn clone_request_body_for_log(body: &HttpRequestBody) -> Option<Bytes> {
    match body {
        HttpRequestBody::Bytes(bytes)
        | HttpRequestBody::Json(bytes)
        | HttpRequestBody::Form(bytes)
        | HttpRequestBody::Multipart(bytes)
        | HttpRequestBody::Ndjson(bytes) => Some(bytes.clone()),
        HttpRequestBody::Text(text) => Some(Bytes::from(text.clone())),
        HttpRequestBody::Empty => None,
    }
}

/// Applies request body variant to a reqwest request builder.
///
/// # Parameters
/// - `builder`: Request builder with method/url/headers/query already set.
/// - `body`: Request body variant to apply.
///
/// # Returns
/// Updated builder containing the request body payload.
fn apply_request_body(
    builder: reqwest::RequestBuilder,
    body: HttpRequestBody,
) -> reqwest::RequestBuilder {
    match body {
        HttpRequestBody::Empty => builder,
        HttpRequestBody::Bytes(bytes)
        | HttpRequestBody::Json(bytes)
        | HttpRequestBody::Form(bytes)
        | HttpRequestBody::Multipart(bytes)
        | HttpRequestBody::Ndjson(bytes) => builder.body(bytes),
        HttpRequestBody::Text(text) => builder.body(text),
    }
}

/// Converts a [`RetryResult`] from the HTTP retry executor into [`HttpResult`].
///
/// Successful attempts pass through. Retry exhaustion and deadline failures are
/// turned into [`HttpError`] values with additional context on the message when
/// applicable.
///
/// # Parameters
/// - `result`: Outcome of the retry executor after one or more async attempts.
///
/// # Returns
/// The successful value, or an [`HttpError`] describing abort, exhaustion, or
/// deadline overrun.
fn map_retry_result<T>(result: RetryResult<T, HttpError>) -> HttpResult<T> {
    match result {
        Ok(value) => Ok(value),
        Err(RetryError::Aborted { failure, .. }) => map_retry_failure(failure),
        Err(RetryError::AttemptsExceeded {
            attempts,
            max_attempts,
            last_failure,
            ..
        }) => {
            let mut error = map_retry_failure_to_error(last_failure);
            error.message = format!(
                "{} (retry attempts exhausted: {attempts}/{max_attempts})",
                error.message
            );
            Err(error)
        }
        Err(RetryError::MaxElapsedExceeded {
            elapsed,
            max_elapsed,
            last_failure: Some(last_failure),
            ..
        }) => {
            let mut error = map_retry_failure_to_error(last_failure);
            error.message = format!(
                "{} (retry max duration exceeded: {elapsed:?}/{max_elapsed:?})",
                error.message
            );
            Err(error)
        }
        Err(RetryError::MaxElapsedExceeded {
            elapsed,
            max_elapsed,
            last_failure: None,
            ..
        }) => Err(HttpError::other(format!(
            "HTTP retry max duration exceeded before a retryable error was captured: {elapsed:?}/{max_elapsed:?}"
        ))),
    }
}

/// Maps a single retry [`AttemptFailure`] into [`HttpResult`].
///
/// # Parameters
/// - `failure`: Single attempt outcome from the retry layer.
///
/// # Returns
/// Always `Err`: either the wrapped [`HttpError`] or a synthesized timeout
/// message.
fn map_retry_failure<T>(failure: AttemptFailure<HttpError>) -> HttpResult<T> {
    Err(map_retry_failure_to_error(failure))
}

/// Converts a retry-layer attempt failure into [`HttpError`].
///
/// # Parameters
/// - `failure`: Attempt failure from the retry executor.
///
/// # Returns
/// Mapped [`HttpError`] with timeout context when applicable.
fn map_retry_failure_to_error(failure: AttemptFailure<HttpError>) -> HttpError {
    match failure {
        AttemptFailure::Error(error) => error,
        AttemptFailure::AttemptTimeout { elapsed, timeout } => HttpError::other(format!(
            "HTTP retry attempt timeout after {elapsed:?} (timeout: {timeout:?})"
        )),
    }
}

/// Maps a [`reqwest::Error`] into [`HttpError`] with best-effort
/// [`HttpErrorKind`] and optional context.
///
/// # Parameters
/// - `error`: Underlying reqwest error.
/// - `default_kind`: Kind used when reqwest does not classify the error more
///   specifically.
/// - `method`: Optional request method to attach.
/// - `url`: Optional request URL to attach.
///
/// # Returns
/// Configured [`HttpError`] including chained source.
fn map_reqwest_error(
    error: reqwest::Error,
    default_kind: HttpErrorKind,
    method: Option<http::Method>,
    url: Option<Url>,
) -> HttpError {
    let kind = if error.is_timeout() {
        classify_reqwest_timeout_kind(&error)
    } else if error.is_decode() {
        HttpErrorKind::Decode
    } else if error.is_status() {
        HttpErrorKind::Status
    } else if error.is_request() && error.url().is_none() {
        HttpErrorKind::InvalidUrl
    } else {
        default_kind
    };

    let mut result = HttpError::new(kind, format!("HTTP transport error: {}", error));
    if let Some(method) = method {
        result = result.with_method(method);
    }
    if let Some(url) = url {
        result = result.with_url(url);
    }
    result.with_source(error)
}

/// Classifies reqwest timeout errors into connect-timeout vs request-timeout.
///
/// # Parameters
/// - `error`: Reqwest timeout error to classify.
///
/// # Returns
/// [`HttpErrorKind::ConnectTimeout`] when the timeout message indicates a connect-phase timeout;
/// otherwise [`HttpErrorKind::RequestTimeout`].
fn classify_reqwest_timeout_kind(error: &reqwest::Error) -> HttpErrorKind {
    let message = error.to_string().to_ascii_lowercase();
    if message.contains("connect") {
        HttpErrorKind::ConnectTimeout
    } else {
        HttpErrorKind::RequestTimeout
    }
}
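
// Illustration only, not part of the crate: a minimal standalone sketch of the
// message-based heuristic used by `classify_reqwest_timeout_kind`, assuming the
// error has already been rendered to a string. `TimeoutKind` and
// `classify_timeout_message` are hypothetical names, and the sample messages in
// the note below are made up rather than taken from reqwest.

```rust
/// Hypothetical stand-in for the two timeout kinds distinguished above.
#[derive(Debug, PartialEq, Eq)]
enum TimeoutKind {
    Connect,
    Request,
}

/// Classifies a rendered timeout message by a case-insensitive substring check.
fn classify_timeout_message(message: &str) -> TimeoutKind {
    if message.to_ascii_lowercase().contains("connect") {
        TimeoutKind::Connect
    } else {
        TimeoutKind::Request
    }
}
```

// A message such as "Connect attempt timed out" maps to `TimeoutKind::Connect`
// and "operation timed out" maps to `TimeoutKind::Request`. Because this is a
// plain substring match, any message that happens to contain "connect" (for
// example "disconnected") is also classified as a connect timeout.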

/// Parses `Retry-After` from response headers for `429 Too Many Requests`.
///
/// # Parameters
/// - `status`: HTTP status code.
/// - `headers`: Response headers.
///
/// # Returns
/// Parsed retry delay when `status` is `429` and `Retry-After` is an integer
/// number of seconds.
fn parse_retry_after(status: StatusCode, headers: &HeaderMap) -> Option<Duration> {
    if status != StatusCode::TOO_MANY_REQUESTS {
        return None;
    }
    headers
        .get(RETRY_AFTER)
        .and_then(|value| value.to_str().ok())
        .and_then(parse_retry_after_value)
}

/// Parses a `Retry-After` header value as integer seconds.
///
/// # Parameters
/// - `value`: Raw `Retry-After` header value.
///
/// # Returns
/// Parsed duration, or `None` when value is not a non-negative integer.
fn parse_retry_after_value(value: &str) -> Option<Duration> {
    let seconds = value.trim().parse::<u64>().ok()?;
    Some(Duration::from_secs(seconds))
}
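
// Illustration only, not part of the crate: a minimal standalone sketch of the
// two `Retry-After` helpers above, assuming the status is a plain `u16` and the
// header value has already been extracted as `Option<&str>`. The names
// `retry_delay` and `parse_seconds` are hypothetical.

```rust
use std::time::Duration;

/// Parses a header value as a non-negative integer number of seconds.
fn parse_seconds(value: &str) -> Option<Duration> {
    let seconds = value.trim().parse::<u64>().ok()?;
    Some(Duration::from_secs(seconds))
}

/// Returns a retry delay only for status 429 with an integer `Retry-After`.
fn retry_delay(status: u16, retry_after: Option<&str>) -> Option<Duration> {
    if status != 429 {
        return None;
    }
    retry_after.and_then(parse_seconds)
}
```

// `retry_delay(429, Some(" 120 "))` yields a 120-second delay, while a 503
// status, a missing header, a negative number, or the HTTP-date form of
// `Retry-After` all yield `None` under this integer-only parsing.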

/// Renders a human-readable error-body preview from raw bytes.
///
/// # Parameters
/// - `bytes`: Captured body bytes (already size-limited).
/// - `truncated`: Whether additional bytes were omitted.
///
/// # Returns
/// UTF-8 text preview or binary placeholder with truncation suffix when needed.
fn render_error_body_preview(bytes: &[u8], truncated: bool) -> String {
    if bytes.is_empty() {
        return "<empty>".to_string();
    }

    let suffix = if truncated { "...<truncated>" } else { "" };
    match std::str::from_utf8(bytes) {
        Ok(text) => format!("{text}{suffix}"),
        Err(_) => format!("<binary {} bytes>{suffix}", bytes.len()),
    }
}
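
// Illustration only, not part of the crate: a standalone copy of the rendering
// logic above under the hypothetical name `render_preview`, so its outputs can
// be checked in isolation.

```rust
/// Renders captured bytes as text, or as a placeholder when they are empty or
/// not valid UTF-8, appending a marker when the capture was truncated.
fn render_preview(bytes: &[u8], truncated: bool) -> String {
    if bytes.is_empty() {
        return "<empty>".to_string();
    }
    let suffix = if truncated { "...<truncated>" } else { "" };
    match std::str::from_utf8(bytes) {
        Ok(text) => format!("{text}{suffix}"),
        Err(_) => format!("<binary {} bytes>{suffix}", bytes.len()),
    }
}
```

// One consequence worth noting: a byte-level cut that lands inside a multi-byte
// character makes the whole buffer invalid UTF-8, so the preview falls back to
// the binary placeholder. Cutting "é" (two bytes) after its first byte renders
// as "<binary 1 bytes>...<truncated>".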