• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

stacks-network / stacks-core / 25903914664-1

15 May 2026 06:28AM UTC coverage: 47.122% (-38.8%) from 85.959%
25903914664-1

Pull #7199

github

94e391
web-flow
Merge 109f2828c into 1c7b8e6ac
Pull Request #7199: Feat: L1 and L2 early unlocks, updating signer

103343 of 219309 relevant lines covered (47.12%)

12880462.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.08
/stackslib/src/net/api/postmempoolquery.rs
1
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
2
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
3
//
4
// This program is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8
//
9
// This program is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13
//
14
// You should have received a copy of the GNU General Public License
15
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17
use rand::{thread_rng, Rng};
18
use regex::{Captures, Regex};
19
use stacks_common::codec::{StacksMessageCodec, MAX_MESSAGE_LEN};
20
use stacks_common::types::net::PeerHost;
21
use url::form_urlencoded;
22

23
use crate::burnchains::Txid;
24
use crate::chainstate::stacks::StacksTransaction;
25
use crate::core::mempool::{decode_tx_stream, MemPoolDB, MemPoolSyncData};
26
use crate::net::http::{
27
    parse_bytes, Error, HttpChunkGenerator, HttpContentType, HttpRequest, HttpRequestContents,
28
    HttpRequestPreamble, HttpResponse, HttpResponseContents, HttpResponsePayload,
29
    HttpResponsePreamble, HttpServerError,
30
};
31
use crate::net::httpcore::{RPCRequestHandler, StacksHttpRequest, StacksHttpResponse};
32
use crate::net::{Error as NetError, StacksNodeState};
33
use crate::util_lib::db::DBConn;
34

35
#[derive(Clone)]
36
pub struct RPCMempoolQueryRequestHandler {
37
    pub page_id: Option<Txid>,
38
    pub mempool_query: Option<MemPoolSyncData>,
39
}
40

41
impl RPCMempoolQueryRequestHandler {
42
    pub fn new() -> Self {
1,059,005✔
43
        Self {
1,059,005✔
44
            page_id: None,
1,059,005✔
45
            mempool_query: None,
1,059,005✔
46
        }
1,059,005✔
47
    }
1,059,005✔
48

49
    /// Obtain the mempool page_id query string, if it is present
50
    fn get_page_id_query(&self, query: Option<&str>) -> Option<Txid> {
6,608✔
51
        match query {
6,608✔
52
            Some(query_string) => {
6,608✔
53
                for (key, value) in form_urlencoded::parse(query_string.as_bytes()) {
6,608✔
54
                    if key != "page_id" {
6,608✔
55
                        continue;
×
56
                    }
6,608✔
57
                    if let Ok(page_id) = Txid::from_hex(&value) {
6,608✔
58
                        return Some(page_id);
6,608✔
59
                    }
×
60
                }
61
                return None;
×
62
            }
63
            None => {
64
                return None;
×
65
            }
66
        }
67
    }
6,608✔
68
}
69

70
#[derive(Debug)]
71
pub struct StacksMemPoolStream {
72
    /// Mempool sync data requested
73
    pub tx_query: MemPoolSyncData,
74
    /// last txid loaded
75
    pub last_randomized_txid: Txid,
76
    /// number of transactions visited in the DB so far
77
    pub num_txs: u64,
78
    /// maximum we can visit in the query
79
    pub max_txs: u64,
80
    /// coinbase height of the chain at time of query
81
    pub coinbase_height: u64,
82
    /// Are we done sending transactions, and are now in the process of sending the trailing page
83
    /// ID?
84
    pub corked: bool,
85
    /// Did we run out of transactions to send?
86
    pub finished: bool,
87
    num_bytes: u64,
88
    /// link to the mempool DB
89
    mempool_db: DBConn,
90
}
91

92
impl StacksMemPoolStream {
93
    pub fn new(
6,608✔
94
        mempool_db: DBConn,
6,608✔
95
        tx_query: MemPoolSyncData,
6,608✔
96
        max_txs: u64,
6,608✔
97
        coinbase_height: u64,
6,608✔
98
        page_id_opt: Option<Txid>,
6,608✔
99
    ) -> Self {
6,608✔
100
        let last_randomized_txid = page_id_opt.unwrap_or_else(|| {
6,608✔
101
            let random_bytes = thread_rng().gen::<[u8; 32]>();
×
102
            Txid(random_bytes)
×
103
        });
×
104

105
        Self {
6,608✔
106
            tx_query,
6,608✔
107
            last_randomized_txid,
6,608✔
108
            num_txs: 0,
6,608✔
109
            num_bytes: 0,
6,608✔
110
            max_txs,
6,608✔
111
            coinbase_height,
6,608✔
112
            corked: false,
6,608✔
113
            finished: false,
6,608✔
114
            mempool_db,
6,608✔
115
        }
6,608✔
116
    }
6,608✔
117
}
118

119
impl HttpChunkGenerator for StacksMemPoolStream {
120
    #[cfg_attr(test, mutants::skip)]
121
    fn hint_chunk_size(&self) -> usize {
6,608✔
122
        4096
6,608✔
123
    }
6,608✔
124

125
    #[cfg_attr(test, mutants::skip)]
126
    fn generate_next_chunk(&mut self) -> Result<Vec<u8>, String> {
17,680✔
127
        if self.corked {
17,680✔
128
            test_debug!(
7,355✔
129
                "Finished streaming txs; last page was {:?}",
130
                &self.last_randomized_txid
×
131
            );
132
            return Ok(vec![]);
7,355✔
133
        }
10,325✔
134

135
        if self.num_txs >= self.max_txs || self.finished {
10,325✔
136
            test_debug!(
×
137
                "Finished sending transactions after {:?}. Corking tx stream.",
138
                &self.last_randomized_txid
×
139
            );
140

141
            // cork the stream -- send the next page_id the requester should use to continue
142
            // streaming.
143
            self.corked = true;
×
144
            return Ok(self.last_randomized_txid.serialize_to_vec());
×
145
        }
10,325✔
146

147
        let remaining = self.max_txs.saturating_sub(self.num_txs);
10,325✔
148
        let (next_txs, next_last_randomized_txid_opt, num_rows_visited) =
10,325✔
149
            MemPoolDB::static_find_next_missing_transactions(
10,325✔
150
                &self.mempool_db,
10,325✔
151
                &self.tx_query,
10,325✔
152
                self.coinbase_height,
10,325✔
153
                &self.last_randomized_txid,
10,325✔
154
                1,
155
                remaining,
10,325✔
156
            )
157
            .map_err(|e| format!("Failed to find next missing transactions: {:?}", &e))?;
10,325✔
158

159
        debug!(
10,325✔
160
            "Streaming mempool propagation stepped";
161
            "rows_visited" => num_rows_visited,
×
162
            "last_rand_txid" => %self.last_randomized_txid,
163
            "num_txs" => self.num_txs,
×
164
            "max_txs" => self.max_txs
×
165
        );
166

167
        if let Some(next_tx) = next_txs.first() {
10,325✔
168
            // have another tx to send
169
            let chunk = next_tx.serialize_to_vec();
3,717✔
170
            if u64::try_from(chunk.len())
3,717✔
171
                .unwrap()
3,717✔
172
                .saturating_add(self.num_bytes)
3,717✔
173
                >= u64::from(MAX_MESSAGE_LEN) / 2
3,717✔
174
            {
175
                self.corked = true;
×
176
                return Ok(self.last_randomized_txid.serialize_to_vec());
×
177
            }
3,717✔
178
            if let Some(next_last_randomized_txid) = next_last_randomized_txid_opt {
3,717✔
179
                // we have more after this
3,717✔
180
                self.last_randomized_txid = next_last_randomized_txid;
3,717✔
181
            } else {
3,717✔
182
                // that was the last transaction.
×
183
                // next call will cork the stream
×
184
                self.finished = true;
×
185
            }
×
186
            self.num_txs += next_txs.len() as u64;
3,717✔
187
            self.num_bytes = self
3,717✔
188
                .num_bytes
3,717✔
189
                .saturating_add(u64::try_from(chunk.len()).unwrap());
3,717✔
190
            return Ok(chunk);
3,717✔
191
        } else if let Some(next_txid) = next_last_randomized_txid_opt {
6,608✔
192
            // no more txs to send
193
            test_debug!(
747✔
194
                "No rows returned for {}; cork tx stream with next page {}",
195
                &self.last_randomized_txid,
×
196
                &next_txid
×
197
            );
198

199
            // send the page ID as the final chunk
200
            let chunk = next_txid.serialize_to_vec();
747✔
201
            self.finished = true;
747✔
202
            self.corked = true;
747✔
203
            return Ok(chunk);
747✔
204
        } else {
205
            test_debug!(
5,861✔
206
                "No more txs to send after {:?}; corking stream",
207
                &self.last_randomized_txid
×
208
            );
209

210
            // no more transactions, and none after this
211
            self.finished = true;
5,861✔
212
            self.corked = true;
5,861✔
213
            return Ok(vec![]);
5,861✔
214
        }
215
    }
17,680✔
216
}
217

218
/// Decode the HTTP request
219
impl HttpRequest for RPCMempoolQueryRequestHandler {
220
    fn verb(&self) -> &'static str {
1,059,005✔
221
        "POST"
1,059,005✔
222
    }
1,059,005✔
223

224
    fn path_regex(&self) -> Regex {
2,118,010✔
225
        Regex::new(r#"^/v2/mempool/query$"#).unwrap()
2,118,010✔
226
    }
2,118,010✔
227

228
    fn metrics_identifier(&self) -> &str {
6,608✔
229
        "/v2/mempool/query"
6,608✔
230
    }
6,608✔
231

232
    /// Try to decode this request.
233
    /// There's nothing to load here, so just make sure the request is well-formed.
234
    fn try_parse_request(
6,608✔
235
        &mut self,
6,608✔
236
        preamble: &HttpRequestPreamble,
6,608✔
237
        _captures: &Captures,
6,608✔
238
        query: Option<&str>,
6,608✔
239
        body: &[u8],
6,608✔
240
    ) -> Result<HttpRequestContents, Error> {
6,608✔
241
        if preamble.get_content_length() == 0 {
6,608✔
242
            return Err(Error::DecodeError(
×
243
                "Invalid Http request: expected nonzero body length".to_string(),
×
244
            ));
×
245
        }
6,608✔
246

247
        let mut body_ptr = body;
6,608✔
248
        let mempool_body = MemPoolSyncData::consensus_deserialize(&mut body_ptr)?;
6,608✔
249

250
        self.mempool_query = Some(mempool_body);
6,608✔
251
        if let Some(page_id) = self.get_page_id_query(query) {
6,608✔
252
            self.page_id = Some(page_id);
6,608✔
253
        }
6,608✔
254
        Ok(HttpRequestContents::new().query_string(query))
6,608✔
255
    }
6,608✔
256
}
257

258
impl RPCRequestHandler for RPCMempoolQueryRequestHandler {
259
    /// Reset internal state
260
    fn restart(&mut self) {
6,608✔
261
        self.mempool_query = None;
6,608✔
262
        self.page_id = None;
6,608✔
263
    }
6,608✔
264

265
    /// Make the response
266
    fn try_handle_request(
6,608✔
267
        &mut self,
6,608✔
268
        preamble: HttpRequestPreamble,
6,608✔
269
        _contents: HttpRequestContents,
6,608✔
270
        node: &mut StacksNodeState,
6,608✔
271
    ) -> Result<(HttpResponsePreamble, HttpResponseContents), NetError> {
6,608✔
272
        let mempool_query = self
6,608✔
273
            .mempool_query
6,608✔
274
            .take()
6,608✔
275
            .ok_or(NetError::SendError("`mempool_query` not set".into()))?;
6,608✔
276
        let page_id = self.page_id.take();
6,608✔
277

278
        let stream_res = node.with_node_state(|network, _sortdb, _chainstate, mempool, _rpc_args| {
6,608✔
279
            let coinbase_height = network.stacks_tip.coinbase_height;
6,608✔
280
            let max_txs = network.connection_opts.mempool_max_tx_query;
6,608✔
281
            debug!(
6,608✔
282
                "Begin mempool query";
283
                "page_id" => %page_id.as_ref().map(|txid| format!("{txid}")).unwrap_or("(none".to_string()),
×
284
                "coinbase_height" => coinbase_height,
×
285
                "max_txs" => max_txs
×
286
            );
287

288
            let mempool_db = match mempool.reopen(false) {
6,608✔
289
                Ok(db) => db,
6,608✔
290
                Err(e) => {
×
291
                    return Err(StacksHttpResponse::new_error(&preamble, &HttpServerError::new(format!("Failed to open mempool DB: {:?}", &e))));
×
292
                }
293
            };
294

295
            Ok(StacksMemPoolStream::new(mempool_db, mempool_query, max_txs, coinbase_height, page_id))
6,608✔
296
        });
6,608✔
297

298
        let stream = match stream_res {
6,608✔
299
            Ok(stream) => stream,
6,608✔
300
            Err(response) => {
×
301
                return response.try_into_contents().map_err(NetError::from);
×
302
            }
303
        };
304

305
        let resp_preamble = HttpResponsePreamble::from_http_request_preamble(
6,608✔
306
            &preamble,
6,608✔
307
            200,
308
            "OK",
6,608✔
309
            None,
6,608✔
310
            HttpContentType::Bytes,
6,608✔
311
        );
312
        Ok((
6,608✔
313
            resp_preamble,
6,608✔
314
            HttpResponseContents::from_stream(Box::new(stream)),
6,608✔
315
        ))
6,608✔
316
    }
6,608✔
317
}
318

319
/// Decode the HTTP response
320
impl HttpResponse for RPCMempoolQueryRequestHandler {
321
    /// Decode this response from a byte stream.  This is called by the client to decode this
322
    /// message
323
    fn try_parse_response(
6,598✔
324
        &self,
6,598✔
325
        preamble: &HttpResponsePreamble,
6,598✔
326
        body: &[u8],
6,598✔
327
    ) -> Result<HttpResponsePayload, Error> {
6,598✔
328
        let bytes = parse_bytes(preamble, body, MAX_MESSAGE_LEN.into())?;
6,598✔
329
        Ok(HttpResponsePayload::Bytes(bytes))
6,598✔
330
    }
6,598✔
331
}
332

333
impl StacksHttpRequest {
334
    pub fn new_mempool_query(
×
335
        host: PeerHost,
×
336
        query: MemPoolSyncData,
×
337
        page_id_opt: Option<Txid>,
×
338
    ) -> StacksHttpRequest {
×
339
        StacksHttpRequest::new_for_peer(
×
340
            host,
×
341
            "POST".into(),
×
342
            "/v2/mempool/query".into(),
×
343
            if let Some(page_id) = page_id_opt {
×
344
                HttpRequestContents::new()
×
345
                    .query_arg("page_id".into(), format!("{}", &page_id))
×
346
                    .payload_stacks(&query)
×
347
            } else {
348
                HttpRequestContents::new().payload_stacks(&query)
×
349
            },
350
        )
351
        .expect("FATAL: failed to construct request from infallible data")
×
352
    }
×
353
}
354

355
impl StacksHttpResponse {
356
    /// Decode an HTTP response body into the transactions and next-page ID returned from
357
    /// /v2/mempool/query.
358
    pub fn decode_mempool_txs_page(
6,598✔
359
        self,
6,598✔
360
    ) -> Result<(Vec<StacksTransaction>, Option<Txid>), NetError> {
6,598✔
361
        let contents = self.get_http_payload_ok()?;
6,598✔
362
        let raw_bytes: Vec<u8> = contents.try_into()?;
6,598✔
363
        let (txs, page_id_opt) = decode_tx_stream(&mut &raw_bytes[..])?;
6,598✔
364
        Ok((txs, page_id_opt))
6,598✔
365
    }
6,598✔
366
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc