• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

stacks-network / stacks-core / 23943169302

03 Apr 2026 10:28AM UTC coverage: 77.573% (-8.1%) from 85.712%
23943169302

Pull #7076

github

7f2377
web-flow
Merge bb87ecec2 into c529ad924
Pull Request #7076: feat: sortition side-table copy and validation

3743 of 4318 new or added lines in 19 files covered. (86.68%)

19304 existing lines in 182 files now uncovered.

172097 of 221852 relevant lines covered (77.57%)

7722182.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.38
/stackslib/src/net/httpcore.rs
1
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
2
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
3
//
4
// This program is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8
//
9
// This program is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13
//
14
// You should have received a copy of the GNU General Public License
15
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17
//! This module binds the http library to Stacks as a `ProtocolFamily` implementation.
18
use std::collections::BTreeMap;
19
use std::io::{Read, Write};
20
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
21
use std::time::{Duration, Instant};
22
use std::{fmt, io, mem};
23

24
use clarity::vm::costs::ExecutionCost;
25
use clarity::vm::types::QualifiedContractIdentifier;
26
use clarity::vm::{ClarityName, ContractName};
27
use percent_encoding::percent_decode_str;
28
use regex::{Captures, Regex};
29
use stacks_common::codec::{read_next, Error as CodecError, StacksMessageCodec, MAX_MESSAGE_LEN};
30
use stacks_common::types::chainstate::{
31
    BurnchainHeaderHash, ConsensusHash, StacksAddress, StacksBlockId, StacksPublicKey,
32
};
33
use stacks_common::types::net::PeerHost;
34
use stacks_common::types::Address;
35
use stacks_common::util::chunked_encoding::*;
36
use stacks_common::util::retry::{BoundReader, RetryReader};
37
use stacks_common::util::{get_epoch_time_ms, get_epoch_time_secs};
38
use url::Url;
39

40
use crate::burnchains::Txid;
41
use crate::chainstate::burn::db::sortdb::SortitionDB;
42
use crate::chainstate::burn::BlockSnapshot;
43
use crate::chainstate::nakamoto::NakamotoChainState;
44
use crate::chainstate::stacks::db::{StacksChainState, StacksHeaderInfo};
45
use crate::core::StacksEpoch;
46
use crate::net::connection::{ConnectionOptions, NetworkConnection};
47
use crate::net::http::common::{parse_raw_bytes, HTTP_PREAMBLE_MAX_ENCODED_SIZE};
48
use crate::net::http::{
49
    http_reason, parse_bytes, parse_json, Error as HttpError, HttpContentType, HttpErrorResponse,
50
    HttpMethodNotAllowed, HttpNotFound, HttpRequest, HttpRequestContents, HttpRequestPreamble,
51
    HttpResponse, HttpResponseContents, HttpResponsePayload, HttpResponsePreamble, HttpServerError,
52
};
53
use crate::net::p2p::PeerNetwork;
54
use crate::net::server::HttpPeer;
55
use crate::net::{Error as NetError, MessageSequence, ProtocolFamily, StacksNodeState, UrlString};
56

57
/// Buffer length handed to `send_chunked` when a response is written with chunked encoding
const CHUNK_BUF_LEN: usize = 32_768;

/// Name of the HTTP header that carries the canonical Stacks tip height
pub const STACKS_HEADER_HEIGHT: &str = "X-Canonical-Stacks-Tip-Height";

/// Name of the HTTP header that carries the request ID
pub const STACKS_REQUEST_ID: &str = "X-Request-Id";

/// Request ID to use or expect from non-Stacks HTTP clients.
/// If an HTTP response lacks the x-request-id header, its request ID is assumed to be this
/// value.  This makes it possible to fetch immutable data (blocks, microblocks) from
/// non-Stacks nodes such as Gaia hubs, CDNs, and vanilla HTTP servers.
pub const HTTP_REQUEST_ID_RESERVED: u32 = 0;

/// How often heartbeat logs are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60);
73

74
/// Finds named path parameters like (?P<block_id>...) in route patterns
75
static CAPTURE_GROUP_REGEX: std::sync::LazyLock<Regex> =
76
    std::sync::LazyLock::new(|| Regex::new(r"\(\?P<[^>]+>[^)]+\)").unwrap());
181✔
77

78
/// Loosens a strict route regex so any valid URL characters match the parameters.
79
/// This lets us tell apart 400 (bad params), 404 (no such route), and 405 (wrong method).
80
/// Complex patterns with nested parens fall back to strict matching (404 instead of 400).
81
fn make_permissive_regex(strict_regex: &Regex) -> Regex {
88,434✔
82
    let pattern = strict_regex.as_str();
88,434✔
83
    let permissive = CAPTURE_GROUP_REGEX.replace_all(pattern, "[a-zA-Z0-9._~:@!$&'()*+,;=-]+");
88,434✔
84
    Regex::new(&permissive).unwrap_or_else(|_| strict_regex.clone())
88,434✔
85
}
88,434✔
86

87
/// All representations of the `tip=` query parameter value
88
#[derive(Debug, Clone, PartialEq)]
89
pub enum TipRequest {
90
    UseLatestAnchoredTip,
91
    UseLatestUnconfirmedTip,
92
    SpecificTip(StacksBlockId),
93
}
94

95
impl TipRequest {}
96

97
impl fmt::Display for TipRequest {
98
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
47✔
99
        match self {
47✔
100
            Self::UseLatestAnchoredTip => write!(f, ""),
×
101
            Self::UseLatestUnconfirmedTip => write!(f, "latest"),
18✔
102
            Self::SpecificTip(ref tip) => write!(f, "{tip}"),
29✔
103
        }
104
    }
47✔
105
}
106

107
impl From<&str> for TipRequest {
108
    fn from(s: &str) -> TipRequest {
48✔
109
        if s == "latest" {
48✔
110
            TipRequest::UseLatestUnconfirmedTip
17✔
111
        } else if let Ok(block_id) = StacksBlockId::from_hex(s) {
31✔
112
            TipRequest::SpecificTip(block_id)
29✔
113
        } else {
114
            TipRequest::UseLatestAnchoredTip
2✔
115
        }
116
    }
48✔
117
}
118

119
/// Adds Stacks-specific header accessors to HTTP preambles
pub trait HttpPreambleExtensions {
    /// Record the node's canonical Stacks chain tip height
    fn set_canonical_stacks_tip_height(&mut self, height: Option<u64>);
    /// Record the request ID
    fn set_request_id(&mut self, req_id: u32);
    /// Read back the canonical Stacks chain tip height, if available
    fn get_canonical_stacks_tip_height(&self) -> Option<u64>;
    /// Read back the request ID, if available
    fn get_request_id(&self) -> Option<u32>;
}
130

131
impl HttpPreambleExtensions for HttpRequestPreamble {
132
    /// Set the canonical Stacks chain tip height
133
    fn set_canonical_stacks_tip_height(&mut self, height_opt: Option<u64>) {
7✔
134
        if let Some(height) = height_opt {
7✔
135
            self.add_header(
4✔
136
                "X-Canonical-Stacks-Tip-Height".into(),
4✔
137
                format!("{}", &height),
4✔
138
            );
4✔
139
        } else {
5✔
140
            self.remove_header("X-Canonical-Stacks-Tip-Height".to_string());
3✔
141
        }
3✔
142
    }
7✔
143

144
    /// Set the request ID
145
    fn set_request_id(&mut self, id: u32) {
18✔
146
        self.add_header("X-Request-Id".into(), format!("{}", id));
18✔
147
    }
18✔
148

149
    /// Get the canonical Stacks chain tip
150
    fn get_canonical_stacks_tip_height(&self) -> Option<u64> {
9✔
151
        self.get_header("X-Canonical-Stacks-Tip-Height".to_string())
9✔
152
            .and_then(|hdr| hdr.parse::<u64>().ok())
9✔
153
    }
9✔
154

155
    /// Get the request ID
156
    fn get_request_id(&self) -> Option<u32> {
18✔
157
        self.get_header("X-Request-Id".to_string())
18✔
158
            .and_then(|req| req.parse::<u32>().ok())
18✔
159
    }
18✔
160
}
161

162
impl HttpPreambleExtensions for HttpResponsePreamble {
163
    /// Set the canonical Stacks chain tip height
164
    fn set_canonical_stacks_tip_height(&mut self, height_opt: Option<u64>) {
1,338✔
165
        if let Some(height) = height_opt {
1,338✔
166
            self.add_header(
1,338✔
167
                "X-Canonical-Stacks-Tip-Height".into(),
1,338✔
168
                format!("{}", &height),
1,338✔
169
            );
1,338✔
170
        } else {
1,338✔
171
            self.remove_header("X-Canonical-Stacks-Tip-Height".to_string());
×
172
        }
×
173
    }
1,338✔
174

175
    /// Set the request ID
176
    fn set_request_id(&mut self, id: u32) {
×
177
        self.add_header("X-Request-Id".into(), format!("{}", id));
×
178
    }
×
179

180
    /// Get the canonical Stacks chain tip
181
    fn get_canonical_stacks_tip_height(&self) -> Option<u64> {
1,487✔
182
        self.get_header("X-Canonical-Stacks-Tip-Height".to_string())
1,487✔
183
            .and_then(|hdr| hdr.parse::<u64>().ok())
1,487✔
184
    }
1,487✔
185

186
    /// Get the request ID
187
    fn get_request_id(&self) -> Option<u32> {
1,218✔
188
        self.get_header("X-Request-Id".to_string())
1,218✔
189
            .and_then(|req| req.parse::<u32>().ok())
1,218✔
190
    }
1,218✔
191
}
192

193
/// This module contains request helpers for decoding common data found in the request path regex captures.
194
/// The error types convert to HTTP responses.
195
pub mod request {
196
    use super::*;
197

198
    /// Get and parse a contract address from a path's captures, given the address and contract
199
    /// regex field names.
200
    pub fn get_contract_address(
81✔
201
        captures: &Captures,
81✔
202
        address_key: &str,
81✔
203
        contract_key: &str,
81✔
204
    ) -> Result<QualifiedContractIdentifier, HttpError> {
81✔
205
        let address = if let Some(address_str) = captures.name(address_key) {
81✔
206
            if let Some(addr) = StacksAddress::from_string(address_str.as_str()) {
81✔
207
                addr
81✔
208
            } else {
209
                return Err(HttpError::Http(
×
210
                    400,
×
211
                    format!("Failed to decode `{}`", address_key),
×
212
                ));
×
213
            }
214
        } else {
215
            return Err(HttpError::Http(404, format!("Missing `{}`", address_key)));
×
216
        };
217

218
        let contract_name = if let Some(contract_str) = captures.name(contract_key) {
81✔
219
            if let Ok(contract_name) = ContractName::try_from(contract_str.as_str().to_string()) {
81✔
220
                contract_name
81✔
221
            } else {
222
                return Err(HttpError::Http(
×
223
                    400,
×
224
                    format!("Failed to decode `{}`", contract_key),
×
225
                ));
×
226
            }
227
        } else {
228
            return Err(HttpError::Http(404, format!("Missing `{}`", contract_key)));
×
229
        };
230

231
        let contract_identifier = QualifiedContractIdentifier::new(address.into(), contract_name);
81✔
232

233
        Ok(contract_identifier)
81✔
234
    }
81✔
235

236
    /// Get and parse a StacksBlockId from a path's captures, given the name of the regex field.
237
    pub fn get_block_hash(captures: &Captures, key: &str) -> Result<StacksBlockId, HttpError> {
9✔
238
        let block_id = if let Some(block_id) = captures.name(key) {
9✔
239
            match StacksBlockId::from_hex(block_id.as_str()) {
9✔
240
                Ok(bhh) => bhh,
9✔
241
                Err(_e) => {
×
242
                    return Err(HttpError::Http(400, format!("Failed to decode `{}`", key)));
×
243
                }
244
            }
245
        } else {
246
            return Err(HttpError::Http(404, format!("Missing `{}`", key)));
×
247
        };
248
        Ok(block_id)
9✔
249
    }
9✔
250

251
    /// Get and parse a Txid from a path's captures, given the name of the regex field.
252
    pub fn get_txid(captures: &Captures, key: &str) -> Result<Txid, HttpError> {
9✔
253
        let txid = if let Some(txid) = captures.name(key) {
9✔
254
            match Txid::from_hex(txid.as_str()) {
9✔
255
                Ok(bhh) => bhh,
9✔
256
                Err(_e) => {
×
257
                    return Err(HttpError::Http(400, format!("Failed to decode `{}`", key)));
×
258
                }
259
            }
260
        } else {
261
            return Err(HttpError::Http(404, format!("Missing `{}`", key)));
×
262
        };
263
        Ok(txid)
9✔
264
    }
9✔
265

266
    /// Get and parse a Clarity name from a path's captures, given the name of the regex field.
267
    pub fn get_clarity_name(captures: &Captures, key: &str) -> Result<ClarityName, HttpError> {
34✔
268
        let clarity_name = if let Some(name_str) = captures.name(key) {
34✔
269
            if let Ok(clarity_name) = ClarityName::try_from(name_str.as_str().to_string()) {
34✔
270
                clarity_name
34✔
271
            } else {
272
                return Err(HttpError::Http(400, format!("Failed to decode `{}`", key)));
×
273
            }
274
        } else {
275
            return Err(HttpError::Http(404, format!("Missing `{}`", key)));
×
276
        };
277

278
        Ok(clarity_name)
34✔
279
    }
34✔
280

281
    /// Get and parse a ConsensusHash from a path's captures, given the name of the regex field.
282
    pub fn get_consensus_hash(captures: &Captures, key: &str) -> Result<ConsensusHash, HttpError> {
15✔
283
        let ch = if let Some(ch_str) = captures.name(key) {
15✔
284
            match ConsensusHash::from_hex(ch_str.as_str()) {
15✔
285
                Ok(ch) => ch,
15✔
286
                Err(_e) => {
×
287
                    return Err(HttpError::Http(400, format!("Failed to decode `{}`", key)));
×
288
                }
289
            }
290
        } else {
291
            return Err(HttpError::Http(404, format!("Missing `{}`", key)));
×
292
        };
293
        Ok(ch)
15✔
294
    }
15✔
295

296
    /// Get and parse a BurnchainHeaderHash from a path's captures, given the name of the regex field.
297
    pub fn get_burnchain_header_hash(
6✔
298
        captures: &Captures,
6✔
299
        key: &str,
6✔
300
    ) -> Result<BurnchainHeaderHash, HttpError> {
6✔
301
        let ch = if let Some(ch_str) = captures.name(key) {
6✔
302
            match BurnchainHeaderHash::from_hex(ch_str.as_str()) {
6✔
303
                Ok(ch) => ch,
6✔
304
                Err(_e) => {
×
305
                    return Err(HttpError::Http(400, format!("Failed to decode `{}`", key)));
×
306
                }
307
            }
308
        } else {
309
            return Err(HttpError::Http(404, format!("Missing `{}`", key)));
×
310
        };
311
        Ok(ch)
6✔
312
    }
6✔
313

314
    /// Get and parse a u32 from a path's captures, given the name of the regex field.
315
    pub fn get_u32(captures: &Captures, key: &str) -> Result<u32, HttpError> {
18✔
316
        let u = if let Some(u32_str) = captures.name(key) {
18✔
317
            match u32_str.as_str().parse::<u32>() {
18✔
318
                Ok(x) => x,
18✔
319
                Err(_e) => {
×
320
                    return Err(HttpError::Http(400, format!("Failed to decode `{}`", key)));
×
321
                }
322
            }
323
        } else {
324
            return Err(HttpError::Http(404, format!("Missing `{}`", key)));
×
325
        };
326
        Ok(u)
18✔
327
    }
18✔
328

329
    /// Get and parse a u64 from a path's captures, given the name of the regex field.
330
    pub fn get_u64(captures: &Captures, key: &str) -> Result<u64, HttpError> {
6✔
331
        let u = if let Some(u64_str) = captures.name(key) {
6✔
332
            match u64_str.as_str().parse::<u64>() {
6✔
333
                Ok(x) => x,
6✔
334
                Err(_e) => {
×
335
                    return Err(HttpError::Http(400, format!("Failed to decode `{}`", key)));
×
336
                }
337
            }
338
        } else {
339
            return Err(HttpError::Http(404, format!("Missing `{}`", key)));
×
340
        };
341
        Ok(u)
6✔
342
    }
6✔
343
}
344

345
/// Extension to HttpRequestContents to give it awareness of Stacks-specific fields
346
pub trait HttpRequestContentsExtensions {
347
    /// Chain constructor: Request a specific tip
348
    fn for_specific_tip(self, tip: StacksBlockId) -> Self;
349
    /// Chain constructor: use a given TipRequest
350
    fn for_tip(self, tip_req: TipRequest) -> Self;
351
    /// Identify the tip request
352
    fn tip_request(&self) -> TipRequest;
353
    /// Determine if we should return a MARF proof
354
    fn get_with_proof(&self) -> bool;
355
}
356

357
impl HttpRequestContentsExtensions for HttpRequestContents {
358
    /// Request a specific tip
359
    fn for_specific_tip(self, tip: StacksBlockId) -> Self {
×
360
        self.query_arg("tip".to_string(), format!("{}", &tip))
×
361
    }
×
362

363
    /// Use a particular tip request
364
    fn for_tip(mut self, tip_req: TipRequest) -> Self {
96✔
365
        if tip_req != TipRequest::UseLatestAnchoredTip {
96✔
366
            self.query_arg("tip".to_string(), tip_req.to_string())
47✔
367
        } else {
368
            let _ = self.take_query_arg(&"tip".to_string());
49✔
369
            self
49✔
370
        }
371
    }
96✔
372

373
    /// Ref the tip request
374
    fn tip_request(&self) -> TipRequest {
93✔
375
        self.get_query_args()
93✔
376
            .get("tip")
93✔
377
            .map(|tip| tip.as_str().into())
93✔
378
            .unwrap_or(TipRequest::UseLatestAnchoredTip)
93✔
379
    }
93✔
380

381
    /// Get the proof= query parameter value
382
    fn get_with_proof(&self) -> bool {
33✔
383
        let proof_value = self
33✔
384
            .get_query_arg("proof")
33✔
385
            .map(|x| x.to_owned())
33✔
386
            // default to "with proof"
387
            .unwrap_or("1".into());
33✔
388
        &proof_value == "1"
33✔
389
    }
33✔
390
}
391

392
/// Work around Clone blanket implementations not being object-safe
393
pub trait RPCRequestHandlerClone {
394
    fn clone_rpc_handler_box(&self) -> Box<dyn RPCRequestHandler>;
395
}
396

397
impl<T> RPCRequestHandlerClone for T
398
where
399
    T: 'static + RPCRequestHandler + Clone,
400
{
401
    fn clone_rpc_handler_box(&self) -> Box<dyn RPCRequestHandler> {
×
402
        Box::new(self.clone())
×
403
    }
×
404
}
405

406
impl Clone for Box<dyn RPCRequestHandler> {
407
    fn clone(&self) -> Box<dyn RPCRequestHandler> {
×
408
        self.clone_rpc_handler_box()
×
409
    }
×
410
}
411

412
/// Trait that every HTTP round-trip request type must implement.
413
pub trait RPCRequestHandler: HttpRequest + HttpResponse + RPCRequestHandlerClone {
414
    /// Reset the RPC handler.  This clears any internal state this handler stored between calls to
415
    /// `try_handle_request()`
416
    fn restart(&mut self);
417
    /// Instantiate the HTTP response headers and body from a request
418
    fn try_handle_request(
419
        &mut self,
420
        request_preamble: HttpRequestPreamble,
421
        request_body: HttpRequestContents,
422
        state: &mut StacksNodeState,
423
    ) -> Result<(HttpResponsePreamble, HttpResponseContents), NetError>;
424

425
    /// Helper to get the canonical sortition tip
426
    fn get_canonical_burn_chain_tip(
43✔
427
        &self,
43✔
428
        preamble: &HttpRequestPreamble,
43✔
429
        sortdb: &SortitionDB,
43✔
430
    ) -> Result<BlockSnapshot, StacksHttpResponse> {
43✔
431
        SortitionDB::get_canonical_burn_chain_tip(sortdb.conn()).map_err(|e| {
43✔
432
            StacksHttpResponse::new_error(
×
433
                preamble,
×
434
                &HttpServerError::new(format!("Failed to load canonical burnchain tip: {:?}", &e)),
×
435
            )
436
        })
×
437
    }
43✔
438

439
    /// Helper to get the current Stacks epoch
440
    fn get_stacks_epoch(
44✔
441
        &self,
44✔
442
        preamble: &HttpRequestPreamble,
44✔
443
        sortdb: &SortitionDB,
44✔
444
        block_height: u64,
44✔
445
    ) -> Result<StacksEpoch, StacksHttpResponse> {
44✔
446
        SortitionDB::get_stacks_epoch(sortdb.conn(), block_height)
44✔
447
            .map_err(|e| {
44✔
448
                StacksHttpResponse::new_error(preamble, &HttpServerError::new(format!("Could not load Stacks epoch for canonical burn height: {:?}", &e)))
×
449
            })?
×
450
            .ok_or_else(|| {
44✔
451
                let msg = format!(
×
452
                    "Failed to get fee rate estimate because could not load Stacks epoch for canonical burn height = {}",
453
                    block_height
454
                );
455
                warn!("{}", &msg);
×
456
                StacksHttpResponse::new_error(preamble, &HttpServerError::new(msg))
×
457
            })
×
458
    }
44✔
459

460
    /// Helper to get the Stacks tip
461
    fn get_stacks_chain_tip(
39✔
462
        &self,
39✔
463
        preamble: &HttpRequestPreamble,
39✔
464
        sortdb: &SortitionDB,
39✔
465
        chainstate: &StacksChainState,
39✔
466
    ) -> Result<StacksHeaderInfo, StacksHttpResponse> {
39✔
467
        NakamotoChainState::get_canonical_block_header(chainstate.db(), sortdb)
39✔
468
            .map_err(|e| {
39✔
469
                let msg = format!("Failed to load stacks chain tip header: {:?}", &e);
×
470
                warn!("{}", &msg);
×
471
                StacksHttpResponse::new_error(preamble, &HttpServerError::new(msg))
×
472
            })?
×
473
            .ok_or_else(|| {
39✔
474
                let msg =
×
475
                    "No stacks tip exists yet. Perhaps no blocks have been processed by this node"
×
476
                        .to_string();
×
477
                warn!("{}", &msg);
×
478
                StacksHttpResponse::new_error(preamble, &HttpNotFound::new(msg))
×
479
            })
×
480
    }
39✔
481
}
482

483
/// A decoded HttpRequest for use in Stacks
484
#[derive(Debug, Clone, PartialEq)]
485
pub struct StacksHttpRequest {
486
    preamble: HttpRequestPreamble,
487
    contents: HttpRequestContents,
488
    start_time: u128,
489
    /// Cache result of `StacksHttp::find_response_handler` so we don't have to do the regex matching twice
490
    response_handler_index: Option<usize>,
491
}
492

493
impl StacksHttpRequest {
494
    pub fn new(preamble: HttpRequestPreamble, contents: HttpRequestContents) -> Self {
1,489✔
495
        Self {
1,489✔
496
            preamble,
1,489✔
497
            contents,
1,489✔
498
            start_time: get_epoch_time_ms(),
1,489✔
499
            response_handler_index: None,
1,489✔
500
        }
1,489✔
501
    }
1,489✔
502

503
    /// Instantiate a request to a remote Stacks peer
504
    /// `path` is just the request path.  Query arguments are added via `contents`.  Any query
505
    /// component to `path` will be silently dropped.
506
    ///
507
    /// In Stacks, all requests must have a known content length.  If it cannot be calculated, then
508
    /// this method will fail.
509
    pub fn new_for_peer(
1,914✔
510
        peerhost: PeerHost,
1,914✔
511
        verb: String,
1,914✔
512
        path: String,
1,914✔
513
        contents: HttpRequestContents,
1,914✔
514
    ) -> Result<Self, NetError> {
1,914✔
515
        let mut preamble = HttpRequestPreamble::new_for_peer(peerhost, verb, path);
1,914✔
516
        if let Some(ct) = contents.content_type() {
1,914✔
517
            preamble.set_content_type(ct);
1,378✔
518
        }
1,756✔
519
        let content_length = contents.content_length()?;
1,914✔
520
        if content_length > 0 || contents.content_type().is_some() {
1,914✔
521
            preamble.set_content_length(content_length);
1,378✔
522
        }
1,756✔
523
        let (decoded_path, _) = decode_request_path(&preamble.path_and_query_str)?;
1,914✔
524
        let full_query_string = contents.get_full_query_string();
1,914✔
525
        preamble.path_and_query_str = if full_query_string.is_empty() {
1,914✔
526
            decoded_path
967✔
527
        } else {
528
            format!("{decoded_path}?{full_query_string}")
947✔
529
        };
530

531
        Ok(Self {
1,914✔
532
            preamble,
1,914✔
533
            contents,
1,914✔
534
            start_time: get_epoch_time_ms(),
1,914✔
535
            response_handler_index: None,
1,914✔
536
        })
1,914✔
537
    }
1,914✔
538

539
    /// Get a reference to the request premable metadata
540
    pub fn preamble(&self) -> &HttpRequestPreamble {
9,141✔
541
        &self.preamble
9,141✔
542
    }
9,141✔
543

544
    /// Get a mutable reference to the request premable metadata
545
    pub fn preamble_mut(&mut self) -> &mut HttpRequestPreamble {
51✔
546
        &mut self.preamble
51✔
547
    }
51✔
548

549
    /// Get a reference to the request contents
550
    pub fn contents(&self) -> &HttpRequestContents {
24✔
551
        &self.contents
24✔
552
    }
24✔
553

554
    /// Get a reference to the fully-qualified request path
555
    pub fn request_path(&self) -> &str {
3,264✔
556
        &self.preamble.path_and_query_str
3,264✔
557
    }
3,264✔
558

559
    /// Get the HTTP verb for this request
560
    pub fn verb(&self) -> &str {
1,421✔
561
        &self.preamble.verb
1,421✔
562
    }
1,421✔
563

564
    /// Get the number of milliseconds elapsed since this request was created
565
    pub fn duration_ms(&self) -> u128 {
1,421✔
566
        let now = get_epoch_time_ms();
1,421✔
567
        now.saturating_sub(self.start_time)
1,421✔
568
    }
1,421✔
569

570
    /// Write out this message to a Write.
571
    /// NOTE: In practice, the Write will be a reply handle endpoint, so writing to it won't block.
572
    pub fn send<W: Write>(&self, fd: &mut W) -> Result<(), NetError> {
1,880✔
573
        self.preamble.send(fd)?;
1,880✔
574
        self.contents.get_payload().send(fd).map_err(NetError::from)
1,880✔
575
    }
1,880✔
576

577
    /// Add a request header
578
    pub fn add_header(&mut self, hdr: String, value: String) {
476✔
579
        self.preamble.add_header(hdr, value);
476✔
580
    }
476✔
581

582
    /// Constructor to add headers
583
    pub fn with_header(mut self, hdr: String, value: String) -> Self {
×
584
        self.add_header(hdr, value);
×
585
        self
×
586
    }
×
587

588
    /// Get a ref to all request headers
589
    pub fn get_headers(&self) -> &BTreeMap<String, String> {
×
590
        &self.preamble.headers
×
591
    }
×
592

593
    /// Clear all extra headers
594
    pub fn clear_headers(&mut self) {
52✔
595
        self.preamble.headers.clear();
52✔
596
    }
52✔
597

598
    /// Destruct into parts
599
    pub fn destruct(self) -> (HttpRequestPreamble, HttpRequestContents) {
54✔
600
        (self.preamble, self.contents)
54✔
601
    }
54✔
602

603
    #[cfg(test)]
604
    pub fn try_serialize(&self) -> Result<Vec<u8>, NetError> {
97✔
605
        let mut ret = vec![];
97✔
606
        self.send(&mut ret)?;
97✔
607
        Ok(ret)
97✔
608
    }
97✔
609

610
    #[cfg(test)]
611
    pub fn get_response_handler_index(&self) -> Option<usize> {
7✔
612
        self.response_handler_index
7✔
613
    }
7✔
614
}
615

616
/// A received HTTP response (fully decoded in RAM)
617
#[derive(Debug, Clone, PartialEq)]
618
pub struct StacksHttpResponse {
619
    /// Information about the response (e.g. headers and header-derived data)
620
    preamble: HttpResponsePreamble,
621
    /// The body contents
622
    body: HttpResponsePayload,
623
}
624

625
impl From<StacksHttpResponse> for Result<(HttpResponsePreamble, HttpResponseContents), NetError> {
626
    fn from(resp: StacksHttpResponse) -> Self {
3✔
627
        resp.try_into_contents()
3✔
628
    }
3✔
629
}
630

631
impl StacksHttpResponse {
632
    pub fn new(preamble: HttpResponsePreamble, body: HttpResponsePayload) -> StacksHttpResponse {
1,916✔
633
        StacksHttpResponse { preamble, body }
1,916✔
634
    }
1,916✔
635

636
    pub fn preamble(&self) -> &HttpResponsePreamble {
1,765✔
637
        &self.preamble
1,765✔
638
    }
1,765✔
639

640
    pub fn body(&self) -> &HttpResponsePayload {
27✔
641
        &self.body
27✔
642
    }
27✔
643

644
    pub fn destruct(self) -> (HttpResponsePreamble, HttpResponsePayload) {
1,644✔
645
        (self.preamble, self.body)
1,644✔
646
    }
1,644✔
647

648
    /// Convert into an HTTP response so an HttpRequest impl can return it
649
    pub fn try_into_contents(
90✔
650
        self,
90✔
651
    ) -> Result<(HttpResponsePreamble, HttpResponseContents), NetError> {
90✔
652
        Ok((self.preamble, self.body.try_into()?))
90✔
653
    }
90✔
654

655
    /// Send this HTTP response on a given Write.  Only used for testing; in practice, the RPC
656
    /// request handler takes care of sending or streaming data back.
657
    pub fn send<W: Write>(&self, fd: &mut W) -> Result<(), NetError> {
32✔
658
        self.preamble.consensus_serialize(fd)?;
32✔
659
        if self.preamble.content_length.is_some() {
32✔
660
            self.body.send(fd).map_err(NetError::from)
28✔
661
        } else {
662
            self.body
4✔
663
                .send_chunked(CHUNK_BUF_LEN, fd)
4✔
664
                .map_err(NetError::from)
4✔
665
        }
666
    }
32✔
667

668
    /// Make a new HTTP error response, in reaction to a request
669
    pub fn new_error(
82✔
670
        preamble: &HttpRequestPreamble,
82✔
671
        error: &dyn HttpErrorResponse,
82✔
672
    ) -> StacksHttpResponse {
82✔
673
        Self::new_error_with_headers(preamble, error, vec![])
82✔
674
    }
82✔
675

676
    /// Make a new HTTP error response with additional headers, in reaction to a request
677
    pub fn new_error_with_headers(
101✔
678
        preamble: &HttpRequestPreamble,
101✔
679
        error: &dyn HttpErrorResponse,
101✔
680
        extra_headers: Vec<(String, String)>,
101✔
681
    ) -> StacksHttpResponse {
101✔
682
        let payload = error.payload();
101✔
683
        let content_type = match &payload {
101✔
684
            HttpResponsePayload::Empty => HttpContentType::Bytes,
×
685
            HttpResponsePayload::Bytes(..) => HttpContentType::Bytes,
×
686
            HttpResponsePayload::Text(..) => HttpContentType::Text,
62✔
687
            HttpResponsePayload::JSON(..) => HttpContentType::JSON,
39✔
688
        };
689
        let content_length = payload.try_content_length();
101✔
690
        let mut resp_preamble = HttpResponsePreamble::from_http_request_preamble(
101✔
691
            preamble,
101✔
692
            error.code(),
101✔
693
            http_reason(error.code()),
101✔
694
            content_length,
101✔
695
            content_type,
101✔
696
        );
697
        for (key, value) in extra_headers {
101✔
698
            resp_preamble.add_header(key, value);
1✔
699
        }
1✔
700
        StacksHttpResponse::new(resp_preamble, payload)
101✔
701
    }
101✔
702

703
    /// Make a new HTTP error response for text, apropos of nothing
704
    pub fn new_empty_error(error: &dyn HttpErrorResponse) -> StacksHttpResponse {
19✔
705
        let code = error.code();
19✔
706
        let payload = error.payload();
19✔
707
        let reason = http_reason(code);
19✔
708
        let preamble = match &payload {
19✔
709
            HttpResponsePayload::Empty => HttpResponsePreamble::error_bytes(code, reason),
×
710
            HttpResponsePayload::Bytes(..) => HttpResponsePreamble::error_bytes(code, reason),
×
711
            HttpResponsePayload::JSON(..) => HttpResponsePreamble::error_json(code, reason),
×
712
            HttpResponsePayload::Text(ref txt) => {
19✔
713
                HttpResponsePreamble::error_text(code, reason, txt)
19✔
714
            }
715
        };
716

717
        StacksHttpResponse::new(preamble, payload)
19✔
718
    }
19✔
719

720
    /// Get the internal payload if the HTTP response was 200.
721
    /// If it was 404, return NotFoundError
722
    /// Otherwise, if it was not 200, return RecvError
723
    pub fn get_http_payload_ok(self) -> Result<HttpResponsePayload, NetError> {
1,316✔
724
        let (preamble, payload) = self.destruct();
1,316✔
725
        if preamble.status_code == 404 {
1,316✔
726
            return Err(NetError::NotFoundError);
×
727
        }
1,316✔
728

729
        if preamble.status_code != 200 {
1,316✔
730
            return Err(NetError::RecvError(format!(
×
731
                "HTTP status {}",
×
732
                &preamble.status_code
×
733
            )));
×
734
        }
1,316✔
735

736
        Ok(payload)
1,316✔
737
    }
1,316✔
738

739
    /// Clear all extra headers
740
    pub fn clear_headers(&mut self) {
26✔
741
        self.preamble.headers.clear();
26✔
742
    }
26✔
743

744
    #[cfg(test)]
745
    pub fn try_serialize(&self) -> Result<Vec<u8>, NetError> {
6✔
746
        let mut ret = vec![];
6✔
747
        self.send(&mut ret)?;
6✔
748
        Ok(ret)
6✔
749
    }
6✔
750
}
751

752
/// Message type for HTTP
753
#[derive(Debug, Clone, PartialEq)]
754
pub enum StacksHttpMessage {
755
    Request(StacksHttpRequest),
756
    Response(StacksHttpResponse),
757
    Error(String, StacksHttpResponse),
758
}
759

760
/// HTTP message preamble
761
#[derive(Debug, Clone, PartialEq)]
762
pub enum StacksHttpPreamble {
763
    Request(HttpRequestPreamble),
764
    Response(HttpResponsePreamble),
765
}
766

767
impl StacksHttpPreamble {
768
    #[cfg(test)]
769
    pub fn expect_request(self) -> HttpRequestPreamble {
62✔
770
        match self {
62✔
771
            Self::Request(x) => x,
62✔
772
            _ => panic!("Not a request preamble"),
×
773
        }
774
    }
62✔
775

776
    #[cfg(test)]
777
    pub fn expect_response(self) -> HttpResponsePreamble {
×
778
        match self {
×
779
            Self::Response(x) => x,
×
780
            _ => panic!("Not a response preamble"),
×
781
        }
782
    }
×
783
}
784

785
impl StacksMessageCodec for StacksHttpPreamble {
786
    fn consensus_serialize<W: Write>(&self, fd: &mut W) -> Result<(), CodecError> {
×
787
        match *self {
×
788
            StacksHttpPreamble::Request(ref req) => req.consensus_serialize(fd),
×
789
            StacksHttpPreamble::Response(ref res) => res.consensus_serialize(fd),
×
790
        }
791
    }
×
792

793
    fn consensus_deserialize<R: Read>(fd: &mut R) -> Result<StacksHttpPreamble, CodecError> {
7,137✔
794
        let mut retry_fd = RetryReader::new(fd);
7,137✔
795

796
        // the byte stream can decode to a http request or a http response, but not both.
797
        match HttpRequestPreamble::consensus_deserialize(&mut retry_fd) {
7,137✔
798
            Ok(request) => Ok(StacksHttpPreamble::Request(request)),
1,504✔
799
            Err(e_request) => {
5,633✔
800
                // maybe a http response?
801
                retry_fd.set_position(0);
5,633✔
802
                match HttpResponsePreamble::consensus_deserialize(&mut retry_fd) {
5,633✔
803
                    Ok(response) => Ok(StacksHttpPreamble::Response(response)),
1,813✔
804
                    Err(e) => {
3,820✔
805
                        // underflow?
806
                        match (e_request, e) {
3,820✔
807
                            (CodecError::ReadError(ref ioe1), CodecError::ReadError(ref ioe2)) => {
3,805✔
808
                                if ioe1.kind() == io::ErrorKind::UnexpectedEof
3,805✔
809
                                    && ioe2.kind() == io::ErrorKind::UnexpectedEof
3,805✔
810
                                {
811
                                    // out of bytes
812
                                    Err(CodecError::UnderflowError(
3,805✔
813
                                        "Not enough bytes to form a HTTP request or response"
3,805✔
814
                                            .to_string(),
3,805✔
815
                                    ))
3,805✔
816
                                } else {
817
                                    Err(CodecError::DeserializeError(format!(
×
818
                                        "Neither a HTTP request ({:?}) or HTTP response ({:?})",
×
819
                                        ioe1, ioe2
×
820
                                    )))
×
821
                                }
822
                            }
823
                            (e_req, e_res) => Err(CodecError::DeserializeError(format!(
15✔
824
                                "Failed to decode HTTP request or HTTP response (request error: {:?}; response error: {:?})",
15✔
825
                                &e_req, &e_res
15✔
826
                            ))),
15✔
827
                        }
828
                    }
829
                }
830
            }
831
        }
832
    }
7,137✔
833
}
834

835
impl MessageSequence for StacksHttpMessage {
836
    fn request_id(&self) -> u32 {
3,470✔
837
        // there is at most one in-flight HTTP request, as far as a Connection<P> is concerned
838
        HTTP_REQUEST_ID_RESERVED
3,470✔
839
    }
3,470✔
840

841
    fn get_message_name(&self) -> &'static str {
1,735✔
842
        "StacksHttpMessage"
1,735✔
843
    }
1,735✔
844
}
845

846
/// A partially-decoded, streamed HTTP message (response) being received.
847
/// Internally used by StacksHttp to keep track of chunk-decoding state.
848
#[derive(Debug, Clone, PartialEq)]
849
struct StacksHttpRecvStream {
850
    state: HttpChunkedTransferReaderState,
851
    data: Vec<u8>,
852
    total_consumed: usize, // number of *encoded* bytes consumed
853
}
854

855
impl StacksHttpRecvStream {
856
    pub fn new(max_size: u64) -> StacksHttpRecvStream {
1,218✔
857
        StacksHttpRecvStream {
1,218✔
858
            state: HttpChunkedTransferReaderState::new(max_size),
1,218✔
859
            data: vec![],
1,218✔
860
            total_consumed: 0,
1,218✔
861
        }
1,218✔
862
    }
1,218✔
863

864
    /// Feed data into our chunked transfer reader state.  If we finish reading a stream, return
865
    /// the decoded bytes (as Some(Vec<u8>) and the total number of encoded bytes consumed).
866
    /// Always returns the number of bytes consumed.
867
    pub fn consume_data<R: Read>(
2,377✔
868
        &mut self,
2,377✔
869
        fd: &mut R,
2,377✔
870
    ) -> Result<(Option<(Vec<u8>, usize)>, usize), NetError> {
2,377✔
871
        let mut consumed = 0;
2,377✔
872
        let mut blocked = false;
2,377✔
873
        while !blocked {
5,869✔
874
            let mut decoded_buf = vec![0u8; CHUNK_BUF_LEN];
3,493✔
875
            let (read_pass, consumed_pass) = match self.state.do_read(fd, &mut decoded_buf) {
3,493✔
876
                Ok((0, num_consumed)) => {
2,376✔
877
                    test_debug!(
2,376✔
878
                        "consume_data blocked on 0 decoded bytes ({} consumed)",
879
                        num_consumed
880
                    );
881
                    blocked = true;
2,376✔
882
                    (0, num_consumed)
2,376✔
883
                }
884
                Ok((num_read, num_consumed)) => {
1,116✔
885
                    test_debug!(
1,116✔
886
                        "consume_data read {} bytes ({} consumed)",
887
                        num_read,
888
                        num_consumed
889
                    );
890
                    (num_read, num_consumed)
1,116✔
891
                }
892
                Err(e) => {
1✔
893
                    if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut
1✔
894
                    {
895
                        test_debug!("consume_data blocked on read error");
×
896
                        blocked = true;
×
897
                        (0, 0)
×
898
                    } else {
899
                        return Err(NetError::ReadError(e));
1✔
900
                    }
901
                }
902
            };
903

904
            consumed += consumed_pass;
3,492✔
905
            if read_pass > 0 {
3,492✔
906
                self.data
1,116✔
907
                    .extend_from_slice(decoded_buf.get(0..read_pass).ok_or_else(|| {
1,116✔
908
                        NetError::DeserializeError("Expected more bytes in buffer".into())
×
909
                    })?);
×
910
            }
2,376✔
911
        }
912

913
        self.total_consumed += consumed;
2,376✔
914

915
        // did we get a message?
916
        if self.state.is_eof() {
2,376✔
917
            // reset
918
            let message_data = mem::replace(&mut self.data, vec![]);
1,208✔
919
            let total_consumed = self.total_consumed;
1,208✔
920

921
            self.state = HttpChunkedTransferReaderState::new(self.state.max_size);
1,208✔
922
            self.total_consumed = 0;
1,208✔
923

924
            Ok((Some((message_data, total_consumed)), consumed))
1,208✔
925
        } else {
926
            Ok((None, consumed))
1,168✔
927
        }
928
    }
2,377✔
929
}
930

931
/// Information about an in-flight request
932
#[derive(Debug, Clone, PartialEq)]
933
struct StacksHttpReplyData {
934
    request_id: u32,
935
    stream: StacksHttpRecvStream,
936
}
937

938
/// Default response handler, for when using StacksHttp to issue arbitrary requests
939
#[derive(Clone)]
940
struct RPCArbitraryResponseHandler {}
941
impl HttpResponse for RPCArbitraryResponseHandler {
942
    fn try_parse_response(
402✔
943
        &self,
402✔
944
        preamble: &HttpResponsePreamble,
402✔
945
        body: &[u8],
402✔
946
    ) -> Result<HttpResponsePayload, HttpError> {
402✔
947
        match preamble.content_type {
402✔
948
            HttpContentType::Bytes => {
949
                let bytes = parse_bytes(preamble, body, MAX_MESSAGE_LEN.into())?;
66✔
950
                Ok(HttpResponsePayload::Bytes(bytes))
66✔
951
            }
952
            HttpContentType::JSON => {
953
                if body.len() > MAX_MESSAGE_LEN as usize {
291✔
954
                    return Err(HttpError::DecodeError(
×
955
                        "Message is too long to decode".into(),
×
956
                    ));
×
957
                }
291✔
958

959
                let json = parse_json(preamble, body)?;
291✔
960
                Ok(HttpResponsePayload::JSON(json))
282✔
961
            }
962
            HttpContentType::Text => {
963
                let text_bytes = parse_raw_bytes(
45✔
964
                    preamble,
45✔
965
                    body,
45✔
966
                    MAX_MESSAGE_LEN.into(),
45✔
967
                    HttpContentType::Text,
45✔
968
                )?;
×
969
                let text = String::from_utf8_lossy(&text_bytes).to_string();
45✔
970
                Ok(HttpResponsePayload::Text(text))
45✔
971
            }
972
        }
973
    }
402✔
974
}
975

976
/// Stacks HTTP state machine implementation, for bufferring up data.
977
/// One of these exists per Connection<P: Protocol>.
978
/// There can be at most one HTTP request in-flight (i.e. we don't do pipelining).
979
///
980
/// This state machine gets used for both clients and servers.  A client issues an HTTP request,
981
/// and must receive a follow-up HTTP reply (or the state machine errors out).  A server receives
982
/// an HTTP request, and sends an HTTP reply.
983
#[derive(Clone)]
984
pub struct StacksHttp {
985
    /// Address of peer
986
    peer_addr: SocketAddr,
987
    /// offset body after '\r\n\r\n' if known
988
    body_start: Option<usize>,
989
    /// number of preamble bytes seen so far
990
    num_preamble_bytes: usize,
991
    /// last 4 bytes of the preamble we've seen, just in case the \r\n\r\n straddles two calls to
992
    /// read_preamble()
993
    last_four_preamble_bytes: [u8; 4],
994
    /// Incoming reply state
995
    reply: Option<StacksHttpReplyData>,
996
    /// Size of HTTP chunks to write
997
    chunk_size: usize,
998
    /// Which request handler is active.
999
    /// This is only used if this state-machine is used by a client to issue a request and then
1000
    /// parse a reply.  If instead this state-machine is used by the server to parse a request and
1001
    /// send a reply, it will be unused.
1002
    request_handler_index: Option<usize>,
1003
    /// HTTP request handlers (verb, regex, permissive_regex, request-handler)
1004
    /// The permissive_regex is used for 405 Method Not Allowed detection
1005
    request_handlers: Vec<(String, Regex, Regex, Box<dyn RPCRequestHandler>)>,
1006
    /// Maximum size of call arguments
1007
    pub maximum_call_argument_size: u32,
1008
    /// Maximum execution budget of a read-only call
1009
    pub read_only_call_limit: ExecutionCost,
1010
    /// The authorization token to enable access to privileged features, such as the block proposal RPC endpoint
1011
    pub auth_token: Option<String>,
1012
    /// Allow arbitrary responses to be handled in addition to request handlers
1013
    allow_arbitrary_response: bool,
1014
    /// Maximum execution time of a read-only call when in zero cost-tracking mode
1015
    pub read_only_max_execution_time: Duration,
1016
}
1017

1018
impl StacksHttp {
1019
    /// Create an HTTP protocol state machine that handles the built-in RPC API.
1020
    /// Used for building the RPC server
1021
    pub fn new(peer_addr: SocketAddr, conn_opts: &ConnectionOptions) -> StacksHttp {
1,734✔
1022
        let mut http = StacksHttp {
1,734✔
1023
            peer_addr,
1,734✔
1024
            body_start: None,
1,734✔
1025
            num_preamble_bytes: 0,
1,734✔
1026
            last_four_preamble_bytes: [0u8; 4],
1,734✔
1027
            reply: None,
1,734✔
1028
            chunk_size: 8192,
1,734✔
1029
            request_handler_index: None,
1,734✔
1030
            request_handlers: vec![],
1,734✔
1031
            maximum_call_argument_size: conn_opts.maximum_call_argument_size,
1,734✔
1032
            read_only_call_limit: conn_opts.read_only_call_limit.clone(),
1,734✔
1033
            auth_token: conn_opts.auth_token.clone(),
1,734✔
1034
            allow_arbitrary_response: false,
1,734✔
1035
            read_only_max_execution_time: Duration::from_secs(
1,734✔
1036
                conn_opts.read_only_max_execution_time_secs,
1,734✔
1037
            ),
1,734✔
1038
        };
1,734✔
1039
        http.register_rpc_methods();
1,734✔
1040
        http
1,734✔
1041
    }
1,734✔
1042

1043
    /// Create an HTTP protocol state machine that can handle arbitrary responses.
1044
    /// Used for building clients.
1045
    pub fn new_client(peer_addr: SocketAddr, conn_opts: &ConnectionOptions) -> StacksHttp {
407✔
1046
        StacksHttp {
407✔
1047
            peer_addr,
407✔
1048
            body_start: None,
407✔
1049
            num_preamble_bytes: 0,
407✔
1050
            last_four_preamble_bytes: [0u8; 4],
407✔
1051
            reply: None,
407✔
1052
            chunk_size: 8192,
407✔
1053
            request_handler_index: None,
407✔
1054
            request_handlers: vec![],
407✔
1055
            maximum_call_argument_size: conn_opts.maximum_call_argument_size,
407✔
1056
            read_only_call_limit: conn_opts.read_only_call_limit.clone(),
407✔
1057
            auth_token: conn_opts.auth_token.clone(),
407✔
1058
            allow_arbitrary_response: true,
407✔
1059
            read_only_max_execution_time: Duration::from_secs(
407✔
1060
                conn_opts.read_only_max_execution_time_secs,
407✔
1061
            ),
407✔
1062
        }
407✔
1063
    }
407✔
1064

1065
    /// Register an API RPC endpoint.
1066
    /// Auto-generates a permissive regex for 400/404/405 detection
1067
    /// unless the handler provides its own via path_regex_permissive().
1068
    pub fn register_rpc_endpoint<Handler: RPCRequestHandler + 'static>(
88,434✔
1069
        &mut self,
88,434✔
1070
        handler: Handler,
88,434✔
1071
    ) {
88,434✔
1072
        let strict_regex = handler.path_regex();
88,434✔
1073
        let permissive_regex = handler.path_regex_permissive();
88,434✔
1074

1075
        let permissive_regex = if permissive_regex.as_str() == strict_regex.as_str() {
88,434✔
1076
            make_permissive_regex(&strict_regex)
88,434✔
1077
        } else {
UNCOV
1078
            permissive_regex
×
1079
        };
1080

1081
        self.request_handlers.push((
88,434✔
1082
            handler.verb().to_string(),
88,434✔
1083
            strict_regex,
88,434✔
1084
            permissive_regex,
88,434✔
1085
            Box::new(handler),
88,434✔
1086
        ));
88,434✔
1087
    }
88,434✔
1088

1089
    /// Find the HTTP request handler to use to process the reply, given the request path.
1090
    /// Returns the index into the list of handlers
1091
    fn find_response_handler(&self, request_verb: &str, request_path: &str) -> Option<usize> {
2,840✔
1092
        for (i, (verb, regex, _, _)) in self.request_handlers.iter().enumerate() {
106,533✔
1093
            if request_verb != verb {
106,533✔
1094
                continue;
75,158✔
1095
            }
31,375✔
1096
            let Some(_captures) = regex.captures(request_path) else {
31,375✔
1097
                continue;
28,538✔
1098
            };
1099

1100
            return Some(i);
2,837✔
1101
        }
1102
        None
3✔
1103
    }
2,840✔
1104

1105
    /// Find all allowed HTTP methods for a given path.
1106
    /// Returns a list of HTTP verbs that are allowed for handlers whose path regex matches.
1107
    fn find_allowed_methods(&self, request_path: &str) -> Vec<String> {
×
1108
        let mut allowed_methods = Vec::new();
×
1109
        for (verb, regex, permissive_regex, _) in self.request_handlers.iter() {
×
1110
            // Check if either the strict or permissive regex matches
1111
            if regex.is_match(request_path) || permissive_regex.is_match(request_path) {
×
1112
                if !allowed_methods.contains(verb) {
×
1113
                    allowed_methods.push(verb.clone());
×
1114
                }
×
1115
            }
×
1116
        }
1117
        allowed_methods
×
1118
    }
×
1119

1120
    /// Test helper: force the state machine to expect a response, by selecting the handler
    /// registered for `request_verb` and `request_path` as the active one.
    ///
    /// # Panics
    /// Panics if no registered handler matches.
    #[cfg(test)]
    pub fn set_response_handler(&mut self, request_verb: &str, request_path: &str) {
        let Some(handler_index) = self.find_response_handler(request_verb, request_path) else {
            panic!("FATAL: could not find handler for '{request_verb}' '{request_path}'");
        };
        self.request_handler_index = Some(handler_index);
    }
37✔
1130

1131
    /// Test helper: try to parse an inbound HTTP request using a given handler, preamble, and
    /// body.
    ///
    /// Returns `NetError::NotFoundError` if the handler's path regex does not match the decoded
    /// request path.  If the handler fails to parse the request, the handler is restarted and
    /// its error is returned.
    #[cfg(test)]
    pub fn handle_try_parse_request(
        &self,
        handler: &mut dyn RPCRequestHandler,
        preamble: &HttpRequestPreamble,
        body: &[u8],
    ) -> Result<StacksHttpRequest, NetError> {
        let (decoded_path, query) = decode_request_path(&preamble.path_and_query_str)?;
        let Some(captures) = handler.path_regex().captures(&decoded_path) else {
            return Err(NetError::NotFoundError);
        };

        let parse_result = handler.try_parse_request(
            preamble,
            &captures,
            if query.is_empty() { None } else { Some(&query) },
            body,
        );
        match parse_result {
            Ok(payload) => Ok(StacksHttpRequest::new(preamble.clone(), payload)),
            Err(e) => {
                // leave the handler clean for the next request
                handler.restart();
                Err(e.into())
            }
        }
    }
75✔
1162

1163
    /// Try to parse an inbound HTTP request, given its decoded HTTP preamble.
1164
    /// The body will be in the `fd`.
1165
    /// Returns the parsed HTTP request if successful.
1166
    pub fn try_parse_request(
1,443✔
1167
        &mut self,
1,443✔
1168
        preamble: &HttpRequestPreamble,
1,443✔
1169
        body: &[u8],
1,443✔
1170
    ) -> Result<StacksHttpRequest, NetError> {
1,443✔
1171
        let (decoded_path, query) = decode_request_path(&preamble.path_and_query_str)?;
1,443✔
1172
        test_debug!("decoded_path: '{}', query: '{}'", &decoded_path, &query);
1,443✔
1173

1174
        let mut allowed_methods: Vec<String> = Vec::new();
1,443✔
1175
        let mut verb_matched_but_params_invalid = false;
1,443✔
1176
        let mut any_strict_match = false;
1,443✔
1177

1178
        for (verb, regex, permissive_regex, request) in self.request_handlers.iter_mut() {
54,232✔
1179
            let permissive_match = permissive_regex.is_match(&decoded_path);
54,232✔
1180
            let Some(captures) = regex.captures(&decoded_path) else {
54,232✔
1181
                if permissive_match {
52,793✔
1182
                    if &preamble.verb == verb {
30✔
1183
                        verb_matched_but_params_invalid = true;
28✔
1184
                    }
29✔
1185
                    allowed_methods.push(verb.to_string());
30✔
1186
                }
52,763✔
1187
                continue;
52,793✔
1188
            };
1189

1190
            any_strict_match = true;
1,439✔
1191
            allowed_methods.push(verb.to_string());
1,439✔
1192
            if &preamble.verb != verb {
1,439✔
1193
                continue;
1✔
1194
            }
1,438✔
1195

1196
            let payload = match request.try_parse_request(
1,438✔
1197
                preamble,
1,438✔
1198
                &captures,
1,438✔
1199
                if query.is_empty() { None } else { Some(&query) },
1,438✔
1200
                body,
1,438✔
1201
            ) {
1202
                Ok(p) => p,
1,424✔
1203
                Err(e) => {
14✔
1204
                    request.restart();
14✔
1205
                    return Err(e.into());
14✔
1206
                }
1207
            };
1208

1209
            debug!("Handle StacksHttpRequest"; "verb" => %verb, "peer_addr" => %self.peer_addr, "path" => %decoded_path, "query" => %query);
1,424✔
1210
            return Ok(StacksHttpRequest::new(preamble.clone(), payload));
1,424✔
1211
        }
1212

1213
        test_debug!("Failed to parse '{}'", &preamble.path_and_query_str);
5✔
1214

1215
        // 400 if path pattern matched but params invalid (e.g. GET /v3/blocks/65-char-hex)
1216
        // 405 if path exists but wrong method (e.g. DELETE /v2/info)
1217
        // 404 if path doesn't exist
1218
        if !any_strict_match && verb_matched_but_params_invalid {
5✔
1219
            Err(NetError::Http(HttpError::Http(
2✔
1220
                400,
2✔
1221
                format!("Invalid path parameters for '{}'", &decoded_path),
2✔
1222
            )))
2✔
1223
        } else if !allowed_methods.is_empty() {
3✔
1224
            Err(NetError::Http(HttpError::HttpMethodNotAllowed(
1✔
1225
                allowed_methods,
1✔
1226
            )))
1✔
1227
        } else {
1228
            Err(NetError::Http(HttpError::Http(
2✔
1229
                404,
2✔
1230
                "No such file or directory".into(),
2✔
1231
            )))
2✔
1232
        }
1233
    }
1,443✔
1234

1235
    /// Parse out an HTTP response error message
1236
    pub fn try_parse_error_response(
72✔
1237
        preamble: &HttpResponsePreamble,
72✔
1238
        body: &[u8],
72✔
1239
    ) -> Result<StacksHttpResponse, NetError> {
72✔
1240
        if preamble.status_code < 400 || preamble.status_code > 599 {
72✔
1241
            return Err(NetError::DeserializeError(
×
1242
                "Inavlid response: not an error".to_string(),
×
1243
            ));
×
1244
        }
72✔
1245

1246
        let payload = if preamble.content_type == HttpContentType::Text {
72✔
1247
            let mut error_text = String::new();
67✔
1248
            let mut ioc = io::Cursor::new(body);
67✔
1249
            let mut bound_fd = BoundReader::from_reader(&mut ioc, MAX_MESSAGE_LEN as u64);
67✔
1250
            bound_fd
67✔
1251
                .read_to_string(&mut error_text)
67✔
1252
                .map_err(NetError::ReadError)?;
67✔
1253

1254
            HttpResponsePayload::Text(error_text)
67✔
1255
        } else if preamble.content_type == HttpContentType::JSON {
5✔
1256
            let mut ioc = io::Cursor::new(body);
4✔
1257
            let mut bound_fd = BoundReader::from_reader(&mut ioc, MAX_MESSAGE_LEN as u64);
4✔
1258
            let json_val = serde_json::from_reader(&mut bound_fd).map_err(|_| {
4✔
1259
                NetError::DeserializeError("Failed to decode JSON value".to_string())
×
1260
            })?;
×
1261

1262
            HttpResponsePayload::JSON(json_val)
4✔
1263
        } else {
1264
            return Err(NetError::DeserializeError(format!(
1✔
1265
                "Invalid error response: expected text/plain or application/json, got {:?}",
1✔
1266
                &preamble.content_type
1✔
1267
            )));
1✔
1268
        };
1269

1270
        Ok(StacksHttpResponse::new(preamble.clone(), payload))
71✔
1271
    }
72✔
1272

1273
    /// Try to parse an inbound HTTP response, given its decoded HTTP preamble, and the HTTP
1274
    /// version and request path that had originally sent.  The body will be read from `fd`.
1275
    pub fn try_parse_response(
1,398✔
1276
        &mut self,
1,398✔
1277
        request_handler_index: usize,
1,398✔
1278
        preamble: &HttpResponsePreamble,
1,398✔
1279
        body: &[u8],
1,398✔
1280
    ) -> Result<StacksHttpResponse, NetError> {
1,398✔
1281
        if preamble.status_code >= 400 {
1,398✔
1282
            return Self::try_parse_error_response(preamble, body);
72✔
1283
        }
1,326✔
1284

1285
        let (_, _, _, parser) = self
1,326✔
1286
            .request_handlers
1,326✔
1287
            .get(request_handler_index)
1,326✔
1288
            .expect("FATAL: tried to use nonexistent response handler");
1,326✔
1289
        let payload = parser.try_parse_response(preamble, body)?;
1,326✔
1290
        let response = StacksHttpResponse::new(preamble.clone(), payload);
1,321✔
1291
        return Ok(response);
1,321✔
1292
    }
1,398✔
1293

1294
    /// Handle an HTTP request by generating an HTTP response.
1295
    /// Returns Ok((preamble, contents)) on success.  Note that this could be an HTTP error
1296
    /// message.
1297
    /// Returns Err(..) on failure to decode or generate the response.
1298
    pub fn try_handle_request(
1,421✔
1299
        &mut self,
1,421✔
1300
        request: StacksHttpRequest,
1,421✔
1301
        node: &mut StacksNodeState,
1,421✔
1302
    ) -> Result<(HttpResponsePreamble, HttpResponseContents), NetError> {
1,421✔
1303
        let (decoded_path, _) = decode_request_path(&request.preamble().path_and_query_str)?;
1,421✔
1304
        let Some(response_handler_index) = request
1,421✔
1305
            .response_handler_index
1,421✔
1306
            .or_else(|| self.find_response_handler(&request.preamble().verb, &decoded_path))
1,421✔
1307
        else {
1308
            // Handler not found - check if it's a method not allowed (405) or not found (404)
1309
            let allowed_methods = self.find_allowed_methods(&decoded_path);
×
1310
            if !allowed_methods.is_empty() {
×
1311
                // Path exists but method is not allowed (405)
1312
                let error = HttpMethodNotAllowed::with_allowed_methods(allowed_methods);
×
1313
                let allowed_str = error.get_allowed_methods().join(", ");
×
1314
                return StacksHttpResponse::new_error_with_headers(
×
1315
                    &request.preamble,
×
1316
                    &error,
×
1317
                    vec![("Allow".to_string(), allowed_str)],
×
1318
                )
1319
                .try_into_contents();
×
1320
            }
×
1321
            // Path does not exist (404)
1322
            return StacksHttpResponse::new_error(
×
1323
                &request.preamble,
×
1324
                &HttpNotFound::new(format!(
×
1325
                    "No such API endpoint '{} {}'",
×
1326
                    &request.preamble().verb,
×
1327
                    &decoded_path
×
1328
                )),
×
1329
            )
1330
            .try_into_contents();
×
1331
        };
1332

1333
        let (_, _, _, request_handler) = self
1,421✔
1334
            .request_handlers
1,421✔
1335
            .get_mut(response_handler_index)
1,421✔
1336
            .expect("FATAL: request points to a nonexistent handler");
1,421✔
1337
        let request_preamble = request.preamble.clone();
1,421✔
1338
        let request_result =
1,421✔
1339
            request_handler.try_handle_request(request.preamble, request.contents, node);
1,421✔
1340
        request_handler.restart();
1,421✔
1341

1342
        let (response_preamble, response_contents) = match request_result {
1,421✔
1343
            Ok((rp, rc)) => (rp, rc),
1,421✔
1344
            Err(NetError::Http(e)) => {
×
1345
                debug!("RPC handler for {} failed: {:?}", decoded_path, &e);
×
1346
                return StacksHttpResponse::new_error(&request_preamble, &*e.into_http_error())
×
1347
                    .try_into_contents();
×
1348
            }
1349
            Err(e) => {
×
1350
                warn!("Irrecoverable error when handling request"; "path" => %request_preamble.path_and_query_str, "error" => %e);
×
1351
                return Err(e);
×
1352
            }
1353
        };
1354
        Ok((response_preamble, response_contents))
1,421✔
1355
    }
1,421✔
1356

1357
    /// Test helper: number of replies currently being received (0 or 1).
    #[cfg(test)]
    pub fn num_pending(&self) -> usize {
        if self.reply.is_some() {
            1
        } else {
            0
        }
    }
31✔
1361

1362
    /// Set up the pending response
1363
    /// Called indirectly from ProtocolFamily::read_preamble() when handling an HTTP response
1364
    /// Used for dealing with streaming data
1365
    fn set_pending(&mut self, preamble: &HttpResponsePreamble) {
1,218✔
1366
        self.reply = Some(StacksHttpReplyData {
1,218✔
1367
            request_id: preamble
1,218✔
1368
                .get_request_id()
1,218✔
1369
                .unwrap_or(HTTP_REQUEST_ID_RESERVED),
1,218✔
1370
            stream: StacksHttpRecvStream::new(MAX_MESSAGE_LEN as u64),
1,218✔
1371
        });
1,218✔
1372
    }
1,218✔
1373

1374
    /// Set the preamble. This is only relevant for receiving an HTTP response to a request that we
1375
    /// already sent.  It gets called from ProtocolFamily::read_preamble().
1376
    ///
1377
    /// This method will set up this state machine to consume the message associated with this
1378
    /// premable, if the response is chunked.
1379
    fn set_preamble(&mut self, preamble: &StacksHttpPreamble) -> Result<(), NetError> {
3,317✔
1380
        if let StacksHttpPreamble::Response(ref http_response_preamble) = preamble {
3,317✔
1381
            // we can only receive a response if we're expecting it
1382
            if self.request_handler_index.is_none() && !self.allow_arbitrary_response {
1,813✔
1383
                return Err(NetError::DeserializeError(
×
1384
                    "Unexpected HTTP response: no active request handler".to_string(),
×
1385
                ));
×
1386
            }
1,813✔
1387
            if http_response_preamble.is_chunked() {
1,813✔
1388
                // we can only receive one response at a time
1389
                if self.reply.is_some() {
1,221✔
1390
                    test_debug!("Have pending reply already");
3✔
1391
                    return Err(NetError::InProgress);
3✔
1392
                }
1,218✔
1393

1394
                self.set_pending(http_response_preamble);
1,218✔
1395
            }
592✔
1396
        }
1,504✔
1397
        Ok(())
3,314✔
1398
    }
3,317✔
1399

1400
    /// Clear any pending response state -- i.e. due to a failed request.
1401
    fn reset(&mut self) {
1,430✔
1402
        self.request_handler_index = None;
1,430✔
1403
        self.reply = None;
1,430✔
1404
    }
1,430✔
1405

1406
    /// Used for processing chunk-encoded streams.
1407
    /// Given the preamble and a Read, stream the bytes into a chunk-decoder.  Return the decoded
1408
    /// bytes if we decode an entire stream.  Always return the number of bytes consumed.
1409
    /// Returns Ok((Some(decoded bytes we got, total number of encoded bytes), number of bytes gotten in this call)) if we're done decoding.
1410
    /// Returns Ok((None, number of bytes gotten in this call)) if there's more to decode.
1411
    pub fn consume_data<R: Read>(
2,377✔
1412
        &mut self,
2,377✔
1413
        preamble: &HttpResponsePreamble,
2,377✔
1414
        fd: &mut R,
2,377✔
1415
    ) -> Result<(Option<(Vec<u8>, usize)>, usize), NetError> {
2,377✔
1416
        if !preamble.is_chunked() {
2,377✔
1417
            return Err(NetError::InvalidState);
×
1418
        }
2,377✔
1419
        if let Some(reply) = self.reply.as_mut() {
2,377✔
1420
            match reply.stream.consume_data(fd).inspect_err(|_e| {
2,377✔
1421
                self.reset();
1✔
1422
            })? {
1✔
1423
                (Some((byte_vec, bytes_total)), sz) => {
1,208✔
1424
                    // done receiving
1425
                    self.reply = None;
1,208✔
1426
                    Ok((Some((byte_vec, bytes_total)), sz))
1,208✔
1427
                }
1428
                res => Ok(res),
1,168✔
1429
            }
1430
        } else {
1431
            return Err(NetError::InvalidState);
×
1432
        }
1433
    }
2,377✔
1434

1435
    /// Calculate the search window for \r\n\r\n in the preamble stream.
1436
    ///
1437
    /// As we are streaming the preamble, we're looking for the pattern `\r\n\r\n`.  The last four
1438
    /// bytes of the encoded preamble are always stored in `self.last_four_preamble_bytes`; this
1439
    /// gets updated as the preamble data is streamed in.  So, given these last four bytes, and the
1440
    /// next chunk of data streamed in from the request (in `buf`), determine the 4-byte sequence
1441
    /// to check for `\r\n\r\n`.
1442
    ///
1443
    /// `i` is the offset into the chunk `buf` being searched.  If `i < 4`, then we must check the
1444
    /// last `4 - i` bytes of `self.last_four_preamble_bytes` as well as the first `i` bytes of
1445
    /// `buf`.  Otherwise, we just check `buf[i-4..i]`.
1446
    #[allow(clippy::indexing_slicing)]
1447
    fn body_start_search_window(&self, i: usize, buf: &[u8]) -> [u8; 4] {
2,447,416✔
1448
        let window = match i {
2,447,416✔
1449
            0 => [
2,400✔
1450
                self.last_four_preamble_bytes[0],
2,400✔
1451
                self.last_four_preamble_bytes[1],
2,400✔
1452
                self.last_four_preamble_bytes[2],
2,400✔
1453
                self.last_four_preamble_bytes[3],
2,400✔
1454
            ],
2,400✔
1455
            1 => [
2,400✔
1456
                self.last_four_preamble_bytes[1],
2,400✔
1457
                self.last_four_preamble_bytes[2],
2,400✔
1458
                self.last_four_preamble_bytes[3],
2,400✔
1459
                buf[0],
2,400✔
1460
            ],
2,400✔
1461
            2 => [
2,400✔
1462
                self.last_four_preamble_bytes[2],
2,400✔
1463
                self.last_four_preamble_bytes[3],
2,400✔
1464
                buf[0],
2,400✔
1465
                buf[1],
2,400✔
1466
            ],
2,400✔
1467
            3 => [self.last_four_preamble_bytes[3], buf[0], buf[1], buf[2]],
2,498✔
1468
            _ => [buf[i - 4], buf[i - 3], buf[i - 2], buf[i - 1]],
2,437,718✔
1469
        };
1470
        window
2,447,416✔
1471
    }
2,447,416✔
1472

1473
    /// Get a unique `&str` identifier for each request type
1474
    /// This can only return a finite set of identifiers, which makes it safer to use for Prometheus metrics
1475
    /// For details see https://github.com/stacks-network/stacks-core/issues/4574
1476
    pub fn metrics_identifier(&self, req: &mut StacksHttpRequest) -> &str {
1,428✔
1477
        let Ok((decoded_path, _)) = decode_request_path(req.request_path()) else {
1,428✔
1478
            return "<err-url-decode>";
1✔
1479
        };
1480

1481
        let Some(response_handler_index) = req
1,427✔
1482
            .response_handler_index
1,427✔
1483
            .or_else(|| self.find_response_handler(&req.preamble().verb, &decoded_path))
1,427✔
1484
        else {
1485
            return "<err-handler-not-found>";
3✔
1486
        };
1487
        req.response_handler_index = Some(response_handler_index);
1,424✔
1488

1489
        let (_, _, _, request_handler) = self
1,424✔
1490
            .request_handlers
1,424✔
1491
            .get(response_handler_index)
1,424✔
1492
            .expect("FATAL: request points to a nonexistent handler");
1,424✔
1493

1494
        request_handler.metrics_identifier()
1,424✔
1495
    }
1,428✔
1496

1497
    /// Parse one complete HTTP response held in `response_buf` (test-only client helper).
    ///
    /// A throwaway `StacksHttp` is created with `allow_arbitrary_response` set, so the response
    /// is decoded without a previously-issued request.  `verb` and `request_path` are not
    /// consulted by this implementation.
    ///
    /// Returns a `DeserializeError` if the buffer does not start with a response preamble, and
    /// an `UnderflowError` if a chunk-encoded body is incomplete.
    #[cfg(test)]
    pub fn parse_response(
        verb: &str,
        request_path: &str,
        response_buf: &[u8],
    ) -> Result<StacksHttpMessage, NetError> {
        let mut parser = StacksHttp::new(
            "127.0.0.1:20443".parse().unwrap(),
            &ConnectionOptions::default(),
        );
        parser.allow_arbitrary_response = true;

        let (preamble, body_offset) = parser.read_preamble(response_buf)?;
        let StacksHttpPreamble::Response(ref response_preamble) = preamble else {
            return Err(NetError::DeserializeError(
                "Invalid HTTP message: did not get a Response preamble".to_string(),
            ));
        };
        let chunked = response_preamble.is_chunked();

        let mut body = &response_buf[body_offset..];

        if !chunked {
            // known length: the remainder of the buffer is the payload
            return parser
                .read_payload(&preamble, body)
                .map(|(message, _)| message);
        }

        // chunk-encoded: the whole stream must decode in one pass
        let (parsed, _) = parser.stream_payload(&preamble, &mut body)?;
        parsed.map(|(message, _)| message).ok_or_else(|| {
            NetError::UnderflowError(
                "Not enough bytes to form a streamed HTTP response".to_string(),
            )
        })
    }
23✔
1534
}
1535

1536
impl ProtocolFamily for StacksHttp {
1537
    type Preamble = StacksHttpPreamble;
1538
    type Message = StacksHttpMessage;
1539

1540
    /// how big can a preamble get?
1541
    fn preamble_size_hint(&mut self) -> usize {
7,285✔
1542
        HTTP_PREAMBLE_MAX_ENCODED_SIZE as usize
7,285✔
1543
    }
7,285✔
1544

1545
    /// how big is this message?  Might not know if we're dealing with chunked encoding.
1546
    fn payload_len(&mut self, preamble: &StacksHttpPreamble) -> Option<usize> {
12,106✔
1547
        match *preamble {
12,106✔
1548
            StacksHttpPreamble::Request(ref http_request_preamble) => {
4,501✔
1549
                Some(http_request_preamble.get_content_length() as usize)
4,501✔
1550
            }
1551
            StacksHttpPreamble::Response(ref http_response_preamble) => http_response_preamble
7,605✔
1552
                .content_length
7,605✔
1553
                .map(|len| len as usize),
7,605✔
1554
        }
1555
    }
12,106✔
1556

1557
    /// Read the next HTTP preamble (be it a request or a response), and return the preamble and
1558
    /// the number of bytes consumed while reading it.
1559
    fn read_preamble(&mut self, buf: &[u8]) -> Result<(StacksHttpPreamble, usize), NetError> {
7,420✔
1560
        // does this contain end-of-headers marker, including the last four bytes of preamble we
1561
        // saw?
1562
        if self.body_start.is_none() {
7,420✔
1563
            for i in 0..=buf.len() {
2,447,116✔
1564
                let window = self.body_start_search_window(i, buf);
2,447,116✔
1565
                if window == [b'\r', b'\n', b'\r', b'\n'] {
2,447,116✔
1566
                    self.body_start = Some(self.num_preamble_bytes + i);
2,818✔
1567
                }
2,444,298✔
1568
            }
1569
        }
5,020✔
1570
        if self.body_start.is_none() {
7,420✔
1571
            // haven't found the body yet, so update `last_four_preamble_bytes`
1572
            // and report underflow
1573
            let len = buf.len();
300✔
1574
            let last_four_preamble_bytes = self.body_start_search_window(len, buf);
300✔
1575
            self.num_preamble_bytes += len;
300✔
1576
            self.last_four_preamble_bytes = last_four_preamble_bytes;
300✔
1577
            return Err(NetError::UnderflowError(
300✔
1578
                "Not enough bytes to form HTTP preamble".into(),
300✔
1579
            ));
300✔
1580
        }
7,120✔
1581

1582
        let mut cursor = io::Cursor::new(buf);
7,120✔
1583

1584
        let preamble = {
3,317✔
1585
            let mut rd =
7,120✔
1586
                BoundReader::from_reader(&mut cursor, HTTP_PREAMBLE_MAX_ENCODED_SIZE as u64);
7,120✔
1587
            let preamble: StacksHttpPreamble = read_next(&mut rd)?;
7,120✔
1588
            preamble
3,317✔
1589
        };
1590

1591
        let preamble_len = cursor.position() as usize;
3,317✔
1592
        self.set_preamble(&preamble)?;
3,317✔
1593

1594
        Ok((preamble, preamble_len))
3,314✔
1595
    }
7,420✔
1596

1597
    /// Stream a payload of unknown length.  Only gets called if payload_len() returns None.
1598
    ///
1599
    /// Returns Ok((Some((message, num-bytes-consumed)), num-bytes-read)) if we read enough data to
1600
    /// form a message.  `num-bytes-consumed` is the number of bytes required to parse the message,
1601
    /// and `num-bytes-read` is the number of bytes read in this call.
1602
    ///
1603
    /// Returns Ok((None, num-bytes-read)) if we consumed data (i.e. `num-bytes-read` bytes), but
1604
    /// did not yet have enough of the message to parse it.  The caller should try again.
1605
    ///
1606
    /// Returns Error on irrecoverable error.
1607
    fn stream_payload<R: Read>(
2,377✔
1608
        &mut self,
2,377✔
1609
        preamble: &StacksHttpPreamble,
2,377✔
1610
        fd: &mut R,
2,377✔
1611
    ) -> Result<(Option<(StacksHttpMessage, usize)>, usize), NetError> {
2,377✔
1612
        if self.payload_len(preamble).is_some() {
2,377✔
1613
            return Err(NetError::InvalidState);
×
1614
        }
2,377✔
1615
        match preamble {
2,377✔
1616
            StacksHttpPreamble::Request(_) => {
1617
                // HTTP requests can't be chunk-encoded, so this should never be reached
1618
                return Err(NetError::InvalidState);
×
1619
            }
1620
            StacksHttpPreamble::Response(ref http_response_preamble) => {
2,377✔
1621
                if !http_response_preamble.is_chunked() {
2,377✔
1622
                    return Err(NetError::InvalidState);
×
1623
                }
2,377✔
1624

1625
                // sanity check -- if we're receiving a response, then we must have earlier issued
1626
                // a request, or we must be in client mode. Thus, we must already know which
1627
                // response handler to use. Otherwise, someone sent us malforemd data.
1628
                if self.request_handler_index.is_none() && !self.allow_arbitrary_response {
2,377✔
1629
                    self.reset();
×
1630
                    return Err(NetError::DeserializeError(
×
1631
                        "Unsolicited HTTP response".to_string(),
×
1632
                    ));
×
1633
                }
2,377✔
1634

1635
                // message of unknown length.  Buffer up and maybe we can parse it.
1636
                let (message_bytes_opt, num_read) = self
2,377✔
1637
                    .consume_data(http_response_preamble, fd)
2,377✔
1638
                    .inspect_err(|_e| {
2,377✔
1639
                    self.reset();
1✔
1640
                })?;
1✔
1641

1642
                match message_bytes_opt {
2,376✔
1643
                    Some((message_bytes, total_bytes_consumed)) => {
1,208✔
1644
                        // can parse!
1645
                        test_debug!(
1,208✔
1646
                            "read http response payload of {} bytes (just buffered {})",
1647
                            message_bytes.len(),
×
1648
                            num_read,
1649
                        );
1650

1651
                        let parse_res = if self.request_handler_index.is_none()
1,208✔
1652
                            && self.allow_arbitrary_response
11✔
1653
                        {
1654
                            let arbitrary_parser = RPCArbitraryResponseHandler {};
11✔
1655
                            let response_payload = arbitrary_parser
11✔
1656
                                .try_parse_response(http_response_preamble, &message_bytes[..])?;
11✔
1657
                            Ok(StacksHttpResponse::new(
11✔
1658
                                http_response_preamble.clone(),
11✔
1659
                                response_payload,
11✔
1660
                            ))
11✔
1661
                        } else {
1662
                            // we now know the content-length, so pass it into the parser.
1663
                            let handler_index =
1,197✔
1664
                                self.request_handler_index
1,197✔
1665
                                    .ok_or(NetError::DeserializeError(
1,197✔
1666
                                        "Unknown HTTP response handler".to_string(),
1,197✔
1667
                                    ))?;
1,197✔
1668

1669
                            self.try_parse_response(
1,197✔
1670
                                handler_index,
1,197✔
1671
                                http_response_preamble,
1,197✔
1672
                                &message_bytes[..],
1,197✔
1673
                            )
1674
                        };
1675

1676
                        // done parsing
1677
                        self.reset();
1,208✔
1678
                        match parse_res {
1,208✔
1679
                            Ok(data_response) => Ok((
1,207✔
1680
                                Some((
1,207✔
1681
                                    StacksHttpMessage::Response(data_response),
1,207✔
1682
                                    total_bytes_consumed,
1,207✔
1683
                                )),
1,207✔
1684
                                num_read,
1,207✔
1685
                            )),
1,207✔
1686
                            Err(e) => {
1✔
1687
                                info!("Failed to parse HTTP response: {:?}", &e);
1✔
1688
                                Err(e)
1✔
1689
                            }
1690
                        }
1691
                    }
1692
                    None => {
1693
                        // need more data
1694
                        test_debug!(
1,168✔
1695
                            "did not read http response payload, but buffered {}",
1696
                            num_read
1697
                        );
1698
                        Ok((None, num_read))
1,168✔
1699
                    }
1700
                }
1701
            }
1702
        }
1703
    }
2,377✔
1704

1705
    /// Parse a payload of known length.
1706
    /// Only gets called if payload_len() returns Some(...).
1707
    ///
1708
    /// Return Ok(message, num-bytes-consumed) if we decoded a message.  The message will
1709
    /// have consumed `num-bytes-consumed` bytes.
1710
    ///
1711
    /// Return Err(..) if we failed to decode the message.
1712
    fn read_payload(
2,033✔
1713
        &mut self,
2,033✔
1714
        preamble: &StacksHttpPreamble,
2,033✔
1715
        buf: &[u8],
2,033✔
1716
    ) -> Result<(StacksHttpMessage, usize), NetError> {
2,033✔
1717
        match preamble {
2,033✔
1718
            StacksHttpPreamble::Request(ref http_request_preamble) => {
1,441✔
1719
                // all requests have a known length
1720
                let len = http_request_preamble.get_content_length() as usize;
1,441✔
1721
                let Some(buf_data) = buf.get(0..len) else {
1,441✔
1722
                    return Err(NetError::InvalidState);
×
1723
                };
1724

1725
                trace!("read http request payload of {} bytes", len);
1,441✔
1726

1727
                match self.try_parse_request(http_request_preamble, buf_data) {
1,441✔
1728
                    Ok(data_request) => Ok((StacksHttpMessage::Request(data_request), len)),
1,422✔
1729
                    Err(NetError::Http(http_error)) => {
19✔
1730
                        // convert into a response
1731
                        // for 405 responses, use the structured allowed methods
1732
                        let extra_headers =
19✔
1733
                            if let HttpError::HttpMethodNotAllowed(ref methods) = &http_error {
19✔
1734
                                vec![("Allow".to_string(), methods.join(", "))]
1✔
1735
                            } else {
1736
                                vec![]
18✔
1737
                            };
1738
                        let resp = StacksHttpResponse::new_error_with_headers(
19✔
1739
                            http_request_preamble,
19✔
1740
                            &*http_error.into_http_error(),
19✔
1741
                            extra_headers,
19✔
1742
                        );
1743
                        self.reset();
19✔
1744
                        return Ok((
19✔
1745
                            StacksHttpMessage::Error(
19✔
1746
                                http_request_preamble.path_and_query_str.clone(),
19✔
1747
                                resp,
19✔
1748
                            ),
19✔
1749
                            len,
19✔
1750
                        ));
19✔
1751
                    }
1752
                    Err(e) => {
×
1753
                        info!("Failed to parse HTTP request: {:?}", &e);
×
1754
                        self.reset();
×
1755
                        Err(e)
×
1756
                    }
1757
                }
1758
            }
1759
            StacksHttpPreamble::Response(ref http_response_preamble) => {
592✔
1760
                if http_response_preamble.is_chunked() {
592✔
1761
                    return Err(NetError::InvalidState);
×
1762
                }
592✔
1763

1764
                // message of known length
1765
                test_debug!("read http response payload of {} bytes", buf.len(),);
592✔
1766

1767
                if self.request_handler_index.is_none() && self.allow_arbitrary_response {
592✔
1768
                    let arbitrary_parser = RPCArbitraryResponseHandler {};
391✔
1769
                    let response_payload =
382✔
1770
                        arbitrary_parser.try_parse_response(http_response_preamble, buf)?;
391✔
1771
                    if http_response_preamble.status_code >= 400 {
382✔
1772
                        return Ok((
27✔
1773
                            StacksHttpMessage::Error(
27✔
1774
                                "(client-given)".into(),
27✔
1775
                                StacksHttpResponse::new(
27✔
1776
                                    http_response_preamble.clone(),
27✔
1777
                                    response_payload,
27✔
1778
                                ),
27✔
1779
                            ),
27✔
1780
                            buf.len(),
27✔
1781
                        ));
27✔
1782
                    } else {
1783
                        return Ok((
355✔
1784
                            StacksHttpMessage::Response(StacksHttpResponse::new(
355✔
1785
                                http_response_preamble.clone(),
355✔
1786
                                response_payload,
355✔
1787
                            )),
355✔
1788
                            buf.len(),
355✔
1789
                        ));
355✔
1790
                    }
1791
                }
201✔
1792

1793
                // sanity check -- if we're receiving a response, then we must have earlier issued
1794
                // a request. Thus, we must already know which response handler to use.
1795
                // Otherwise, someone sent us malformed data.
1796
                let handler_index = self.request_handler_index.ok_or_else(|| {
201✔
1797
                    self.reset();
×
1798
                    NetError::DeserializeError("Unsolicited HTTP response".to_string())
×
1799
                })?;
×
1800

1801
                let res = self.try_parse_response(handler_index, http_response_preamble, buf);
201✔
1802
                self.reset();
201✔
1803
                res.map(|data_response| (StacksHttpMessage::Response(data_response), buf.len()))
201✔
1804
            }
1805
        }
1806
    }
2,033✔
1807

1808
    fn verify_payload_bytes(
×
1809
        &mut self,
×
1810
        _key: &StacksPublicKey,
×
1811
        _preamble: &StacksHttpPreamble,
×
1812
        _bytes: &[u8],
×
1813
    ) -> Result<(), NetError> {
×
1814
        // not defined for HTTP messages, but maybe we could add a signature header at some point
1815
        // in the future if needed.
1816
        Ok(())
×
1817
    }
×
1818

1819
    /// Write out a message to `fd`.
1820
    ///
1821
    /// NOTE: If we're sending a StacksHttpMessage::Request(..), then the next preamble and payload
1822
    /// received _must be_ a StacksHttpMessage::Response(..) in response to the request.
1823
    /// If it is not, then that decode will fail.
1824
    fn write_message<W: Write>(
1,402✔
1825
        &mut self,
1,402✔
1826
        fd: &mut W,
1,402✔
1827
        message: &StacksHttpMessage,
1,402✔
1828
    ) -> Result<(), NetError> {
1,402✔
1829
        match *message {
1,402✔
1830
            StacksHttpMessage::Request(ref req) => {
1,376✔
1831
                // the node cannot send more than one request in parallel, unless the client is
1832
                // directing it
1833
                let handler_index = if !self.allow_arbitrary_response {
1,376✔
1834
                    if self.request_handler_index.is_some() {
1,376✔
1835
                        test_debug!("Have pending request already");
×
1836
                        return Err(NetError::InProgress);
×
1837
                    }
1,376✔
1838

1839
                    // find the response handler we'll use
1840
                    let (decoded_path, _) =
1,376✔
1841
                        decode_request_path(&req.preamble().path_and_query_str)?;
1,376✔
1842
                    let handler_index = self
1,376✔
1843
                        .find_response_handler(&req.preamble().verb, &decoded_path)
1,376✔
1844
                        .ok_or(NetError::SendError(format!(
1,376✔
1845
                            "No response handler found for `{} {}`",
1,376✔
1846
                            &req.preamble().verb,
1,376✔
1847
                            &decoded_path
1,376✔
1848
                        )))?;
1,376✔
1849
                    Some(handler_index)
1,376✔
1850
                } else {
1851
                    None
×
1852
                };
1853

1854
                req.send(fd)?;
1,376✔
1855

1856
                // remember this so we'll know how to decode the response.
1857
                // The next preamble and message we'll read _must be_ a response!
1858
                if handler_index.is_some() {
1,376✔
1859
                    self.request_handler_index = handler_index;
1,376✔
1860
                }
1,376✔
1861
                Ok(())
1,376✔
1862
            }
1863
            StacksHttpMessage::Response(ref resp) => resp.send(fd),
26✔
1864
            StacksHttpMessage::Error(_, ref resp) => resp.send(fd),
×
1865
        }
1866
    }
1,402✔
1867
}
1868

1869
impl PeerNetwork {
1870
    /// Send a (non-blocking) HTTP request to a remote peer.
1871
    /// Returns the event ID on success.
1872
    #[cfg_attr(test, mutants::skip)]
1873
    pub fn connect_or_send_http_request(
967✔
1874
        &mut self,
967✔
1875
        data_url: UrlString,
967✔
1876
        addr: SocketAddr,
967✔
1877
        request: StacksHttpRequest,
967✔
1878
    ) -> Result<usize, NetError> {
967✔
1879
        PeerNetwork::with_network_state(self, |ref mut network, ref mut network_state| {
967✔
1880
            PeerNetwork::with_http(network, |ref mut network, ref mut http| {
967✔
1881
                match http.connect_http(
967✔
1882
                    network_state,
967✔
1883
                    network,
967✔
1884
                    data_url.clone(),
967✔
1885
                    addr,
967✔
1886
                    Some(request.clone()),
967✔
1887
                ) {
1888
                    Ok(event_id) => Ok(event_id),
705✔
1889
                    Err(NetError::AlreadyConnected(event_id, _)) => {
262✔
1890
                        if let (Some(ref mut convo), Some(ref mut socket)) =
262✔
1891
                            http.get_conversation_and_socket(event_id)
262✔
1892
                        {
1893
                            convo.send_request(request)?;
262✔
1894
                            HttpPeer::saturate_http_socket(socket, convo)?;
262✔
1895
                            Ok(event_id)
262✔
1896
                        } else {
1897
                            debug!("HTTP failed to connect to {data_url}, {addr:?}");
×
1898
                            Err(NetError::PeerNotConnected(format!(
×
1899
                                "HTTP failed to connect to {data_url}, {addr:?}",
×
1900
                            )))
×
1901
                        }
1902
                    }
1903
                    Err(e) => Err(e),
×
1904
                }
1905
            })
967✔
1906
        })
967✔
1907
    }
967✔
1908
}
1909

1910
/// Given a raw path, decode it (i.e. if it's url-encoded)
1911
/// Return the (decoded-path, query-string) on success
1912
pub fn decode_request_path(path: &str) -> Result<(String, String), NetError> {
7,657✔
1913
    let local_url = format!("http://local{}", path);
7,657✔
1914
    let url = Url::parse(&local_url).map_err(|_e| {
7,657✔
1915
        NetError::DeserializeError("Http request path could not be parsed".to_string())
1✔
1916
    })?;
1✔
1917

1918
    let decoded_path = percent_decode_str(url.path()).decode_utf8().map_err(|_e| {
7,656✔
1919
        NetError::DeserializeError("Http request path could not be parsed as UTF-8".to_string())
×
1920
    })?;
×
1921

1922
    let query_str = url.query();
7,656✔
1923
    Ok((
7,656✔
1924
        decoded_path.to_string(),
7,656✔
1925
        query_str.unwrap_or("").to_string(),
7,656✔
1926
    ))
7,656✔
1927
}
7,657✔
1928

1929
/// Convert a NetError into an io::Error if appropriate.
1930
fn handle_net_error(e: NetError, msg: &str) -> io::Error {
9✔
1931
    match e {
9✔
1932
        NetError::ReadError(ioe) | NetError::WriteError(ioe) => ioe,
×
1933
        NetError::RecvTimeout => io::Error::new(io::ErrorKind::WouldBlock, "recv timeout"),
×
1934
        _ => io::Error::other(format!("{e}: {msg:?}").as_str()),
9✔
1935
    }
1936
}
9✔
1937

1938
/// Send an HTTP request to the given host:port.  Returns the decoded response.
1939
/// Internally, this creates a socket, connects it, sends the HTTP request, and decodes the HTTP
1940
/// response.  It is a blocking operation.
1941
///
1942
/// If the request encounters a network error, then return an error.  Don't retry.
1943
/// If the request times out after `timeout`, then return an error.
1944
pub fn send_http_request(
434✔
1945
    host: &str,
434✔
1946
    port: u16,
434✔
1947
    request: StacksHttpRequest,
434✔
1948
    timeout: Duration,
434✔
1949
) -> Result<StacksHttpResponse, io::Error> {
434✔
1950
    // Find the host:port that works.
1951
    // This is sometimes necessary because `localhost` can resolve to both its ipv4 and ipv6
1952
    // addresses, but usually, Stacks services like event observers are only bound to ipv4
1953
    // addresses.  So, be sure to use an address that will lead to a socket connection!
1954
    let mut stream_and_addr = None;
434✔
1955
    let mut last_err = None;
434✔
1956
    for addr in format!("{host}:{port}").to_socket_addrs()? {
434✔
1957
        debug!("send_request: connect to {}", &addr);
434✔
1958
        match TcpStream::connect_timeout(&addr, timeout) {
434✔
1959
            Ok(sock) => {
407✔
1960
                stream_and_addr = Some((sock, addr));
407✔
1961
                break;
407✔
1962
            }
1963
            Err(e) => {
27✔
1964
                last_err = Some(e);
27✔
1965
            }
27✔
1966
        }
1967
    }
1968

1969
    let Some((mut stream, addr)) = stream_and_addr else {
434✔
1970
        return Err(last_err.unwrap_or(io::Error::other("Unable to connect to {host}:{port}")));
27✔
1971
    };
1972

1973
    stream.set_read_timeout(Some(timeout))?;
407✔
1974
    stream.set_write_timeout(Some(timeout))?;
407✔
1975
    stream.set_nodelay(true)?;
407✔
1976

1977
    let start = Instant::now();
407✔
1978
    let request_path = request.request_path();
407✔
1979
    debug!("send_request: Sending request"; "request" => request_path);
407✔
1980

1981
    // Some explanation of what's going on here is in order.
1982
    //
1983
    // The networking stack in Stacks is designed to operate on non-blocking sockets, and
1984
    // furthermore, it operates in a way that the call site in which a network request is issued can
1985
    // be in a wholly separate stack (or thread) from the connection.  While this is absolutely necessary
1986
    // within the Stacks node, using it to issue a single blocking request imposes a lot of
1987
    // overhead.
1988
    //
1989
    // First, we will create the network connection and give it a ProtocolFamily implementation
1990
    // (StacksHttp), which gets used by the connection to encode and deocde messages.
1991
    //
1992
    // Second, we'll create a _handle_ to the network connection into which we will write requests
1993
    // and read responses.  The connection itself is an opaque black box that, internally,
1994
    // implements a state machine around the ProtocolFamily implementation to incrementally read
1995
    // ProtocolFamily messages from a Read, and write them to a Write.  The Read + Write is
1996
    // (usually) a non-blocking socket; the network connection deals with EWOULDBLOCK internally,
1997
    // as well as underfull socket buffers.
1998
    //
1999
    // Third, we need to _drive_ data to the socket.  We have to repeatedly (1) flush the network
2000
    // handle (which contains the buffered bytes from the message to be fed into the socket), and
2001
    // (2) drive bytes from the handle into the socket iself via the network connection.  This is a
2002
    // two-step process mainly because the handle is expected to live in a separate stack (or even
2003
    // a separate thread).
2004
    //
2005
    // Fourth, we need to _drive_ data from the socket.  We have to repeatedly (1) pull data from
2006
    // the socket into the network connection, and (2) drive parsed messages from the connection to
2007
    // the handle.  Then, the call site that owns the handle simply polls the handle for new
2008
    // messages.  Once we have received a message, we can proceed to handle it.
2009
    //
2010
    // Finally, we deal with the kind of HTTP message we got. If it's an error response, we convert
2011
    // it into an error.  If it's a request (i.e. not a response), we also return an error.  We
2012
    // only return the message if it was a well-formed non-error HTTP response.
2013

2014
    // Step 1-2: set up the connection and request handle
2015
    // NOTE: we don't need anything special for connection options, so just use the default
2016
    let conn_opts = ConnectionOptions::default();
407✔
2017
    let http = StacksHttp::new_client(addr, &conn_opts);
407✔
2018
    let mut connection = NetworkConnection::new(http, &conn_opts, None);
407✔
2019
    let mut request_handle = connection
407✔
2020
        .make_request_handle(0, get_epoch_time_secs() + timeout.as_secs(), 0)
407✔
2021
        .map_err(|e| {
407✔
2022
            io::Error::other(format!("Failed to create request handle: {e:?}").as_str())
×
2023
        })?;
×
2024

2025
    // Step 3: load up the request with the message we're gonna send, and iteratively dump its
2026
    // bytes from the handle into the socket (the connection does internal buffering and
2027
    // bookkeeping to deal with the cases where we fail to fill the socket buffer, or we can't send
2028
    // anymore because the socket buffer is currently full).
2029
    request
407✔
2030
        .send(&mut request_handle)
407✔
2031
        .map_err(|e| handle_net_error(e, "Failed to serialize request body"))?;
407✔
2032

2033
    debug!("send_request(sending data)");
407✔
2034
    let mut last_heartbeat_time = start; // Initialize heartbeat timer for sending loop
407✔
2035
    loop {
2036
        let flushed = request_handle
1,221✔
2037
            .try_flush()
1,221✔
2038
            .map_err(|e| handle_net_error(e, "Failed to flush request body"))?;
1,221✔
2039

2040
        // send it out
2041
        let num_sent = connection
1,221✔
2042
            .send_data(&mut stream)
1,221✔
2043
            .map_err(|e| handle_net_error(e, "Failed to send socket data"))?;
1,221✔
2044

2045
        debug!(
1,221✔
2046
            "send_request(sending data): flushed = {}, num_sent = {}",
2047
            flushed, num_sent
2048
        );
2049
        if flushed && num_sent == 0 {
1,221✔
2050
            break;
407✔
2051
        }
814✔
2052

2053
        if start.elapsed() >= timeout {
814✔
2054
            return Err(io::Error::new(
×
2055
                io::ErrorKind::WouldBlock,
×
2056
                "Timed out while sending request",
×
2057
            ));
×
2058
        }
814✔
2059
        if last_heartbeat_time.elapsed() >= HEARTBEAT_INTERVAL {
814✔
2060
            info!(
×
2061
                "send_request(sending data): heartbeat - still sending request to {} path='{}' (elapsed: {:?})",
2062
                addr,
2063
                request_path,
2064
                start.elapsed()
×
2065
            );
2066
            last_heartbeat_time = Instant::now();
×
2067
        }
814✔
2068
    }
2069

2070
    // Step 4: pull bytes from the socket back into the handle, and see if the connection decoded
2071
    // and dispatched any new messages to the request handle.  If so, then extract the message and
2072
    // check that it's a well-formed HTTP response.
2073
    debug!("send_request(receiving data)");
407✔
2074
    last_heartbeat_time = Instant::now();
407✔
2075
    let response = loop {
370✔
2076
        // get back the reply
2077
        debug!("send_request(receiving data): try to receive data");
407✔
2078
        match connection.recv_data(&mut stream) {
407✔
2079
            Ok(nr) => {
398✔
2080
                debug!("send_request(receiving data): received {} bytes", nr);
398✔
2081
            }
2082
            Err(e) => {
9✔
2083
                return Err(handle_net_error(e, "Failed to receive socket data"));
9✔
2084
            }
2085
        }
2086

2087
        // fullfill the request -- send it to its corresponding handle
2088
        debug!("send_request(receiving data): drain inbox");
398✔
2089
        connection.drain_inbox();
398✔
2090

2091
        // see if we got a message that was fulfilled in our handle
2092
        debug!("send_request(receiving data): try receive response");
398✔
2093
        let rh = match request_handle.try_recv() {
398✔
2094
            Ok(resp) => {
370✔
2095
                break resp;
370✔
2096
            }
2097
            Err(Ok(handle)) => handle,
28✔
2098
            Err(Err(e)) => {
×
2099
                return Err(handle_net_error(
×
2100
                    e,
×
2101
                    "Failed to receive message after socket has been drained",
×
2102
                ));
×
2103
            }
2104
        };
2105
        request_handle = rh;
28✔
2106

2107
        if start.elapsed() >= timeout {
28✔
2108
            return Err(io::Error::new(
28✔
2109
                io::ErrorKind::WouldBlock,
28✔
2110
                "Timed out while receiving response",
28✔
2111
            ));
28✔
2112
        }
×
2113
        if last_heartbeat_time.elapsed() >= HEARTBEAT_INTERVAL {
×
2114
            info!(
×
2115
                "send_request(receiving data): heartbeat - still receiving response from {} path='{}' (elapsed: {:?})",
2116
                addr,
2117
                request_path,
2118
                start.elapsed()
×
2119
            );
2120
            last_heartbeat_time = Instant::now();
×
2121
        }
×
2122
    };
2123

2124
    // Step 5: decode the HTTP message and return it if it's not an error.
2125
    let response_data = match response {
370✔
2126
        StacksHttpMessage::Response(response_data) => response_data,
343✔
2127
        StacksHttpMessage::Error(_path, response) => {
27✔
2128
            let verb = &request.preamble().verb;
27✔
2129
            let path = &request.preamble().path_and_query_str;
27✔
2130
            let resp_status_code = response.preamble().status_code;
27✔
2131
            let resp_body = response.body();
27✔
2132
            return Err(io::Error::other(format!(
27✔
2133
                "HTTP '{verb} {path}' did not succeed ({resp_status_code} != 200). Response body = {resp_body:?}"
27✔
2134
            )));
27✔
2135
        }
2136
        _ => {
2137
            return Err(io::Error::other("Did not receive an HTTP response"));
×
2138
        }
2139
    };
2140

2141
    Ok(response_data)
343✔
2142
}
434✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc