• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

stacks-network / stacks-core / 26250451051-1

21 May 2026 08:11PM UTC coverage: 85.585% (-0.1%) from 85.712%
26250451051-1

Pull #7215

github

ec9d4c
web-flow
Merge 9487bf852 into af1280aac
Pull Request #7215: Chore: fix flake in non_blocking_minority_configured_to_favour_...

188844 of 220651 relevant lines covered (85.58%)

18975267.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.26
/stackslib/src/core/mempool.rs
1
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
2
// Copyright (C) 2020 Stacks Open Internet Foundation
3
//
4
// This program is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8
//
9
// This program is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13
//
14
// You should have received a copy of the GNU General Public License
15
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17
use std::cmp::{self, Ordering};
18
use std::collections::{HashMap, HashSet, VecDeque};
19
use std::hash::Hasher;
20
use std::io::{Read, Write};
21
use std::ops::{Deref, DerefMut};
22
use std::path::PathBuf;
23
use std::str::FromStr;
24
use std::time::{Duration, Instant};
25
use std::{fs, io, thread};
26

27
use rand::distributions::Uniform;
28
use rand::prelude::Distribution;
29
use rand::Rng;
30
use rusqlite::{params, OpenFlags, OptionalExtension, Row};
31
use siphasher::sip::SipHasher; // this is SipHash-2-4
32
use stacks_common::codec::{
33
    read_next, write_next, Error as codec_error, StacksMessageCodec, MAX_MESSAGE_LEN,
34
};
35
use stacks_common::types::chainstate::{BlockHeaderHash, StacksAddress, StacksBlockId};
36
use stacks_common::types::sqlite::NO_PARAMS;
37
use stacks_common::types::MempoolCollectionBehavior;
38
use stacks_common::util::get_epoch_time_secs;
39
use stacks_common::util::hash::{to_hex, Sha512Trunc256Sum};
40
use stacks_common::util::retry::{BoundReader, RetryReader};
41

42
use crate::burnchains::Txid;
43
use crate::chainstate::burn::db::sortdb::SortitionDB;
44
use crate::chainstate::burn::ConsensusHash;
45
use crate::chainstate::nakamoto::{NakamotoBlock, NakamotoChainState};
46
use crate::chainstate::stacks::db::blocks::MemPoolRejection;
47
use crate::chainstate::stacks::db::StacksChainState;
48
use crate::chainstate::stacks::miner::TransactionEvent;
49
use crate::chainstate::stacks::{
50
    Error as ChainstateError, StacksBlock, StacksMicroblock, StacksTransaction, TransactionPayload,
51
};
52
use crate::clarity_vm::clarity::ClarityConnection;
53
use crate::core::nonce_cache::NonceCache;
54
use crate::core::{ExecutionCost, StacksEpochId, FIRST_BURNCHAIN_CONSENSUS_HASH};
55
use crate::cost_estimates::metrics::CostMetric;
56
#[cfg(test)]
57
use crate::cost_estimates::metrics::UnitMetric;
58
#[cfg(test)]
59
use crate::cost_estimates::UnitEstimator;
60
use crate::cost_estimates::{CostEstimator, EstimatorError};
61
use crate::monitoring::increment_stx_mempool_gc;
62
use crate::net::api::postblock_proposal::{BlockValidateOk, BlockValidateReject};
63
use crate::net::Error as net_error;
64
use crate::util_lib::bloom::{bloom_hash_count, BloomCounter, BloomFilter, BloomNodeHasher};
65
use crate::util_lib::db::{
66
    query_int, query_row, query_row_columns, query_rows, sqlite_open, table_exists,
67
    tx_begin_immediate, u64_to_sql, DBConn, DBTx, Error as db_error, Error, FromColumn, FromRow,
68
};
69
use crate::{cost_estimates, monitoring};
70

71
// maximum number of confirmations a transaction can have before it's garbage-collected
72
pub static MEMPOOL_MAX_TRANSACTION_AGE: u64 = 256;
73
pub static MAXIMUM_MEMPOOL_TX_CHAINING: u64 = 25;
74
pub static MEMPOOL_NAKAMOTO_MAX_TRANSACTION_AGE: Duration =
75
    Duration::from_secs(MEMPOOL_MAX_TRANSACTION_AGE * 10 * 60);
76

77
// name of table for storing the counting bloom filter
78
pub const BLOOM_COUNTER_TABLE: &str = "txid_bloom_counter";
79

80
// bloom filter error rate
81
pub const BLOOM_COUNTER_ERROR_RATE: f64 = 0.001;
82

83
// expected number of txs in the bloom filter
84
pub const MAX_BLOOM_COUNTER_TXS: u32 = 8192;
85

86
// how far back in time (in Stacks blocks) does the bloom counter maintain tx records?
87
pub const BLOOM_COUNTER_DEPTH: usize = 2;
88

89
// how long will a transaction be blacklisted?
90
// about as long as it takes for it to be garbage-collected
91
pub const DEFAULT_BLACKLIST_TIMEOUT: u64 = 24 * 60 * 60 * 2;
92
pub const DEFAULT_BLACKLIST_MAX_SIZE: u64 = 134217728; // 2**27 -- the blacklist table can reach at most 4GB at 128 bytes per record
93

94
// maximum many tx tags we'll send before sending a bloom filter instead.
95
// The parameter choice here is due to performance -- calculating a tag set can be slower than just
96
// loading the bloom filter, even though the bloom filter is larger.
97
const DEFAULT_MAX_TX_TAGS: u32 = 2048;
98

99
// maximum number of transactions that can fit in a single block
100
const MAX_BLOCK_TXS: usize = 11_650;
101

102
/// A node-specific transaction tag -- the first 8 bytes of siphash(local-seed,txid)
103
#[derive(Debug, Clone, PartialEq, Hash, Eq)]
104
pub struct TxTag(pub [u8; 8]);
105

106
impl TxTag {
107
    pub fn from(seed: &[u8], txid: &Txid) -> TxTag {
167,667✔
108
        let mut hasher = SipHasher::new();
167,667✔
109
        hasher.write(seed);
167,667✔
110
        hasher.write(&txid.0);
167,667✔
111

112
        let result_64 = hasher.finish();
167,667✔
113
        TxTag(result_64.to_be_bytes())
167,667✔
114
    }
167,667✔
115
}
116

117
impl std::fmt::Display for TxTag {
118
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
×
119
        write!(f, "{}", &to_hex(&self.0))
×
120
    }
×
121
}
122

123
impl StacksMessageCodec for TxTag {
124
    fn consensus_serialize<W: Write>(&self, fd: &mut W) -> Result<(), codec_error> {
111,052✔
125
        fd.write_all(&self.0).map_err(codec_error::WriteError)?;
111,052✔
126
        Ok(())
111,052✔
127
    }
111,052✔
128

129
    fn consensus_deserialize<R: Read>(fd: &mut R) -> Result<TxTag, codec_error> {
110,971✔
130
        let mut bytes = [0u8; 8];
110,971✔
131
        fd.read_exact(&mut bytes).map_err(codec_error::ReadError)?;
110,971✔
132
        Ok(TxTag(bytes))
110,971✔
133
    }
110,971✔
134
}
135

136
define_u8_enum!(MemPoolSyncDataID {
137
    BloomFilter = 0x01,
138
    TxTags = 0x02
139
});
140

141
#[derive(Debug, Clone, PartialEq)]
142
pub enum MemPoolSyncData {
143
    BloomFilter(BloomFilter<BloomNodeHasher>),
144
    TxTags([u8; 32], Vec<TxTag>),
145
}
146

147
#[derive(Debug, PartialEq)]
148
pub enum MempoolIterationStopReason {
149
    /// No more candidates in the mempool to consider
150
    NoMoreCandidates,
151
    /// The mining deadline has been reached
152
    DeadlineReached,
153
    /// If the iteration function supplied to mempool iteration exited
154
    ///  (i.e., the transaction evaluator returned an early exit command)
155
    IteratorExited,
156
}
157

158
impl MempoolIterationStopReason {
159
    /// Report the stop reason to Prometheus monitoring.
160
    ///
161
    /// `IteratorExited` is not reported here because the miner's
162
    /// `iterate_candidates` callback already reports the specific reason
163
    /// it returned early (Preempted, LimitReached, or DeadlineReached).
164
    pub fn report_to_monitoring(&self) {
364,223✔
165
        let reason = match self {
364,223✔
166
            MempoolIterationStopReason::NoMoreCandidates => {
167
                monitoring::MinerStopReason::NoTransactions
363,458✔
168
            }
169
            MempoolIterationStopReason::DeadlineReached => {
170
                monitoring::MinerStopReason::DeadlineReached
117✔
171
            }
172
            MempoolIterationStopReason::IteratorExited => return,
648✔
173
        };
174
        monitoring::increment_miner_stop_reason(reason);
363,575✔
175
    }
364,223✔
176
}
177

178
impl StacksMessageCodec for MemPoolSyncData {
179
    fn consensus_serialize<W: Write>(&self, fd: &mut W) -> Result<(), codec_error> {
6,961✔
180
        match *self {
6,961✔
181
            MemPoolSyncData::BloomFilter(ref bloom_filter) => {
×
182
                write_next(fd, &MemPoolSyncDataID::BloomFilter.to_u8())?;
×
183
                write_next(fd, bloom_filter)?;
×
184
            }
185
            MemPoolSyncData::TxTags(ref seed, ref tags) => {
6,961✔
186
                write_next(fd, &MemPoolSyncDataID::TxTags.to_u8())?;
6,961✔
187
                write_next(fd, seed)?;
6,961✔
188
                write_next(fd, tags)?;
6,961✔
189
            }
190
        }
191
        Ok(())
6,961✔
192
    }
6,961✔
193

194
    fn consensus_deserialize<R: Read>(fd: &mut R) -> Result<MemPoolSyncData, codec_error> {
6,903✔
195
        let data_id: u8 = read_next(fd)?;
6,903✔
196
        match MemPoolSyncDataID::from_u8(data_id).ok_or(codec_error::DeserializeError(format!(
6,903✔
197
            "Unrecognized MemPoolSyncDataID {}",
6,903✔
198
            &data_id
6,903✔
199
        )))? {
6,903✔
200
            MemPoolSyncDataID::BloomFilter => {
201
                let bloom_filter: BloomFilter<BloomNodeHasher> = read_next(fd)?;
×
202

203
                // hash parameters must be valid for the mempool
204
                let (_, num_hashes) =
×
205
                    bloom_hash_count(BLOOM_COUNTER_ERROR_RATE, MAX_BLOOM_COUNTER_TXS);
×
206
                if bloom_filter.num_hashes > num_hashes {
×
207
                    return Err(codec_error::DeserializeError(format!(
×
208
                        "Too many bloom hashers (max {})",
×
209
                        num_hashes
×
210
                    )));
×
211
                }
×
212
                Ok(MemPoolSyncData::BloomFilter(bloom_filter))
×
213
            }
214
            MemPoolSyncDataID::TxTags => {
215
                let seed: [u8; 32] = read_next(fd)?;
6,903✔
216
                let txtags: Vec<TxTag> = read_next(fd)?;
6,903✔
217
                Ok(MemPoolSyncData::TxTags(seed, txtags))
6,903✔
218
            }
219
        }
220
    }
6,903✔
221
}
222

223
/// Read the trailing page ID from a transaction stream
224
fn parse_mempool_query_page_id<R: Read>(
6,937✔
225
    pos: usize,
6,937✔
226
    retry_reader: &mut RetryReader<'_, R>,
6,937✔
227
) -> Result<Option<Txid>, net_error> {
6,937✔
228
    // possibly end-of-transactions, in which case, the last 32 bytes should be
229
    // a page ID.  Expect end-of-stream after this.
230
    retry_reader.set_position(pos);
6,937✔
231
    let next_page: Txid = match read_next(retry_reader) {
6,937✔
232
        Ok(txid) => txid,
950✔
233
        Err(e) => match e {
5,987✔
234
            codec_error::ReadError(ref ioe) => match ioe.kind() {
5,987✔
235
                io::ErrorKind::UnexpectedEof => {
236
                    if pos == retry_reader.position() {
5,987✔
237
                        // this is fine -- the node didn't get another page
238
                        return Ok(None);
5,987✔
239
                    } else {
240
                        // partial data -- corrupt stream
241
                        test_debug!("Unexpected EOF: {} != {}", pos, retry_reader.position());
×
242
                        return Err(e.into());
×
243
                    }
244
                }
245
                _ => {
246
                    return Err(e.into());
×
247
                }
248
            },
249
            e => {
×
250
                return Err(e.into());
×
251
            }
252
        },
253
    };
254

255
    test_debug!("Read page_id {:?}", &next_page);
950✔
256
    Ok(Some(next_page))
950✔
257
}
6,937✔
258

259
/// Decode a transaction stream, returned from /v2/mempool/query.
260
/// The wire format is a list of transactions (no SIP-003 length prefix), followed by an
261
/// optional 32-byte page ID.  Obtain both the transactions and page ID, if it exists.
262
pub fn decode_tx_stream<R: Read>(
6,937✔
263
    fd: &mut R,
6,937✔
264
) -> Result<(Vec<StacksTransaction>, Option<Txid>), net_error> {
6,937✔
265
    // The wire format is `tx, tx, tx, tx, .., tx, txid`.
266
    // The last 32 bytes are the page ID for the next mempool query.
267
    // NOTE: there will be no length prefix on this.
268
    let mut txs: Vec<StacksTransaction> = vec![];
6,937✔
269
    let mut bound_reader = BoundReader::from_reader(fd, MAX_MESSAGE_LEN as u64);
6,937✔
270
    let mut retry_reader = RetryReader::new(&mut bound_reader);
6,937✔
271
    let mut page_id = None;
6,937✔
272
    let mut expect_eof = false;
6,937✔
273

274
    loop {
275
        let pos = retry_reader.position();
14,783✔
276
        let next_msg: Result<StacksTransaction, _> = read_next(&mut retry_reader);
14,783✔
277
        match next_msg {
14,783✔
278
            Ok(tx) => {
6,908✔
279
                if expect_eof {
6,908✔
280
                    // this should have failed
281
                    test_debug!("Expected EOF; got transaction {}", tx.txid());
1✔
282
                    return Err(net_error::ExpectedEndOfStream);
1✔
283
                }
6,907✔
284

285
                test_debug!("Read transaction {}", tx.txid());
6,907✔
286
                txs.push(tx);
6,907✔
287
                Ok(())
6,907✔
288
            }
289
            Err(e) => match e {
7,875✔
290
                codec_error::ReadError(ref ioe) => match ioe.kind() {
6,935✔
291
                    io::ErrorKind::UnexpectedEof => {
292
                        if expect_eof {
6,935✔
293
                            if pos != retry_reader.position() {
937✔
294
                                // read partial data. The stream is corrupt.
295
                                test_debug!(
1✔
296
                                    "Expected EOF; stream advanced from {} to {}",
297
                                    pos,
298
                                    retry_reader.position()
×
299
                                );
300
                                return Err(net_error::ExpectedEndOfStream);
1✔
301
                            }
936✔
302
                        } else {
303
                            // couldn't read a full transaction.  This is possibly a page ID, whose
304
                            // 32 bytes decode to the prefix of a well-formed transaction.
305
                            test_debug!("Try to read page ID trailer after ReadError");
5,998✔
306
                            page_id = parse_mempool_query_page_id(pos, &mut retry_reader)?;
5,998✔
307
                        }
308
                        break;
6,934✔
309
                    }
310
                    _ => Err(e),
×
311
                },
312
                codec_error::DeserializeError(_msg) => {
940✔
313
                    if expect_eof {
940✔
314
                        // this should have failed due to EOF
315
                        test_debug!("Expected EOF; got DeserializeError '{}'", &_msg);
1✔
316
                        return Err(net_error::ExpectedEndOfStream);
1✔
317
                    }
939✔
318

319
                    // failed to parse a transaction.  This is possibly a page ID.
320
                    test_debug!("Try to read page ID trailer after ReadError");
939✔
321
                    page_id = parse_mempool_query_page_id(pos, &mut retry_reader)?;
939✔
322

323
                    // do one more pass to make sure we're actually end-of-stream.
324
                    // otherwise, the stream itself was corrupt, since any 32 bytes is a valid
325
                    // txid and the presence of more bytes means that we simply got a bad tx
326
                    // that we couldn't decode.
327
                    expect_eof = true;
939✔
328
                    Ok(())
939✔
329
                }
330
                _ => Err(e),
×
331
            },
332
        }?;
×
333
    }
334

335
    Ok((txs, page_id))
6,934✔
336
}
6,937✔
337

338
pub struct MemPoolAdmitter {
339
    cur_block: BlockHeaderHash,
340
    cur_consensus_hash: ConsensusHash,
341
}
342

343
enum MemPoolWalkResult {
344
    Chainstate(ConsensusHash, BlockHeaderHash, u64, u64),
345
    NoneAtCoinbaseHeight(ConsensusHash, BlockHeaderHash, u64),
346
    Done,
347
}
348

349
impl MemPoolAdmitter {
350
    pub fn new(cur_block: BlockHeaderHash, cur_consensus_hash: ConsensusHash) -> MemPoolAdmitter {
2,296,395✔
351
        MemPoolAdmitter {
2,296,395✔
352
            cur_block,
2,296,395✔
353
            cur_consensus_hash,
2,296,395✔
354
        }
2,296,395✔
355
    }
2,296,395✔
356

357
    pub fn set_block(&mut self, cur_block: &BlockHeaderHash, cur_consensus_hash: ConsensusHash) {
38,878✔
358
        self.cur_consensus_hash = cur_consensus_hash.clone();
38,878✔
359
        self.cur_block = cur_block.clone();
38,878✔
360
    }
38,878✔
361
    pub fn will_admit_tx(
38,878✔
362
        &mut self,
38,878✔
363
        chainstate: &mut StacksChainState,
38,878✔
364
        sortdb: &SortitionDB,
38,878✔
365
        tx: &StacksTransaction,
38,878✔
366
        tx_size: u64,
38,878✔
367
    ) -> Result<(), MemPoolRejection> {
38,878✔
368
        let sortition_id = match SortitionDB::get_sortition_id_by_consensus(
38,878✔
369
            sortdb.conn(),
38,878✔
370
            &self.cur_consensus_hash,
38,878✔
371
        ) {
372
            Ok(Some(x)) => x,
38,878✔
373
            _ => return Err(MemPoolRejection::DBError(db_error::NotFoundError)),
×
374
        };
375
        chainstate.will_admit_mempool_tx(
38,878✔
376
            &sortdb.index_handle(&sortition_id),
38,878✔
377
            &self.cur_consensus_hash,
38,878✔
378
            &self.cur_block,
38,878✔
379
            tx,
38,878✔
380
            tx_size,
38,878✔
381
        )
382
    }
38,878✔
383
}
384

385
pub enum MemPoolDropReason {
386
    REPLACE_ACROSS_FORK,
387
    REPLACE_BY_FEE,
388
    STALE_COLLECT,
389
    TOO_EXPENSIVE,
390
    PROBLEMATIC,
391
}
392

393
pub struct ConsiderTransaction {
394
    /// Transaction to consider in block assembly
395
    pub tx: MemPoolTxInfo,
396
    /// If `update_estimator` is set, the iteration should update the estimator
397
    /// after considering the tx.
398
    pub update_estimate: bool,
399
}
400

401
enum ConsiderTransactionResult {
402
    NoTransactions,
403
    UpdateNonces(Vec<StacksAddress>),
404
    /// This transaction should be considered for inclusion in the block.
405
    Consider(ConsiderTransaction),
406
}
407

408
impl std::fmt::Display for MemPoolDropReason {
409
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63✔
410
        match self {
63✔
411
            MemPoolDropReason::STALE_COLLECT => write!(f, "StaleGarbageCollect"),
×
412
            MemPoolDropReason::TOO_EXPENSIVE => write!(f, "TooExpensive"),
9✔
413
            MemPoolDropReason::REPLACE_ACROSS_FORK => write!(f, "ReplaceAcrossFork"),
×
414
            MemPoolDropReason::REPLACE_BY_FEE => write!(f, "ReplaceByFee"),
9✔
415
            MemPoolDropReason::PROBLEMATIC => write!(f, "Problematic"),
45✔
416
        }
417
    }
63✔
418
}
419

420
pub trait ProposalCallbackReceiver: Send {
421
    fn notify_proposal_result(&self, result: Result<BlockValidateOk, BlockValidateReject>);
422
}
423

424
pub trait MemPoolEventDispatcher {
425
    fn get_proposal_callback_receiver(&self) -> Option<Box<dyn ProposalCallbackReceiver>>;
426
    fn mempool_txs_dropped(
427
        &self,
428
        txids: Vec<Txid>,
429
        new_txid: Option<Txid>,
430
        reason: MemPoolDropReason,
431
    );
432
    fn mined_block_event(
433
        &self,
434
        target_burn_height: u64,
435
        block: &StacksBlock,
436
        block_size_bytes: u64,
437
        consumed: &ExecutionCost,
438
        confirmed_microblock_cost: &ExecutionCost,
439
        tx_results: Vec<TransactionEvent>,
440
    );
441
    fn mined_microblock_event(
442
        &self,
443
        microblock: &StacksMicroblock,
444
        tx_results: Vec<TransactionEvent>,
445
        anchor_block_consensus_hash: ConsensusHash,
446
        anchor_block: BlockHeaderHash,
447
    );
448
    fn mined_nakamoto_block_event(
449
        &self,
450
        target_burn_height: u64,
451
        block: &NakamotoBlock,
452
        block_size_bytes: u64,
453
        consumed: &ExecutionCost,
454
        tx_results: Vec<TransactionEvent>,
455
    );
456
}
457

458
#[derive(Debug, PartialEq, Clone)]
459
pub struct MemPoolTxInfo {
460
    pub tx: StacksTransaction,
461
    pub metadata: MemPoolTxMetadata,
462
}
463

464
/// This class is a minimal version of `MemPoolTxInfo`. It contains
465
/// just enough information to 1) filter by nonce readiness, 2) sort by fee rate.
466
#[derive(Debug, Clone)]
467
pub struct MemPoolTxInfoPartial {
468
    pub txid: Txid,
469
    pub fee_rate: Option<f64>,
470
    pub origin_address: StacksAddress,
471
    pub origin_nonce: u64,
472
    pub sponsor_address: StacksAddress,
473
    pub sponsor_nonce: u64,
474
}
475

476
#[derive(Debug, PartialEq, Clone)]
477
pub struct MemPoolTxMetadata {
478
    pub txid: Txid,
479
    pub len: u64,
480
    pub tx_fee: u64,
481
    /// The tenure ID in which this transaction was accepted.
482
    /// In epoch 2.x, this is the consensus hash of the sortition that chose the Stacks block
483
    /// In Nakamoto, this is the consensus hash of the ongoing tenure.
484
    pub tenure_consensus_hash: ConsensusHash,
485
    /// The tenure block in which this transaction was accepted.
486
    /// In epoch 2.x, this is the hash of the Stacks block produced in the sortition.
487
    /// In Nakamoto, this is the hash of the tenure-start block.
488
    pub tenure_block_header_hash: BlockHeaderHash,
489
    /// The number of coinbases that have transpired at the time of this transaction's acceptance.
490
    /// In epoch 2.x, this is the same as the Stacks block height
491
    /// In Nakamoto, this is the simply the number of coinbases produced in the history tipped at
492
    /// `tenure_consensus_hash` and `tenure_block_header_hash`
493
    pub coinbase_height: u64,
494
    pub origin_address: StacksAddress,
495
    pub origin_nonce: u64,
496
    pub sponsor_address: StacksAddress,
497
    pub sponsor_nonce: u64,
498
    pub last_known_origin_nonce: Option<u64>,
499
    pub last_known_sponsor_nonce: Option<u64>,
500
    pub accept_time: u64,
501
    pub time_estimate_ms: Option<u64>,
502
}
503

504
impl MemPoolTxMetadata {
505
    pub fn get_unknown_nonces(&self) -> Vec<StacksAddress> {
×
506
        let mut needs_nonces = vec![];
×
507
        if self.last_known_origin_nonce.is_none() {
×
508
            needs_nonces.push(self.origin_address.clone());
×
509
        }
×
510
        if self.last_known_sponsor_nonce.is_none() {
×
511
            needs_nonces.push(self.sponsor_address.clone());
×
512
        }
×
513
        needs_nonces
×
514
    }
×
515
}
516

517
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
518
pub enum MemPoolWalkTxTypes {
519
    TokenTransfer,
520
    SmartContract,
521
    ContractCall,
522
}
523

524
impl FromStr for MemPoolWalkTxTypes {
525
    type Err = &'static str;
526
    fn from_str(s: &str) -> Result<Self, Self::Err> {
×
527
        match s {
×
528
            "TokenTransfer" => Ok(Self::TokenTransfer),
×
529
            "SmartContract" => Ok(Self::SmartContract),
×
530
            "ContractCall" => Ok(Self::ContractCall),
×
531
            _ => Err("Unknown mempool tx walk type"),
×
532
        }
533
    }
×
534
}
535

536
impl MemPoolWalkTxTypes {
537
    pub fn all() -> HashSet<MemPoolWalkTxTypes> {
5,233✔
538
        HashSet::from([
5,233✔
539
            MemPoolWalkTxTypes::TokenTransfer,
5,233✔
540
            MemPoolWalkTxTypes::SmartContract,
5,233✔
541
            MemPoolWalkTxTypes::ContractCall,
5,233✔
542
        ])
5,233✔
543
    }
5,233✔
544

545
    pub fn only(selected: &[MemPoolWalkTxTypes]) -> HashSet<MemPoolWalkTxTypes> {
×
546
        selected.iter().copied().collect()
×
547
    }
×
548
}
549

550
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
551
pub enum MemPoolWalkStrategy {
552
    /// Select transactions with the highest global fee rate.
553
    GlobalFeeRate,
554
    /// Select transactions with the next expected nonce for origin and sponsor addresses,
555
    NextNonceWithHighestFeeRate,
556
}
557

558
impl FromStr for MemPoolWalkStrategy {
559
    type Err = &'static str;
560
    fn from_str(s: &str) -> Result<Self, Self::Err> {
×
561
        match s {
×
562
            "GlobalFeeRate" => {
×
563
                return Ok(Self::GlobalFeeRate);
×
564
            }
565
            "NextNonceWithHighestFeeRate" => {
×
566
                return Ok(Self::NextNonceWithHighestFeeRate);
×
567
            }
568
            _ => {
569
                return Err("Unknown mempool walk strategy");
×
570
            }
571
        }
572
    }
×
573
}
574

575
#[derive(Debug, Clone)]
576
pub struct MemPoolWalkSettings {
577
    /// Strategy to use when selecting the next transactions to consider in the `mempool` table.
578
    pub strategy: MemPoolWalkStrategy,
579
    /// Maximum amount of time a miner will spend walking through mempool transactions, in
580
    /// milliseconds.  This is a soft deadline.
581
    pub max_walk_time_ms: u64,
582
    /// Probability percentage to consider a transaction which has not received a cost estimate.
583
    /// Only used when walk strategy is `GlobalFeeRate`.
584
    pub consider_no_estimate_tx_prob: u8,
585
    /// Size of the nonce cache. This avoids MARF look-ups.
586
    pub nonce_cache_size: usize,
587
    /// Size of the candidate cache. These are the candidates that will be retried after each
588
    /// transaction is mined.
589
    pub candidate_retry_cache_size: usize,
590
    /// Types of transactions we'll consider
591
    pub txs_to_consider: HashSet<MemPoolWalkTxTypes>,
592
    /// Origins for transactions that we'll consider
593
    pub filter_origins: HashSet<StacksAddress>,
594
    /// What percentage of the remaining cost limit should we consume before stopping the walk
595
    /// None means we consume the entire cost limit ASAP
596
    pub tenure_cost_limit_per_block_percentage: Option<u8>,
597
    /// The percentage of the block’s execution cost at which, if the next non-boot
598
    /// contract call would cause a `BlockTooBigError`, the miner will stop including
599
    /// further non-boot contract calls and instead consider only boot contract calls
600
    /// and STX transfers.
601
    pub contract_cost_limit_percentage: Option<u8>,
602
    /// Enable logging of skipped transactions (disabled by default, generally used for tests)
603
    pub log_skipped_transactions: bool,
604
}
605

606
impl Default for MemPoolWalkSettings {
607
    fn default() -> Self {
461✔
608
        MemPoolWalkSettings {
461✔
609
            strategy: MemPoolWalkStrategy::NextNonceWithHighestFeeRate,
461✔
610
            max_walk_time_ms: u64::MAX,
461✔
611
            consider_no_estimate_tx_prob: 5,
461✔
612
            nonce_cache_size: 1024 * 1024,
461✔
613
            candidate_retry_cache_size: 64 * 1024,
461✔
614
            txs_to_consider: MemPoolWalkTxTypes::all(),
461✔
615
            filter_origins: HashSet::new(),
461✔
616
            tenure_cost_limit_per_block_percentage: None,
461✔
617
            contract_cost_limit_percentage: None,
461✔
618
            log_skipped_transactions: false,
461✔
619
        }
461✔
620
    }
461✔
621
}
622
impl MemPoolWalkSettings {
623
    pub fn zero() -> MemPoolWalkSettings {
319✔
624
        MemPoolWalkSettings {
319✔
625
            strategy: MemPoolWalkStrategy::NextNonceWithHighestFeeRate,
319✔
626
            max_walk_time_ms: u64::MAX,
319✔
627
            consider_no_estimate_tx_prob: 5,
319✔
628
            nonce_cache_size: 1024 * 1024,
319✔
629
            candidate_retry_cache_size: 64 * 1024,
319✔
630
            txs_to_consider: MemPoolWalkTxTypes::all(),
319✔
631
            filter_origins: HashSet::new(),
319✔
632
            tenure_cost_limit_per_block_percentage: None,
319✔
633
            contract_cost_limit_percentage: None,
319✔
634
            log_skipped_transactions: false,
319✔
635
        }
319✔
636
    }
319✔
637
}
638

639
impl FromRow<Txid> for Txid {
640
    fn from_row(row: &Row) -> Result<Txid, db_error> {
154,530✔
641
        row.get(0).map_err(db_error::SqliteError)
154,530✔
642
    }
154,530✔
643
}
644

645
impl FromRow<MemPoolTxMetadata> for MemPoolTxMetadata {
646
    fn from_row(row: &Row) -> Result<MemPoolTxMetadata, db_error> {
3,094,597✔
647
        let txid = Txid::from_column(row, "txid")?;
3,094,597✔
648
        let tenure_consensus_hash = ConsensusHash::from_column(row, "consensus_hash")?;
3,094,597✔
649
        let tenure_block_header_hash = BlockHeaderHash::from_column(row, "block_header_hash")?;
3,094,597✔
650
        let tx_fee = u64::from_column(row, "tx_fee")?;
3,094,597✔
651
        let coinbase_height = u64::from_column(row, "height")?;
3,094,597✔
652
        let len = u64::from_column(row, "length")?;
3,094,597✔
653
        let accept_time = u64::from_column(row, "accept_time")?;
3,094,597✔
654
        let origin_address = StacksAddress::from_column(row, "origin_address")?;
3,094,597✔
655
        let origin_nonce = u64::from_column(row, "origin_nonce")?;
3,094,597✔
656
        let sponsor_address = StacksAddress::from_column(row, "sponsor_address")?;
3,094,597✔
657
        let sponsor_nonce = u64::from_column(row, "sponsor_nonce")?;
3,094,597✔
658
        let last_known_sponsor_nonce = u64::from_column(row, "last_known_sponsor_nonce")?;
3,094,597✔
659
        let last_known_origin_nonce = u64::from_column(row, "last_known_origin_nonce")?;
3,094,597✔
660
        let time_estimate_ms: Option<u64> = row.get("time_estimate_ms")?;
3,094,597✔
661

662
        Ok(MemPoolTxMetadata {
3,094,597✔
663
            txid,
3,094,597✔
664
            len,
3,094,597✔
665
            tx_fee,
3,094,597✔
666
            tenure_consensus_hash,
3,094,597✔
667
            tenure_block_header_hash,
3,094,597✔
668
            coinbase_height,
3,094,597✔
669
            origin_address,
3,094,597✔
670
            origin_nonce,
3,094,597✔
671
            sponsor_address,
3,094,597✔
672
            sponsor_nonce,
3,094,597✔
673
            last_known_origin_nonce,
3,094,597✔
674
            last_known_sponsor_nonce,
3,094,597✔
675
            accept_time,
3,094,597✔
676
            time_estimate_ms,
3,094,597✔
677
        })
3,094,597✔
678
    }
3,094,597✔
679
}
680

681
impl FromRow<MemPoolTxInfo> for MemPoolTxInfo {
682
    fn from_row(row: &Row) -> Result<MemPoolTxInfo, db_error> {
3,073,396✔
683
        let md = MemPoolTxMetadata::from_row(row)?;
3,073,396✔
684
        let tx_bytes: Vec<u8> = row.get_unwrap("tx");
3,073,396✔
685
        let tx = StacksTransaction::consensus_deserialize(&mut &tx_bytes[..])
3,073,396✔
686
            .map_err(|_e| db_error::ParseError)?;
3,073,396✔
687

688
        if tx.txid() != md.txid {
3,073,396✔
689
            return Err(db_error::ParseError);
×
690
        }
3,073,396✔
691

692
        Ok(MemPoolTxInfo { tx, metadata: md })
3,073,396✔
693
    }
3,073,396✔
694
}
695

696
impl FromRow<MemPoolTxInfoPartial> for MemPoolTxInfoPartial {
697
    fn from_row(row: &Row) -> Result<MemPoolTxInfoPartial, db_error> {
19,435,823✔
698
        let txid = Txid::from_column(row, "txid")?;
19,435,823✔
699
        let fee_rate: Option<f64> = match row.get("fee_rate") {
19,435,823✔
700
            Ok(rate) => Some(rate),
19,421,904✔
701
            Err(_) => None,
13,919✔
702
        };
703
        let origin_address = StacksAddress::from_column(row, "origin_address")?;
19,435,823✔
704
        let origin_nonce = u64::from_column(row, "origin_nonce")?;
19,435,823✔
705
        let sponsor_address = StacksAddress::from_column(row, "sponsor_address")?;
19,435,823✔
706
        let sponsor_nonce = u64::from_column(row, "sponsor_nonce")?;
19,435,823✔
707

708
        Ok(MemPoolTxInfoPartial {
19,435,823✔
709
            txid,
19,435,823✔
710
            fee_rate,
19,435,823✔
711
            origin_address,
19,435,823✔
712
            origin_nonce,
19,435,823✔
713
            sponsor_address,
19,435,823✔
714
            sponsor_nonce,
19,435,823✔
715
        })
19,435,823✔
716
    }
19,435,823✔
717
}
718

719
impl FromRow<(u64, u64)> for (u64, u64) {
720
    fn from_row(row: &Row) -> Result<(u64, u64), db_error> {
51,214,854✔
721
        let t1: i64 = row.get_unwrap(0);
51,214,854✔
722
        let t2: i64 = row.get_unwrap(1);
51,214,854✔
723
        if t1 < 0 || t2 < 0 {
51,214,854✔
724
            return Err(db_error::ParseError);
×
725
        }
51,214,854✔
726
        Ok((t1 as u64, t2 as u64))
51,214,854✔
727
    }
51,214,854✔
728
}
729

730
const MEMPOOL_INITIAL_SCHEMA: &[&str] = &[r#"
731
    CREATE TABLE mempool(
732
        txid TEXT NOT NULL,
733
        origin_address TEXT NOT NULL,
734
        origin_nonce INTEGER NOT NULL,
735
        sponsor_address TEXT NOT NULL,
736
        sponsor_nonce INTEGER NOT NULL,
737
        tx_fee INTEGER NOT NULL,
738
        length INTEGER NOT NULL,
739
        consensus_hash TEXT NOT NULL,
740
        -- In epoch2x, this is the Stacks tip block hash at the time of this tx's arrival.
741
        -- In Nakamoto, this is the tenure-start block hash of the ongoing tenure at the time of this tx's arrival.
742
        block_header_hash TEXT NOT NULL,
743
        -- This is the *coinbase height* of the chain tip above.
744
        -- In epoch2x (when this schema was written), this also happened to be the block height; hence the name.
745
        -- In Nakamoto, this is not a block height any longer.
746
        height INTEGER NOT NULL,
747
        accept_time INTEGER NOT NULL,
748
        tx BLOB NOT NULL,
749
        PRIMARY KEY (txid),
750
        UNIQUE (origin_address, origin_nonce),
751
        UNIQUE (sponsor_address,sponsor_nonce)
752
    );
753
    "#];
754

755
const MEMPOOL_SCHEMA_2_COST_ESTIMATOR: &[&str] = &[
756
    r#"
757
    CREATE TABLE fee_estimates(
758
        txid TEXT NOT NULL,
759
        fee_rate NUMBER,
760
        PRIMARY KEY (txid),
761
        FOREIGN KEY (txid) REFERENCES mempool (txid) ON DELETE CASCADE ON UPDATE CASCADE
762
    );
763
    "#,
764
    // The `last_known_*_nonce` columns are no longer used, beginning in schema 6,
765
    // in favor of a separate `nonces` table and an in-memory cache.
766
    r#"
767
    ALTER TABLE mempool ADD COLUMN last_known_origin_nonce INTEGER;
768
    "#,
769
    r#"
770
    ALTER TABLE mempool ADD COLUMN last_known_sponsor_nonce INTEGER;
771
    "#,
772
    r#"
773
    CREATE TABLE schema_version (version NUMBER, PRIMARY KEY (version));
774
    "#,
775
    r#"
776
    INSERT INTO schema_version (version) VALUES (2)
777
    "#,
778
];
779

780
const MEMPOOL_SCHEMA_3_BLOOM_STATE: &[&str] = &[
781
    r#"
782
    CREATE TABLE IF NOT EXISTS removed_txids(
783
        txid TEXT PRIMARY KEY NOT NULL,
784
        FOREIGN KEY(txid) REFERENCES mempool(txid) ON DELETE CASCADE
785
    );
786
    "#,
787
    r#"
788
    -- mapping between hash(local-seed,txid) and txid, used for randomized but efficient
789
    -- paging when streaming transactions out of the mempool.
790
    CREATE TABLE IF NOT EXISTS randomized_txids(
791
        txid TEXT PRIMARY KEY NOT NULL,
792
        hashed_txid TEXT NOT NULL,
793
        FOREIGN KEY(txid) REFERENCES mempool(txid) ON DELETE CASCADE
794
    );
795
    "#,
796
    r#"
797
    INSERT INTO schema_version (version) VALUES (3)
798
    "#,
799
];
800

801
const MEMPOOL_SCHEMA_4_BLACKLIST: &[&str] = &[
802
    r#"
803
    -- List of transactions that will never be stored to the mempool again, for as long as the rows exist.
804
    -- `arrival_time` indicates when the entry was created. This is used to garbage-collect the list.
805
    -- A transaction that is blacklisted may still be served from the mempool, but it will never be (re)submitted.
806
    CREATE TABLE IF NOT EXISTS tx_blacklist(
807
        txid TEXT PRIMARY KEY NOT NULL,
808
        arrival_time INTEGER NOT NULL
809
    );
810
    "#,
811
    r#"
812
    -- Count the number of entries in the blacklist
813
    CREATE TABLE IF NOT EXISTS tx_blacklist_size(
814
        size INTEGER NOT NULL
815
    );
816
    "#,
817
    r#"
818
    -- Maintain a count of the size of the blacklist
819
    CREATE TRIGGER IF NOT EXISTS tx_blacklist_size_inc
820
    AFTER INSERT ON tx_blacklist
821
    BEGIN
822
        UPDATE tx_blacklist_size SET size = size + 1;
823
    END
824
    "#,
825
    r#"
826
    CREATE TRIGGER IF NOT EXISTS tx_blacklist_size_dec
827
    AFTER DELETE ON tx_blacklist
828
    BEGIN
829
        UPDATE tx_blacklist_size SET size = size - 1;
830
    END
831
    "#,
832
    r#"
833
    INSERT INTO tx_blacklist_size (size) VALUES (0)
834
    "#,
835
    r#"
836
    INSERT INTO schema_version (version) VALUES (4)
837
    "#,
838
];
839

840
const MEMPOOL_SCHEMA_5: &[&str] = &[
841
    r#"
842
    ALTER TABLE mempool ADD COLUMN fee_rate NUMBER;
843
    "#,
844
    r#"
845
    CREATE INDEX IF NOT EXISTS by_fee_rate ON mempool(fee_rate);
846
    "#,
847
    r#"
848
    UPDATE mempool
849
    SET fee_rate = (SELECT f.fee_rate FROM fee_estimates as f WHERE f.txid = mempool.txid);
850
    "#,
851
    r#"
852
    INSERT INTO schema_version (version) VALUES (5)
853
    "#,
854
];
855

856
const MEMPOOL_SCHEMA_6_NONCES: &[&str] = &[
857
    r#"
858
    CREATE TABLE nonces(
859
        address TEXT PRIMARY KEY NOT NULL,
860
        nonce INTEGER NOT NULL
861
    );
862
    "#,
863
    r#"
864
    INSERT INTO schema_version (version) VALUES (6)
865
    "#,
866
];
867

868
const MEMPOOL_SCHEMA_7_TIME_ESTIMATES: &[&str] = &[
869
    r#"
870
    -- ALLOW NULL
871
    ALTER TABLE mempool ADD COLUMN time_estimate_ms INTEGER;
872
    "#,
873
    r#"
874
    INSERT INTO schema_version (version) VALUES (7)
875
    "#,
876
];
877

878
const MEMPOOL_SCHEMA_8_NONCE_SORTING: &'static [&'static str] = &[
879
    r#"
880
    -- Add table to track considered transactions
881
    CREATE TABLE IF NOT EXISTS considered_txs(
882
        txid TEXT PRIMARY KEY NOT NULL,
883
        FOREIGN KEY(txid) REFERENCES mempool(txid) ON DELETE CASCADE
884
    );
885
    "#,
886
    r#"
887
    -- Drop redundant mempool indexes, covered by unique constraints
888
    DROP INDEX IF EXISTS "by_txid";
889
    DROP INDEX IF EXISTS "by_sponsor";
890
    DROP INDEX IF EXISTS "by_origin";
891
    "#,
892
    r#"
893
    -- Add indexes for nonce sorting
894
    CREATE INDEX IF NOT EXISTS by_address_nonce ON nonces(address, nonce);
895
    "#,
896
    r#"
897
    INSERT INTO schema_version (version) VALUES (8)
898
    "#,
899
];
900

901
const MEMPOOL_INDEXES: &[&str] = &[
902
    "CREATE INDEX IF NOT EXISTS by_txid ON mempool(txid);",
903
    "CREATE INDEX IF NOT EXISTS by_height ON mempool(height);",
904
    "CREATE INDEX IF NOT EXISTS by_txid_and_height ON mempool(txid,height);",
905
    "CREATE INDEX IF NOT EXISTS by_sponsor ON mempool(sponsor_address, sponsor_nonce);",
906
    "CREATE INDEX IF NOT EXISTS by_origin ON mempool(origin_address, origin_nonce);",
907
    "CREATE INDEX IF NOT EXISTS by_timestamp ON mempool(accept_time);",
908
    "CREATE INDEX IF NOT EXISTS by_chaintip ON mempool(consensus_hash,block_header_hash);",
909
    "CREATE INDEX IF NOT EXISTS fee_by_txid ON fee_estimates(txid);",
910
    "CREATE INDEX IF NOT EXISTS by_ordered_hashed_txid ON randomized_txids(hashed_txid ASC);",
911
    "CREATE INDEX IF NOT EXISTS by_hashed_txid ON randomized_txids(txid,hashed_txid);",
912
    "CREATE INDEX IF NOT EXISTS by_arrival_time_desc ON tx_blacklist(arrival_time DESC);",
913
];
914

915
pub struct MemPoolDB {
916
    pub db: DBConn,
917
    path: String,
918
    admitter: MemPoolAdmitter,
919
    bloom_counter: BloomCounter<BloomNodeHasher>,
920
    max_tx_tags: u32,
921
    cost_estimator: Box<dyn CostEstimator>,
922
    metric: Box<dyn CostMetric>,
923
    pub blacklist_timeout: u64,
924
    pub blacklist_max_size: u64,
925
}
926

927
pub struct MemPoolTx<'a> {
928
    tx: DBTx<'a>,
929
    admitter: &'a mut MemPoolAdmitter,
930
    bloom_counter: Option<&'a mut BloomCounter<BloomNodeHasher>>,
931
}
932

933
impl<'a> Deref for MemPoolTx<'a> {
934
    type Target = DBTx<'a>;
935
    fn deref(&self) -> &DBTx<'a> {
5,118,444✔
936
        &self.tx
5,118,444✔
937
    }
5,118,444✔
938
}
939

940
impl<'a> DerefMut for MemPoolTx<'a> {
941
    fn deref_mut(&mut self) -> &mut DBTx<'a> {
×
942
        &mut self.tx
×
943
    }
×
944
}
945

946
impl<'a> MemPoolTx<'a> {
947
    pub fn new(
7,916,232✔
948
        tx: DBTx<'a>,
7,916,232✔
949
        admitter: &'a mut MemPoolAdmitter,
7,916,232✔
950
        bloom_counter: &'a mut BloomCounter<BloomNodeHasher>,
7,916,232✔
951
    ) -> MemPoolTx<'a> {
7,916,232✔
952
        MemPoolTx {
7,916,232✔
953
            tx,
7,916,232✔
954
            admitter,
7,916,232✔
955
            bloom_counter: Some(bloom_counter),
7,916,232✔
956
        }
7,916,232✔
957
    }
7,916,232✔
958

959
    pub fn with_bloom_state<F, R>(tx: &mut MemPoolTx<'a>, f: F) -> R
88,832✔
960
    where
88,832✔
961
        F: FnOnce(&mut DBTx<'a>, &mut BloomCounter<BloomNodeHasher>) -> R,
88,832✔
962
    {
963
        let bc = tx
88,832✔
964
            .bloom_counter
88,832✔
965
            .take()
88,832✔
966
            .expect("BUG: did not replace bloom filter");
88,832✔
967
        let res = f(&mut tx.tx, bc);
88,832✔
968
        tx.bloom_counter.replace(bc);
88,832✔
969
        res
88,832✔
970
    }
88,832✔
971

972
    pub fn commit(self) -> Result<(), db_error> {
2,591,827✔
973
        self.tx.commit().map_err(db_error::SqliteError)
2,591,827✔
974
    }
2,591,827✔
975

976
    /// Remove all txids at the given coinbase height from the bloom counter.
977
    /// Used to clear out txids that are now outside the bloom counter's depth.
978
    fn prune_bloom_counter(&mut self, target_coinbase_height: u64) -> Result<(), MemPoolRejection> {
13,501✔
979
        let sql = "SELECT a.txid FROM mempool AS a LEFT OUTER JOIN removed_txids AS b ON a.txid = b.txid WHERE b.txid IS NULL AND a.height = ?1";
13,501✔
980
        let args = params![u64_to_sql(target_coinbase_height)?];
13,501✔
981
        let txids: Vec<Txid> = query_rows(&self.tx, sql, args)?;
13,501✔
982
        let _num_txs = txids.len();
13,501✔
983

984
        test_debug!(
13,501✔
985
            "Prune bloom counter from coinbase height {}",
986
            target_coinbase_height
987
        );
988

989
        // keep borrow-checker happy
990
        MemPoolTx::with_bloom_state(self, |ref mut dbtx, ref mut bloom_counter| {
13,501✔
991
            for txid in txids.into_iter() {
26,919✔
992
                bloom_counter.remove_raw(dbtx, &txid.0)?;
21,962✔
993

994
                let sql = "INSERT OR REPLACE INTO removed_txids (txid) VALUES (?1)";
21,962✔
995
                let args = params![txid];
21,962✔
996
                dbtx.execute(sql, args).map_err(db_error::SqliteError)?;
21,962✔
997
            }
998
            // help the type inference out
999
            let res: Result<(), db_error> = Ok(());
13,501✔
1000
            res
13,501✔
1001
        })?;
13,501✔
1002

1003
        test_debug!(
13,501✔
1004
            "Pruned bloom filter at coinbase height {}: removed {} txs",
1005
            target_coinbase_height,
1006
            _num_txs
1007
        );
1008
        Ok(())
13,501✔
1009
    }
13,501✔
1010

1011
    /// Add the txid to the bloom counter in the mempool DB, optionally replacing a prior
1012
    /// transaction (identified by prior_txid) if the bloom counter is full.
1013
    /// If this is the first txid at this coinbase height, then also garbage-collect the bloom counter to remove no-longer-recent transactions.
1014
    /// If the bloom counter is saturated -- i.e. it represents more than MAX_BLOOM_COUNTER_TXS
1015
    /// transactions -- then pick another transaction to evict from the bloom filter and return its txid.
1016
    /// (Note that no transactions are ever removed from the mempool; we just don't prioritize them
1017
    /// in the bloom filter).
1018
    fn update_bloom_counter(
75,331✔
1019
        &mut self,
75,331✔
1020
        coinbase_height: u64,
75,331✔
1021
        txid: &Txid,
75,331✔
1022
        prior_txid: Option<&Txid>,
75,331✔
1023
    ) -> Result<Option<Txid>, MemPoolRejection> {
75,331✔
1024
        // is this the first-ever txid at this coinbase height?
1025
        let sql = "SELECT 1 FROM mempool WHERE height = ?1";
75,331✔
1026
        let args = params![u64_to_sql(coinbase_height)?];
75,331✔
1027
        let present: Option<i64> = query_row(&self.tx, sql, args)?;
75,331✔
1028
        if present.is_none() && coinbase_height > (BLOOM_COUNTER_DEPTH as u64) {
75,331✔
1029
            // this is the first-ever tx at this coinbase height.
1030
            // which means, the bloom filter window has advanced.
1031
            // which means, we need to remove all the txs that are now out of the window.
1032
            self.prune_bloom_counter(coinbase_height - (BLOOM_COUNTER_DEPTH as u64))?;
13,501✔
1033
        }
61,830✔
1034

1035
        MemPoolTx::with_bloom_state(self, |ref mut dbtx, ref mut bloom_counter| {
75,331✔
1036
            // remove replaced transaction
1037
            if let Some(prior_txid) = prior_txid {
75,331✔
1038
                bloom_counter.remove_raw(dbtx, &prior_txid.0)?;
5,312✔
1039
            }
70,019✔
1040

1041
            // keep the bloom counter un-saturated -- remove at most one transaction from it to keep
1042
            // the error rate at or below the target error rate
1043
            let evict_txid = {
75,331✔
1044
                let num_recents = MemPoolDB::get_num_recent_txs(dbtx)?;
75,331✔
1045
                if num_recents >= u64::from(MAX_BLOOM_COUNTER_TXS) {
75,331✔
1046
                    // remove lowest-fee tx (they're paying the least, so replication is
1047
                    // deprioritized)
1048
                    let sql = "SELECT a.txid FROM mempool AS a LEFT OUTER JOIN removed_txids AS b ON a.txid = b.txid WHERE b.txid IS NULL AND a.height > ?1 ORDER BY a.tx_fee ASC LIMIT 1";
8,192✔
1049
                    let args = params![u64_to_sql(
8,192✔
1050
                        coinbase_height.saturating_sub(BLOOM_COUNTER_DEPTH as u64),
8,192✔
1051
                    )?];
×
1052
                    let evict_txid: Option<Txid> = query_row(dbtx, sql, args)?;
8,192✔
1053
                    if let Some(evict_txid) = evict_txid {
8,192✔
1054
                        bloom_counter.remove_raw(dbtx, &evict_txid.0)?;
8,192✔
1055

1056
                        let sql = "INSERT OR REPLACE INTO removed_txids (txid) VALUES (?1)";
8,192✔
1057
                        let args = params![evict_txid];
8,192✔
1058
                        dbtx.execute(sql, args).map_err(db_error::SqliteError)?;
8,192✔
1059

1060
                        Some(evict_txid)
8,192✔
1061
                    } else {
1062
                        None
×
1063
                    }
1064
                } else {
1065
                    None
67,139✔
1066
                }
1067
            };
1068

1069
            // finally add the new transaction
1070
            bloom_counter.insert_raw(dbtx, &txid.0)?;
75,331✔
1071
            Ok(evict_txid)
75,331✔
1072
        })
75,331✔
1073
    }
75,331✔
1074

1075
    /// Add the txid to our randomized page order
1076
    fn update_mempool_pager(&mut self, txid: &Txid) -> Result<(), MemPoolRejection> {
75,331✔
1077
        let mut randomized_buff = self
75,331✔
1078
            .bloom_counter
75,331✔
1079
            .as_ref()
75,331✔
1080
            .expect("BUG: did not instantiate bloom counter in mempool tx")
75,331✔
1081
            .get_seed()
75,331✔
1082
            .to_vec();
75,331✔
1083
        randomized_buff.extend_from_slice(&txid.0);
75,331✔
1084
        let hashed_txid = Txid(Sha512Trunc256Sum::from_data(&randomized_buff).0);
75,331✔
1085

1086
        let sql = "INSERT OR REPLACE INTO randomized_txids (txid,hashed_txid) VALUES (?1,?2)";
75,331✔
1087
        let args = params![txid, hashed_txid];
75,331✔
1088

1089
        self.execute(sql, args).map_err(db_error::SqliteError)?;
75,331✔
1090

1091
        Ok(())
75,331✔
1092
    }
75,331✔
1093
}
1094

1095
#[cfg(any(test, feature = "testing"))]
1096
pub fn db_get_all_nonces(conn: &DBConn) -> Result<Vec<(StacksAddress, u64)>, db_error> {
2✔
1097
    let sql = "SELECT * FROM nonces";
2✔
1098
    let mut stmt = conn.prepare(sql).map_err(db_error::SqliteError)?;
2✔
1099
    let mut iter = stmt.query(NO_PARAMS).map_err(db_error::SqliteError)?;
2✔
1100
    let mut ret = vec![];
2✔
1101
    while let Ok(Some(row)) = iter.next() {
4✔
1102
        let addr = StacksAddress::from_column(row, "address")?;
2✔
1103
        let nonce = u64::from_column(row, "nonce")?;
2✔
1104
        ret.push((addr, nonce));
2✔
1105
    }
1106
    Ok(ret)
2✔
1107
}
2✔
1108

1109
/// Cache potential candidate transactions for subsequent iterations.
1110
/// While walking the mempool, transactions that have nonces that are too high
1111
/// to process yet (but could be processed in the future) are added to `next`.
1112
/// In the next pass, `next` is moved to `cache` and these transactions are
1113
/// checked before reading more from the mempool DB.
1114
struct CandidateCache {
1115
    cache: VecDeque<MemPoolTxInfoPartial>,
1116
    next: VecDeque<MemPoolTxInfoPartial>,
1117
    /// The maximum size that this cache can be.
1118
    max_cache_size: usize,
1119
}
1120

1121
impl CandidateCache {
1122
    fn new(candidate_retry_cache_size: usize) -> Self {
364,223✔
1123
        let max_size: usize = candidate_retry_cache_size
364,223✔
1124
            .try_into()
364,223✔
1125
            .expect("Could not cast `candidate_retry_cache_size` as usize.");
364,223✔
1126
        Self {
364,223✔
1127
            cache: VecDeque::new(),
364,223✔
1128
            next: VecDeque::new(),
364,223✔
1129
            max_cache_size: max_size,
364,223✔
1130
        }
364,223✔
1131
    }
364,223✔
1132

1133
    /// Retrieve the next candidate transaction from the cache.
1134
    fn next(&mut self) -> Option<MemPoolTxInfoPartial> {
370,091,612✔
1135
        self.cache.pop_front()
370,091,612✔
1136
    }
370,091,612✔
1137

1138
    /// Push a candidate to the cache for the next iteration.
1139
    fn push(&mut self, tx: MemPoolTxInfoPartial) {
366,102,588✔
1140
        if self.next.len() < self.max_cache_size {
366,102,588✔
1141
            self.next.push_back(tx);
366,102,588✔
1142
        }
366,102,588✔
1143

1144
        #[cfg(test)]
1145
        assert!(self.cache.len() + self.next.len() <= self.max_cache_size);
16,500✔
1146
    }
366,102,588✔
1147

1148
    /// Prepare for the next iteration, transferring transactions from `next` to `cache`.
1149
    fn reset(&mut self) {
373,183✔
1150
        // We do not need a size check here, because the cache can only grow in size
1151
        // after `cache` is empty. New transactions are not walked until the entire
1152
        // cache has been walked, so whenever we are adding brand new transactions to
1153
        // the cache, `cache` must, by definition, be empty. The size of `next`
1154
        // can grow beyond the previous iteration's cache, and that is limited inside
1155
        // the `push` method.
1156
        self.next.append(&mut self.cache);
373,183✔
1157
        self.cache = std::mem::take(&mut self.next);
373,183✔
1158

1159
        #[cfg(test)]
1160
        {
1161
            assert!(self.cache.len() <= self.max_cache_size + 1);
250✔
1162
            assert!(self.next.len() <= self.max_cache_size + 1);
250✔
1163
        }
1164
    }
373,183✔
1165

1166
    /// Total length of the cache.
1167
    #[cfg_attr(test, mutants::skip)]
1168
    fn len(&self) -> usize {
×
1169
        self.cache.len() + self.next.len()
×
1170
    }
×
1171

1172
    /// Is the cache empty?
1173
    #[cfg_attr(test, mutants::skip)]
1174
    fn is_empty(&self) -> bool {
×
1175
        self.cache.is_empty() && self.next.is_empty()
×
1176
    }
×
1177
}
1178

1179
/// Evaluates the pair of nonces, to determine an order
1180
///
1181
/// Returns:
1182
///   `Equal` if both origin and sponsor nonces match expected
1183
///   `Less` if the origin nonce is less than expected, or the origin matches expected and the
1184
///          sponsor nonce is less than expected
1185
///   `Greater` if the origin nonce is greater than expected, or the origin matches expected
1186
///          and the sponsor nonce is greater than expected
1187
fn order_nonces(
370,985,303✔
1188
    origin_actual: u64,
370,985,303✔
1189
    origin_expected: u64,
370,985,303✔
1190
    sponsor_actual: u64,
370,985,303✔
1191
    sponsor_expected: u64,
370,985,303✔
1192
) -> Ordering {
370,985,303✔
1193
    if origin_actual < origin_expected {
370,985,303✔
1194
        return Ordering::Less;
4,195,393✔
1195
    } else if origin_actual > origin_expected {
366,789,910✔
1196
        return Ordering::Greater;
366,103,493✔
1197
    }
686,417✔
1198

1199
    if sponsor_actual < sponsor_expected {
686,417✔
1200
        return Ordering::Less;
×
1201
    } else if sponsor_actual > sponsor_expected {
686,417✔
1202
        return Ordering::Greater;
1✔
1203
    }
686,416✔
1204

1205
    Ordering::Equal
686,416✔
1206
}
370,985,303✔
1207

1208
impl MemPoolDB {
1209
    fn instantiate_mempool_db(conn: &mut DBConn) -> Result<(), db_error> {
3,316✔
1210
        let mut tx = tx_begin_immediate(conn)?;
3,316✔
1211

1212
        // create initial mempool tables
1213
        for cmd in MEMPOOL_INITIAL_SCHEMA {
3,316✔
1214
            tx.execute_batch(cmd).map_err(db_error::SqliteError)?;
3,316✔
1215
        }
1216

1217
        // apply all migrations
1218
        MemPoolDB::apply_schema_migrations(&mut tx)?;
3,316✔
1219

1220
        // add all indexes
1221
        MemPoolDB::add_indexes(&mut tx)?;
3,316✔
1222

1223
        tx.commit().map_err(db_error::SqliteError)?;
3,316✔
1224
        Ok(())
3,316✔
1225
    }
3,316✔
1226

1227
    /// Load the schema version from the database, if it's new enough to have such a version.
1228
    /// Returns Some(version) if a version can be loaded; None if not.
1229
    fn get_schema_version(conn: &DBConn) -> Result<Option<i64>, db_error> {
2,319,598✔
1230
        let is_versioned = table_exists(conn, "schema_version")?;
2,319,598✔
1231
        if !is_versioned {
2,319,598✔
1232
            return Ok(None);
3,316✔
1233
        }
2,316,282✔
1234

1235
        let version = conn
2,316,282✔
1236
            .query_row(
2,316,282✔
1237
                "SELECT MAX(version) FROM schema_version",
2,316,282✔
1238
                NO_PARAMS,
1239
                |row| row.get(0),
2,316,282✔
1240
            )
1241
            .optional()?;
2,316,282✔
1242

1243
        Ok(version)
2,316,282✔
1244
    }
2,319,598✔
1245

1246
    /// Apply all schema migrations up to the latest schema.
1247
    fn apply_schema_migrations(tx: &mut DBTx) -> Result<(), db_error> {
2,296,386✔
1248
        loop {
1249
            let version = MemPoolDB::get_schema_version(tx)?.unwrap_or(1);
2,319,598✔
1250
            match version {
2,319,598✔
1251
                1 => {
1252
                    MemPoolDB::instantiate_cost_estimator(tx)?;
3,316✔
1253
                }
1254
                2 => {
1255
                    MemPoolDB::instantiate_bloom_state(tx)?;
3,316✔
1256
                }
1257
                3 => {
1258
                    MemPoolDB::instantiate_tx_blacklist(tx)?;
3,316✔
1259
                }
1260
                4 => {
1261
                    MemPoolDB::denormalize_fee_rate(tx)?;
3,316✔
1262
                }
1263
                5 => {
1264
                    MemPoolDB::instantiate_nonces(tx)?;
3,316✔
1265
                }
1266
                6 => {
1267
                    MemPoolDB::instantiate_schema_7(tx)?;
3,316✔
1268
                }
1269
                7 => {
1270
                    MemPoolDB::instantiate_schema_8(tx)?;
3,316✔
1271
                }
1272
                8 => {
1273
                    break;
2,296,386✔
1274
                }
1275
                _ => {
1276
                    panic!("Unknown schema version {}", version);
×
1277
                }
1278
            }
1279
        }
1280
        Ok(())
2,296,386✔
1281
    }
2,296,386✔
1282

1283
    /// Add indexes
1284
    #[cfg_attr(test, mutants::skip)]
1285
    fn add_indexes(tx: &mut DBTx) -> Result<(), db_error> {
2,296,386✔
1286
        for cmd in MEMPOOL_INDEXES {
25,260,246✔
1287
            tx.execute_batch(cmd).map_err(db_error::SqliteError)?;
25,260,246✔
1288
        }
1289
        Ok(())
2,296,386✔
1290
    }
2,296,386✔
1291

1292
    /// Instantiate the on-disk counting bloom filter
1293
    #[cfg_attr(test, mutants::skip)]
1294
    fn instantiate_bloom_state(tx: &mut DBTx) -> Result<(), db_error> {
3,316✔
1295
        let node_hasher = BloomNodeHasher::new_random();
3,316✔
1296
        let _ = BloomCounter::new(
3,316✔
1297
            tx,
3,316✔
1298
            BLOOM_COUNTER_TABLE,
3,316✔
1299
            BLOOM_COUNTER_ERROR_RATE,
1300
            MAX_BLOOM_COUNTER_TXS,
1301
            node_hasher,
3,316✔
1302
        )?;
×
1303

1304
        for cmd in MEMPOOL_SCHEMA_3_BLOOM_STATE {
9,948✔
1305
            tx.execute_batch(cmd).map_err(db_error::SqliteError)?;
9,948✔
1306
        }
1307
        Ok(())
3,316✔
1308
    }
3,316✔
1309

1310
    /// Instantiate the cost estimator schema
1311
    #[cfg_attr(test, mutants::skip)]
1312
    fn instantiate_cost_estimator(tx: &DBTx) -> Result<(), db_error> {
3,316✔
1313
        for sql_exec in MEMPOOL_SCHEMA_2_COST_ESTIMATOR {
16,580✔
1314
            tx.execute_batch(sql_exec)?;
16,580✔
1315
        }
1316

1317
        Ok(())
3,316✔
1318
    }
3,316✔
1319

1320
    /// Denormalize fee rate schema 5
1321
    fn denormalize_fee_rate(tx: &DBTx) -> Result<(), db_error> {
3,316✔
1322
        for sql_exec in MEMPOOL_SCHEMA_5 {
13,264✔
1323
            tx.execute_batch(sql_exec)?;
13,264✔
1324
        }
1325

1326
        Ok(())
3,316✔
1327
    }
3,316✔
1328

1329
    /// Instantiate the tx blacklist schema
1330
    #[cfg_attr(test, mutants::skip)]
1331
    fn instantiate_tx_blacklist(tx: &DBTx) -> Result<(), db_error> {
3,316✔
1332
        for sql_exec in MEMPOOL_SCHEMA_4_BLACKLIST {
19,896✔
1333
            tx.execute_batch(sql_exec)?;
19,896✔
1334
        }
1335

1336
        Ok(())
3,316✔
1337
    }
3,316✔
1338

1339
    /// Add the nonce table
1340
    #[cfg_attr(test, mutants::skip)]
1341
    fn instantiate_nonces(tx: &DBTx) -> Result<(), db_error> {
3,316✔
1342
        for sql_exec in MEMPOOL_SCHEMA_6_NONCES {
6,632✔
1343
            tx.execute_batch(sql_exec)?;
6,632✔
1344
        }
1345

1346
        Ok(())
3,316✔
1347
    }
3,316✔
1348

1349
    /// Add the nonce table
1350
    #[cfg_attr(test, mutants::skip)]
1351
    fn instantiate_schema_7(tx: &DBTx) -> Result<(), db_error> {
3,316✔
1352
        for sql_exec in MEMPOOL_SCHEMA_7_TIME_ESTIMATES {
6,632✔
1353
            tx.execute_batch(sql_exec)?;
6,632✔
1354
        }
1355

1356
        Ok(())
3,316✔
1357
    }
3,316✔
1358

1359
    /// Optimize indexes for mempool visits
1360
    #[cfg_attr(test, mutants::skip)]
1361
    fn instantiate_schema_8(tx: &DBTx) -> Result<(), db_error> {
3,316✔
1362
        for sql_exec in MEMPOOL_SCHEMA_8_NONCE_SORTING {
13,264✔
1363
            tx.execute_batch(sql_exec)?;
13,264✔
1364
        }
1365

1366
        Ok(())
3,316✔
1367
    }
3,316✔
1368

1369
    #[cfg_attr(test, mutants::skip)]
1370
    pub fn db_path(chainstate_root_path: &str) -> Result<String, db_error> {
3,196,274✔
1371
        let mut path = PathBuf::from(chainstate_root_path);
3,196,274✔
1372

1373
        path.push("mempool.sqlite");
3,196,274✔
1374
        path.to_str()
3,196,274✔
1375
            .ok_or_else(|| db_error::ParseError)
3,196,274✔
1376
            .map(String::from)
3,196,274✔
1377
    }
3,196,274✔
1378

1379
    #[cfg(test)]
1380
    pub fn open_test(
918✔
1381
        mainnet: bool,
918✔
1382
        chain_id: u32,
918✔
1383
        chainstate_path: &str,
918✔
1384
    ) -> Result<MemPoolDB, db_error> {
918✔
1385
        let estimator = Box::new(UnitEstimator);
918✔
1386
        let metric = Box::new(UnitMetric);
918✔
1387
        MemPoolDB::open(mainnet, chain_id, chainstate_path, estimator, metric)
918✔
1388
    }
918✔
1389

1390
    pub fn open_db(
2,296,395✔
1391
        db_path: &str,
2,296,395✔
1392
        cost_estimator: Box<dyn CostEstimator>,
2,296,395✔
1393
        metric: Box<dyn CostMetric>,
2,296,395✔
1394
    ) -> Result<MemPoolDB, db_error> {
2,296,395✔
1395
        let admitter = MemPoolAdmitter::new(BlockHeaderHash([0u8; 32]), ConsensusHash([0u8; 20]));
2,296,395✔
1396

1397
        let mut create_flag = false;
2,296,395✔
1398
        let open_flags = if fs::metadata(&db_path).is_err() {
2,296,395✔
1399
            // need to create
1400
            create_flag = true;
3,316✔
1401
            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
3,316✔
1402
        } else {
1403
            // can just open
1404
            OpenFlags::SQLITE_OPEN_READ_WRITE
2,293,079✔
1405
        };
1406

1407
        let mut conn = sqlite_open(&db_path, open_flags, true)?;
2,296,395✔
1408
        if create_flag {
2,296,395✔
1409
            // instantiate!
1410
            MemPoolDB::instantiate_mempool_db(&mut conn)?;
3,316✔
1411
        } else {
1412
            let mut tx = tx_begin_immediate(&mut conn)?;
2,293,079✔
1413
            MemPoolDB::apply_schema_migrations(&mut tx)?;
2,293,079✔
1414
            MemPoolDB::add_indexes(&mut tx)?;
2,293,079✔
1415
            tx.commit().map_err(db_error::SqliteError)?;
2,293,079✔
1416
        }
1417

1418
        let bloom_counter = BloomCounter::<BloomNodeHasher>::try_load(&conn, BLOOM_COUNTER_TABLE)?
2,296,395✔
1419
            .ok_or(db_error::Other("Failed to load bloom counter".to_string()))?;
2,296,395✔
1420

1421
        Ok(MemPoolDB {
2,296,395✔
1422
            db: conn,
2,296,395✔
1423
            path: db_path.to_owned(),
2,296,395✔
1424
            admitter,
2,296,395✔
1425
            bloom_counter,
2,296,395✔
1426
            max_tx_tags: DEFAULT_MAX_TX_TAGS,
2,296,395✔
1427
            cost_estimator,
2,296,395✔
1428
            metric,
2,296,395✔
1429
            blacklist_timeout: DEFAULT_BLACKLIST_TIMEOUT,
2,296,395✔
1430
            blacklist_max_size: DEFAULT_BLACKLIST_MAX_SIZE,
2,296,395✔
1431
        })
2,296,395✔
1432
    }
2,296,395✔
1433

1434
    pub fn reopen(&self, readwrite: bool) -> Result<DBConn, db_error> {
371,148✔
1435
        if let Err(e) = fs::metadata(&self.path) {
371,148✔
1436
            return Err(db_error::IOError(e));
×
1437
        }
371,148✔
1438

1439
        let open_flags = if readwrite {
371,148✔
1440
            OpenFlags::SQLITE_OPEN_READ_WRITE
364,223✔
1441
        } else {
1442
            OpenFlags::SQLITE_OPEN_READ_ONLY
6,925✔
1443
        };
1444

1445
        let conn = sqlite_open(&self.path, open_flags, true)?;
371,148✔
1446
        Ok(conn)
371,148✔
1447
    }
371,148✔
1448

1449
    /// Open the mempool db within the chainstate directory.
1450
    /// The chainstate must be instantiated already.
1451
    pub fn open(
2,296,395✔
1452
        mainnet: bool,
2,296,395✔
1453
        chain_id: u32,
2,296,395✔
1454
        chainstate_path: &str,
2,296,395✔
1455
        cost_estimator: Box<dyn CostEstimator>,
2,296,395✔
1456
        metric: Box<dyn CostMetric>,
2,296,395✔
1457
    ) -> Result<MemPoolDB, db_error> {
2,296,395✔
1458
        match fs::metadata(chainstate_path) {
2,296,395✔
1459
            Ok(md) => {
2,296,395✔
1460
                if !md.is_dir() {
2,296,395✔
1461
                    return Err(db_error::NotFoundError);
×
1462
                }
2,296,395✔
1463
            }
1464
            Err(_e) => {
×
1465
                return Err(db_error::NotFoundError);
×
1466
            }
1467
        }
1468

1469
        let (chainstate, _) = StacksChainState::open(mainnet, chain_id, chainstate_path, None)
2,296,395✔
1470
            .map_err(|e| db_error::Other(format!("Failed to open chainstate: {:?}", &e)))?;
2,296,395✔
1471

1472
        let db_path = MemPoolDB::db_path(&chainstate.root_path)?;
2,296,395✔
1473

1474
        MemPoolDB::open_db(&db_path, cost_estimator, metric)
2,296,395✔
1475
    }
2,296,395✔
1476

1477
    #[cfg_attr(test, mutants::skip)]
1478
    pub fn reset_mempool_caches(&mut self) -> Result<(), db_error> {
378,397✔
1479
        self.reset_nonce_cache()?;
378,397✔
1480
        self.reset_considered_txs_cache()?;
378,397✔
1481
        Ok(())
378,397✔
1482
    }
378,397✔
1483

1484
    #[cfg_attr(test, mutants::skip)]
1485
    pub fn reset_considered_txs_cache(&mut self) -> Result<(), db_error> {
413,623✔
1486
        debug!("reset considered txs cache");
413,623✔
1487
        self.db.execute("DELETE FROM considered_txs", NO_PARAMS)?;
413,623✔
1488
        Ok(())
413,623✔
1489
    }
413,623✔
1490

1491
    #[cfg_attr(test, mutants::skip)]
1492
    pub fn reset_nonce_cache(&mut self) -> Result<(), db_error> {
378,397✔
1493
        debug!("reset nonce cache");
378,397✔
1494
        self.db.execute("DELETE FROM nonces", NO_PARAMS)?;
378,397✔
1495
        Ok(())
378,397✔
1496
    }
378,397✔
1497

1498
    /// Find the origin addresses who have sent the highest-fee transactions
1499
    fn find_origin_addresses_by_descending_fees(
×
1500
        &self,
×
1501
        start_coinbase_height: i64,
×
1502
        end_coinbase_height: i64,
×
1503
        min_fees: u64,
×
1504
        offset: u32,
×
1505
        count: u32,
×
1506
    ) -> Result<Vec<StacksAddress>, db_error> {
×
1507
        let sql = "SELECT DISTINCT origin_address FROM mempool WHERE height > ?1 AND height <= ?2 AND tx_fee >= ?3
×
1508
                   ORDER BY tx_fee DESC LIMIT ?4 OFFSET ?5";
×
1509
        let args = params![
×
1510
            start_coinbase_height,
1511
            end_coinbase_height,
1512
            u64_to_sql(min_fees)?,
×
1513
            count,
1514
            offset,
1515
        ];
1516
        query_row_columns(self.conn(), sql, args, "origin_address")
×
1517
    }
×
1518

1519
    /// Add estimated fee rates to the mempool rate table using
1520
    /// the mempool's configured `CostMetric` and `CostEstimator`. Will update
1521
    /// at most `max_updates` entries in the database before returning.
1522
    ///
1523
    /// Returns `Ok(number_updated)` on success
1524
    pub fn estimate_tx_rates(
386,373✔
1525
        &mut self,
386,373✔
1526
        max_updates: u32,
386,373✔
1527
        block_limit: &ExecutionCost,
386,373✔
1528
        stacks_epoch_id: &StacksEpochId,
386,373✔
1529
    ) -> Result<u32, db_error> {
386,373✔
1530
        let sql_tx = tx_begin_immediate(&mut self.db)?;
386,373✔
1531
        let txs: Vec<MemPoolTxInfo> = query_rows(
386,373✔
1532
            &sql_tx,
386,373✔
1533
            "SELECT * FROM mempool as m WHERE m.fee_rate IS NULL LIMIT ?",
386,373✔
1534
            params![max_updates],
386,373✔
1535
        )?;
×
1536
        let mut updated = 0;
386,373✔
1537
        for tx_to_estimate in txs {
391,197✔
1538
            let txid = tx_to_estimate.tx.txid();
26,694✔
1539
            let estimator_result = cost_estimates::estimate_fee_rate(
26,694✔
1540
                &tx_to_estimate.tx,
26,694✔
1541
                self.cost_estimator.as_ref(),
26,694✔
1542
                self.metric.as_ref(),
26,694✔
1543
                block_limit,
26,694✔
1544
                stacks_epoch_id,
26,694✔
1545
            );
1546
            let fee_rate_f64 = match estimator_result {
14,751✔
1547
                Ok(x) => Some(x),
12,078✔
1548
                Err(EstimatorError::NoEstimateAvailable) => continue,
14,616✔
1549
                Err(e) => {
×
1550
                    warn!("Error while estimating mempool tx rate";
×
1551
                          "txid" => %txid,
1552
                          "error" => ?e);
1553
                    continue;
×
1554
                }
1555
            };
1556

1557
            sql_tx.execute(
12,078✔
1558
                "UPDATE mempool SET fee_rate = ? WHERE txid = ?",
12,078✔
1559
                params![fee_rate_f64, txid],
12,078✔
1560
            )?;
12,078✔
1561
            updated += 1;
12,078✔
1562
        }
1563

1564
        sql_tx.commit()?;
386,373✔
1565

1566
        Ok(updated)
386,373✔
1567
    }
386,373✔
1568

1569
    /// Helper method to record nonces to a retry-buffer.
1570
    /// This is needed for when we try to write-through a new (address, nonce) pair to the on-disk
1571
    /// `nonces` cache, but the write fails due to lock contention from another thread.  The
1572
    /// retry-buffer will be used to later store this data in a single transaction.
1573
    fn save_nonce_for_retry(
×
1574
        retry_store: &mut HashMap<StacksAddress, u64>,
×
1575
        max_size: u64,
×
1576
        addr: StacksAddress,
×
1577
        new_nonce: u64,
×
1578
    ) {
×
1579
        if (retry_store.len() as u64) < max_size {
×
1580
            if let Some(nonce) = retry_store.get_mut(&addr) {
×
1581
                *nonce = cmp::max(new_nonce, *nonce);
×
1582
            } else {
×
1583
                retry_store.insert(addr, new_nonce);
×
1584
            }
×
1585
        }
×
1586
    }
×
1587

1588
    /// Iterate over candidates in the mempool
1589
    /// `todo` will be called once for each transaction that is a valid
1590
    /// candidate for inclusion in the next block, meaning its origin and
1591
    /// sponsor nonces are equal to the nonces of the corresponding accounts.
1592
    /// Best effort will be made to process the transactions in fee-rate order.
1593
    /// That is, transactions will be processed in fee-rate order until the
1594
    /// candidate cache is full, at which point, transactions with a lower
1595
    /// fee-rate may be considered before those with a higher fee-rate.
1596
    /// When the candidate cache fills, a subsequent call to
1597
    /// `iterate_candidates` will be needed to reconsider transactions which
1598
    /// were skipped on the first pass, but become valid after some lower
1599
    /// fee-rate transactions are considered.
1600
    ///
1601
    /// The size of the candidate cache and the nonce cache are configurable
1602
    /// in the settings struct. This method is interruptable -- in the
1603
    /// `settings` struct, the caller may choose how long to spend iterating
1604
    /// before this method stops.
1605
    ///
1606
    /// `todo` returns an option to a `TransactionEvent` representing the
1607
    /// outcome, or None to indicate that iteration through the mempool should
1608
    /// be halted.
1609
    ///
1610
    /// `output_events` is modified in place, adding all substantive
1611
    /// transaction events (success and error events, but not skipped) output
1612
    /// by `todo`.
1613
    pub fn iterate_candidates<F, E, C>(
364,223✔
1614
        &mut self,
364,223✔
1615
        clarity_tx: &mut C,
364,223✔
1616
        output_events: &mut Vec<TransactionEvent>,
364,223✔
1617
        settings: MemPoolWalkSettings,
364,223✔
1618
        mut todo: F,
364,223✔
1619
    ) -> Result<(u64, MempoolIterationStopReason), E>
364,223✔
1620
    where
364,223✔
1621
        C: ClarityConnection,
364,223✔
1622
        F: FnMut(
364,223✔
1623
            &mut C,
364,223✔
1624
            &ConsiderTransaction,
364,223✔
1625
            &mut dyn CostEstimator,
364,223✔
1626
        ) -> Result<Option<TransactionEvent>, E>,
364,223✔
1627
        E: From<db_error> + From<ChainstateError>,
364,223✔
1628
    {
1629
        let start_time = Instant::now();
364,223✔
1630
        let mut total_considered = 0;
364,223✔
1631
        let mut considered_txs = Vec::with_capacity(MAX_BLOCK_TXS);
364,223✔
1632

1633
        debug!("Mempool walk for {}ms", settings.max_walk_time_ms,);
364,223✔
1634

1635
        let mut nonce_cache = NonceCache::new(settings.nonce_cache_size);
364,223✔
1636
        let mut nonce_conn = self.reopen(true)?;
364,223✔
1637

1638
        // == Queries for `GlobalFeeRate` mempool walk strategy
1639
        //
1640
        // Selects mempool transactions only based on their fee rate. Transactions with NULL fee rates get randomly selected for
1641
        // consideration.
1642
        let tx_consideration_sampler = Uniform::new(0, 100);
364,223✔
1643
        let mut rng = rand::thread_rng();
364,223✔
1644
        let mut candidate_cache = CandidateCache::new(settings.candidate_retry_cache_size);
364,223✔
1645
        let sql = "
364,223✔
1646
            SELECT txid, origin_nonce, origin_address, sponsor_nonce, sponsor_address, fee_rate
364,223✔
1647
            FROM mempool
364,223✔
1648
            WHERE fee_rate IS NULL
364,223✔
1649
            ";
364,223✔
1650
        let mut query_stmt_null = self.db.prepare(sql).map_err(Error::SqliteError)?;
364,223✔
1651
        let mut null_iterator = query_stmt_null
364,223✔
1652
            .query(NO_PARAMS)
364,223✔
1653
            .map_err(Error::SqliteError)?;
364,223✔
1654
        let sql = "
364,223✔
1655
            SELECT txid, origin_nonce, origin_address, sponsor_nonce, sponsor_address, fee_rate
364,223✔
1656
            FROM mempool
364,223✔
1657
            WHERE fee_rate IS NOT NULL
364,223✔
1658
            ORDER BY fee_rate DESC
364,223✔
1659
            ";
364,223✔
1660
        let mut query_stmt_fee = self.db.prepare(sql).map_err(Error::SqliteError)?;
364,223✔
1661
        let mut fee_iterator = query_stmt_fee
364,223✔
1662
            .query(NO_PARAMS)
364,223✔
1663
            .map_err(Error::SqliteError)?;
364,223✔
1664

1665
        // Here we have a nested loop to walk the mempool.
1666
        //
1667
        // The `GlobalFeeRate` strategy includes all transactions, so we just
1668
        // query once and walk the full mempool in the inner loop.
1669
        //
1670
        // The `NextNonceWithHighestFeeRate` strategy only selects transactions
1671
        // that have the next expected nonce, so we need to re-query the
1672
        // mempool after one batch has been processed and the nonce table has
1673
        // been updated. This is handled in the outer loop.
1674
        let stop_reason = loop {
364,223✔
1675
            let mut state_changed = false;
498,531✔
1676

1677
            // == Query for `NextNonceWithHighestFeeRate` mempool walk strategy
1678
            //
1679
            // Selects the next mempool transaction to consider using a heuristic that maximizes miner fee profitability and minimizes
1680
            // CPU time wasted on already-mined or not-yet-mineable transactions. This heuristic takes the following steps:
1681
            //
1682
            // 1. Filters out transactions to consider only those that have the next expected nonce for both the origin and sponsor,
1683
            //    when possible
1684
            // 2. Adds a "simulated" fee rate to transactions that don't have it by multiplying the mempool's maximum current fee rate
1685
            //    by a random number. This helps us mix these transactions with others to guarantee they get processed in a reasonable
1686
            //    order
1687
            // 3. Ranks transactions by prioritizing those with next nonces and higher fees (per origin and sponsor address)
1688
            // 4. Takes the top ranked transaction and returns it for evaluation
1689
            //
1690
            // This logic prevents miners from repeatedly visiting (and then skipping) high fee transactions that would get evaluated
1691
            // first based on their `fee_rate` but are otherwise non-mineable because they have very high or invalid nonces. A large
1692
            // volume of these transactions would cause considerable slowness when selecting valid transactions to mine. This query
1693
            // also makes sure transactions that have NULL `fee_rate`s are visited, because they will also get ranked according to
1694
            // their origin address nonce.
1695
            let sql = "
498,531✔
1696
            WITH nonce_filtered AS (
498,531✔
1697
                SELECT txid, origin_nonce, origin_address, sponsor_nonce, sponsor_address, fee_rate,
498,531✔
1698
                    CASE
498,531✔
1699
                        WHEN fee_rate IS NULL THEN (ABS(RANDOM()) % 10000 / 10000.0) * (SELECT MAX(fee_rate) FROM mempool)
498,531✔
1700
                        ELSE fee_rate
498,531✔
1701
                    END AS sort_fee_rate
498,531✔
1702
                FROM mempool AS m
498,531✔
1703
                LEFT JOIN nonces AS no ON m.origin_address = no.address
498,531✔
1704
                LEFT JOIN nonces AS ns ON m.sponsor_address = ns.address
498,531✔
1705
                WHERE (no.address IS NULL OR m.origin_nonce = no.nonce)
498,531✔
1706
                    AND (ns.address IS NULL OR m.sponsor_nonce = ns.nonce)
498,531✔
1707
                    AND m.txid NOT IN (SELECT txid FROM considered_txs)
498,531✔
1708
                ORDER BY accept_time ASC
498,531✔
1709
                LIMIT 11650 -- max transactions that can fit in one block
498,531✔
1710
            ),
498,531✔
1711
            address_nonce_ranked AS (
498,531✔
1712
                SELECT *,
498,531✔
1713
                    ROW_NUMBER() OVER (
498,531✔
1714
                        PARTITION BY origin_address
498,531✔
1715
                        ORDER BY origin_nonce ASC, sort_fee_rate DESC
498,531✔
1716
                    ) AS origin_rank,
498,531✔
1717
                    ROW_NUMBER() OVER (
498,531✔
1718
                        PARTITION BY sponsor_address
498,531✔
1719
                        ORDER BY sponsor_nonce ASC, sort_fee_rate DESC
498,531✔
1720
                    ) AS sponsor_rank
498,531✔
1721
                FROM nonce_filtered
498,531✔
1722
            )
498,531✔
1723
            SELECT txid, origin_nonce, origin_address, sponsor_nonce, sponsor_address, fee_rate
498,531✔
1724
            FROM address_nonce_ranked
498,531✔
1725
            ORDER BY origin_rank ASC, sponsor_rank ASC, sort_fee_rate DESC
498,531✔
1726
            ";
498,531✔
1727
            let mut query_stmt_nonce_rank = self.db.prepare(sql).map_err(Error::SqliteError)?;
498,531✔
1728
            let mut nonce_rank_iterator = query_stmt_nonce_rank
498,531✔
1729
                .query(NO_PARAMS)
498,531✔
1730
                .map_err(Error::SqliteError)?;
498,531✔
1731

1732
            let stop_reason = loop {
498,531✔
1733
                if start_time.elapsed().as_millis() > settings.max_walk_time_ms as u128 {
371,483,186✔
1734
                    debug!("Mempool: iteration deadline exceeded";
117✔
1735
                       "deadline_ms" => settings.max_walk_time_ms);
×
1736
                    break MempoolIterationStopReason::DeadlineReached;
117✔
1737
                }
371,483,069✔
1738

1739
                // Get the next candidate transaction.
1740
                let (candidate, update_estimate) = match settings.strategy {
371,483,069✔
1741
                    MemPoolWalkStrategy::GlobalFeeRate => {
1742
                        // First, try to read from the retry list
1743
                        match candidate_cache.next() {
370,091,612✔
1744
                            Some(tx) => {
351,549,480✔
1745
                                let update_estimate = tx.fee_rate.is_none();
351,549,480✔
1746
                                (tx, update_estimate)
351,549,480✔
1747
                            }
1748
                            None => {
1749
                                // When the retry list is empty, read from the mempool db,
1750
                                // randomly selecting from either the null fee-rate transactions
1751
                                // or those with fee-rate estimates.
1752
                                let start_with_no_estimate = tx_consideration_sampler
18,542,132✔
1753
                                    .sample(&mut rng)
18,542,132✔
1754
                                    < settings.consider_no_estimate_tx_prob;
18,542,132✔
1755
                                let opt_tx = if start_with_no_estimate {
18,542,132✔
1756
                                    null_iterator.next().map_err(Error::SqliteError)?
4,631,508✔
1757
                                } else {
1758
                                    fee_iterator.next().map_err(Error::SqliteError)?
13,910,624✔
1759
                                };
1760
                                match opt_tx {
18,542,132✔
1761
                                    Some(row) => (
13,907,212✔
1762
                                        MemPoolTxInfoPartial::from_row(row)?,
13,907,212✔
1763
                                        start_with_no_estimate,
13,907,212✔
1764
                                    ),
1765
                                    None => {
1766
                                        // If the selected iterator is empty, check the other
1767
                                        match if start_with_no_estimate {
4,634,920✔
1768
                                            fee_iterator.next().map_err(Error::SqliteError)?
4,631,400✔
1769
                                        } else {
1770
                                            null_iterator.next().map_err(Error::SqliteError)?
3,520✔
1771
                                        } {
1772
                                            Some(row) => (
4,630,716✔
1773
                                                MemPoolTxInfoPartial::from_row(row)?,
4,630,716✔
1774
                                                !start_with_no_estimate,
4,630,716✔
1775
                                            ),
1776
                                            None => {
1777
                                                break MempoolIterationStopReason::NoMoreCandidates;
4,204✔
1778
                                            }
1779
                                        }
1780
                                    }
1781
                                }
1782
                            }
1783
                        }
1784
                    }
1785
                    MemPoolWalkStrategy::NextNonceWithHighestFeeRate => {
1786
                        match nonce_rank_iterator.next().map_err(Error::SqliteError)? {
1,391,457✔
1787
                            Some(row) => {
897,895✔
1788
                                let tx = MemPoolTxInfoPartial::from_row(row)?;
897,895✔
1789
                                let update_estimate = tx.fee_rate.is_none();
897,895✔
1790
                                (tx, update_estimate)
897,895✔
1791
                            }
1792
                            None => {
1793
                                break MempoolIterationStopReason::NoMoreCandidates;
493,562✔
1794
                            }
1795
                        }
1796
                    }
1797
                };
1798

1799
                state_changed = true;
370,985,303✔
1800

1801
                // Check the nonces.
1802
                let expected_origin_nonce =
370,985,303✔
1803
                    nonce_cache.get(&candidate.origin_address, clarity_tx, &mut nonce_conn);
370,985,303✔
1804
                let expected_sponsor_nonce =
370,985,303✔
1805
                    nonce_cache.get(&candidate.sponsor_address, clarity_tx, &mut nonce_conn);
370,985,303✔
1806

1807
                match order_nonces(
370,985,303✔
1808
                    candidate.origin_nonce,
370,985,303✔
1809
                    expected_origin_nonce,
370,985,303✔
1810
                    candidate.sponsor_nonce,
370,985,303✔
1811
                    expected_sponsor_nonce,
370,985,303✔
1812
                ) {
370,985,303✔
1813
                    Ordering::Less => {
1814
                        debug!(
4,195,393✔
1815
                            "Mempool: unexecutable: drop tx";
1816
                            "txid" => %candidate.txid,
1817
                            "tx_origin_addr" => %candidate.origin_address,
1818
                            "tx_origin_nonce" => candidate.origin_nonce,
×
1819
                            "fee_rate" => candidate.fee_rate.unwrap_or_default(),
×
1820
                            "expected_origin_nonce" => expected_origin_nonce,
×
1821
                            "expected_sponsor_nonce" => expected_sponsor_nonce,
×
1822
                        );
1823
                        // This transaction cannot execute in this pass, just drop it
1824
                        continue;
4,195,393✔
1825
                    }
1826
                    Ordering::Greater => {
1827
                        debug!(
366,103,494✔
1828
                            "Mempool: nonces too high";
1829
                            "txid" => %candidate.txid,
1830
                            "tx_origin_addr" => %candidate.origin_address,
1831
                            "tx_origin_nonce" => candidate.origin_nonce,
×
1832
                            "fee_rate" => candidate.fee_rate.unwrap_or_default(),
×
1833
                            "expected_origin_nonce" => expected_origin_nonce,
×
1834
                            "expected_sponsor_nonce" => expected_sponsor_nonce,
×
1835
                        );
1836
                        if settings.strategy == MemPoolWalkStrategy::GlobalFeeRate {
366,103,494✔
1837
                            // This transaction could become runnable in this pass, save it for later
366,102,588✔
1838
                            candidate_cache.push(candidate);
366,102,588✔
1839
                        }
366,103,493✔
1840
                        continue;
366,103,494✔
1841
                    }
1842
                    Ordering::Equal => {
686,416✔
1843
                        // Candidate transaction: fall through
686,416✔
1844
                    }
686,416✔
1845
                };
1846
                considered_txs.push(candidate.txid.clone());
686,416✔
1847

1848
                // Read in and deserialize the transaction.
1849
                let tx_info_option = MemPoolDB::get_tx(self.conn(), &candidate.txid)?;
686,416✔
1850
                let tx_info = match tx_info_option {
686,416✔
1851
                    Some(tx) => tx,
686,416✔
1852
                    None => {
1853
                        // Note: Don't panic here because maybe the state has changed from garbage collection.
1854
                        warn!("Miner: could not find a tx for id {:?}", &candidate.txid);
×
1855
                        continue;
×
1856
                    }
1857
                };
1858

1859
                let (tx_type, do_consider) = match &tx_info.tx.payload {
686,416✔
1860
                    TransactionPayload::TokenTransfer(..) => (
655,110✔
1861
                        "TokenTransfer".to_string(),
655,110✔
1862
                        settings
655,110✔
1863
                            .txs_to_consider
655,110✔
1864
                            .contains(&MemPoolWalkTxTypes::TokenTransfer),
655,110✔
1865
                    ),
655,110✔
1866
                    TransactionPayload::SmartContract(..) => (
3,346✔
1867
                        "SmartContract".to_string(),
3,346✔
1868
                        settings
3,346✔
1869
                            .txs_to_consider
3,346✔
1870
                            .contains(&MemPoolWalkTxTypes::SmartContract),
3,346✔
1871
                    ),
3,346✔
1872
                    TransactionPayload::ContractCall(..) => (
27,925✔
1873
                        "ContractCall".to_string(),
27,925✔
1874
                        settings
27,925✔
1875
                            .txs_to_consider
27,925✔
1876
                            .contains(&MemPoolWalkTxTypes::ContractCall),
27,925✔
1877
                    ),
27,925✔
1878
                    _ => ("".to_string(), true),
35✔
1879
                };
1880
                if !do_consider {
686,416✔
1881
                    debug!("Mempool: will skip tx, since it does not have an acceptable type";
10✔
1882
                       "txid" => %tx_info.tx.txid(),
×
1883
                       "type" => %tx_type);
1884
                    continue;
10✔
1885
                }
686,406✔
1886

1887
                let do_consider = settings.filter_origins.is_empty()
686,406✔
1888
                    || settings
×
1889
                        .filter_origins
×
1890
                        .contains(&tx_info.metadata.origin_address);
×
1891

1892
                if !do_consider {
686,406✔
1893
                    debug!("Mempool: will skip tx, since it does not have an allowed origin";
×
1894
                       "txid" => %tx_info.tx.txid(),
×
1895
                       "origin" => %tx_info.metadata.origin_address);
1896
                    continue;
×
1897
                }
686,406✔
1898

1899
                let consider = ConsiderTransaction {
686,406✔
1900
                    tx: tx_info,
686,406✔
1901
                    update_estimate,
686,406✔
1902
                };
686,406✔
1903
                debug!("Mempool: consider transaction";
686,406✔
1904
                           "txid" => %consider.tx.tx.txid(),
×
1905
                           "origin_addr" => %consider.tx.metadata.origin_address,
1906
                           "origin_nonce" => candidate.origin_nonce,
×
1907
                           "sponsor_addr" => %consider.tx.metadata.sponsor_address,
1908
                           "sponsor_nonce" => candidate.sponsor_nonce,
×
1909
                           "accept_time" => consider.tx.metadata.accept_time,
×
1910
                           "tx_fee" => consider.tx.metadata.tx_fee,
×
1911
                           "fee_rate" => candidate.fee_rate,
×
1912
                           "size" => consider.tx.metadata.len);
×
1913
                total_considered += 1;
686,406✔
1914

1915
                // Run `todo` on the transaction.
1916
                match todo(clarity_tx, &consider, self.cost_estimator.as_mut())? {
686,406✔
1917
                    Some(tx_event) => {
685,758✔
1918
                        match tx_event {
685,758✔
1919
                            TransactionEvent::Success(_) => {
1920
                                // Bump nonces in the cache for the executed transaction
1921
                                nonce_cache.set(
676,138✔
1922
                                    consider.tx.metadata.origin_address,
676,138✔
1923
                                    expected_origin_nonce + 1,
676,138✔
1924
                                    &mut nonce_conn,
676,138✔
1925
                                );
1926
                                if consider.tx.tx.auth.is_sponsored() {
676,138✔
1927
                                    nonce_cache.set(
122✔
1928
                                        consider.tx.metadata.sponsor_address,
122✔
1929
                                        expected_sponsor_nonce + 1,
122✔
1930
                                        &mut nonce_conn,
122✔
1931
                                    );
122✔
1932
                                }
676,065✔
1933
                                output_events.push(tx_event);
676,138✔
1934
                            }
1935
                            TransactionEvent::Skipped(_) => {
1936
                                // don't push `Skipped` events to the observer by default
1937
                                if settings.log_skipped_transactions {
9,487✔
1938
                                    output_events.push(tx_event);
450✔
1939
                                }
9,487✔
1940
                            }
1941
                            _ => {
133✔
1942
                                output_events.push(tx_event);
133✔
1943
                            }
133✔
1944
                        }
1945
                    }
1946
                    None => {
1947
                        debug!("Mempool: early exit from iterator");
648✔
1948
                        break MempoolIterationStopReason::IteratorExited;
648✔
1949
                    }
1950
                }
1951

1952
                if settings.strategy == MemPoolWalkStrategy::GlobalFeeRate {
685,758✔
1953
                    // Reset for finding the next transaction to process
1954
                    debug!(
373,183✔
1955
                        "Mempool: reset: retry list has {} entries",
1956
                        candidate_cache.len()
×
1957
                    );
1958
                    candidate_cache.reset();
373,183✔
1959
                }
312,575✔
1960
            };
1961

1962
            // If we've reached the end of the mempool, or if we've stopped
1963
            // iterating for some other reason, break out of the loop. In the
1964
            // case of `NextNonceWithHighestFeeRate` we know we've reached the
1965
            // end of the mempool if the state has not changed. In the case of
1966
            // `GlobalFeeRate` we know we've reached the end of the mempool if
1967
            // the stop reason is `NoMoreCandidates`.
1968
            if settings.strategy != MemPoolWalkStrategy::NextNonceWithHighestFeeRate
498,531✔
1969
                || stop_reason != MempoolIterationStopReason::NoMoreCandidates
494,102✔
1970
                || !state_changed
493,562✔
1971
            {
1972
                if stop_reason == MempoolIterationStopReason::NoMoreCandidates {
364,223✔
1973
                    debug!("Mempool: no more transactions to consider");
363,458✔
1974
                }
765✔
1975
                break stop_reason;
364,223✔
1976
            }
134,308✔
1977

1978
            // Flush the nonce cache to the database before performing the next
1979
            // query.
1980
            nonce_cache.flush(&mut nonce_conn);
134,308✔
1981

1982
            // Flush the candidate cache to the database before performing the
1983
            // next query.
1984
            flush_considered_txs(&mut nonce_conn, &mut considered_txs);
134,308✔
1985
        };
1986

1987
        // drop these rusqlite statements and queries, since their existence as immutable borrows on the
1988
        // connection prevents us from beginning a transaction below (which requires a mutable
1989
        // borrow).
1990
        drop(null_iterator);
364,223✔
1991
        drop(query_stmt_null);
364,223✔
1992
        drop(fee_iterator);
364,223✔
1993
        drop(query_stmt_fee);
364,223✔
1994

1995
        // Write through the nonce cache to the database
1996
        nonce_cache.flush(&mut self.db);
364,223✔
1997

1998
        stop_reason.report_to_monitoring();
364,223✔
1999

2000
        info!(
364,223✔
2001
            "Mempool iteration finished";
2002
            "considered_txs" => u128::from(total_considered),
364,223✔
2003
            "elapsed_ms" => start_time.elapsed().as_millis(),
364,223✔
2004
            "stop_reason" => ?stop_reason
2005
        );
2006
        Ok((total_considered, stop_reason))
364,223✔
2007
    }
364,223✔
2008

2009
    pub fn conn(&self) -> &DBConn {
789,919✔
2010
        &self.db
789,919✔
2011
    }
789,919✔
2012

2013
    pub fn tx_begin(&mut self) -> Result<MemPoolTx<'_>, db_error> {
7,916,232✔
2014
        let tx = tx_begin_immediate(&mut self.db)?;
7,916,232✔
2015
        Ok(MemPoolTx::new(
7,916,232✔
2016
            tx,
7,916,232✔
2017
            &mut self.admitter,
7,916,232✔
2018
            &mut self.bloom_counter,
7,916,232✔
2019
        ))
7,916,232✔
2020
    }
7,916,232✔
2021

2022
    pub fn db_has_tx(conn: &DBConn, txid: &Txid) -> Result<bool, db_error> {
79,110✔
2023
        query_row(conn, "SELECT 1 FROM mempool WHERE txid = ?1", params![txid])
79,110✔
2024
            .map(|row_opt: Option<i64>| row_opt.is_some())
79,110✔
2025
    }
79,110✔
2026

2027
    pub fn get_tx(conn: &DBConn, txid: &Txid) -> Result<Option<MemPoolTxInfo>, db_error> {
1,571,264✔
2028
        query_row(conn, "SELECT * FROM mempool WHERE txid = ?1", params![txid])
1,571,264✔
2029
    }
1,571,264✔
2030

2031
    /// Get all transactions across all tips
2032
    #[cfg(test)]
2033
    pub fn get_all_txs(conn: &DBConn) -> Result<Vec<MemPoolTxInfo>, db_error> {
2,908✔
2034
        let sql = "SELECT * FROM mempool";
2,908✔
2035
        let rows = query_rows::<MemPoolTxInfo, _>(conn, sql, NO_PARAMS)?;
2,908✔
2036
        Ok(rows)
2,908✔
2037
    }
2,908✔
2038

2039
    /// Get all transactions at a specific block
2040
    #[cfg(test)]
2041
    pub fn get_num_tx_at_block(
4✔
2042
        conn: &DBConn,
4✔
2043
        consensus_hash: &ConsensusHash,
4✔
2044
        block_header_hash: &BlockHeaderHash,
4✔
2045
    ) -> Result<usize, db_error> {
4✔
2046
        let sql = "SELECT * FROM mempool WHERE consensus_hash = ?1 AND block_header_hash = ?2";
4✔
2047
        let args = params![consensus_hash, block_header_hash];
4✔
2048
        let rows = query_rows::<MemPoolTxInfo, _>(conn, sql, args)?;
4✔
2049
        Ok(rows.len())
4✔
2050
    }
4✔
2051

2052
    /// Get a number of transactions after a given timestamp on a given chain tip.
2053
    #[cfg(test)]
2054
    pub fn get_txs_after(
19✔
2055
        conn: &DBConn,
19✔
2056
        consensus_hash: &ConsensusHash,
19✔
2057
        block_header_hash: &BlockHeaderHash,
19✔
2058
        timestamp: u64,
19✔
2059
        count: u64,
19✔
2060
    ) -> Result<Vec<MemPoolTxInfo>, db_error> {
19✔
2061
        let sql = "SELECT * FROM mempool WHERE accept_time >= ?1 AND consensus_hash = ?2 AND block_header_hash = ?3 ORDER BY tx_fee DESC LIMIT ?4";
19✔
2062
        let args = params![
19✔
2063
            u64_to_sql(timestamp)?,
19✔
2064
            consensus_hash,
2065
            block_header_hash,
2066
            u64_to_sql(count)?,
19✔
2067
        ];
2068
        let rows = query_rows::<MemPoolTxInfo, _>(conn, sql, args)?;
19✔
2069
        Ok(rows)
19✔
2070
    }
19✔
2071

2072
    /// Get a transaction's metadata, given address and nonce, and whether the address is used as a sponsor or an origin.
2073
    /// Faster than getting the MemPoolTxInfo, since no deserialization will be needed.
2074
    /// Used to see if there exists a transaction with this info, so as to implement replace-by-fee
2075
    pub fn get_tx_metadata_by_address(
161,239✔
2076
        conn: &DBConn,
161,239✔
2077
        is_origin: bool,
161,239✔
2078
        addr: &StacksAddress,
161,239✔
2079
        nonce: u64,
161,239✔
2080
    ) -> Result<Option<MemPoolTxMetadata>, db_error> {
161,239✔
2081
        let sql = format!(
161,239✔
2082
            "SELECT * FROM mempool WHERE {0}_address = ?1 AND {0}_nonce = ?2",
2083
            if is_origin { "origin" } else { "sponsor" }
161,239✔
2084
        );
2085
        let args = params![addr.to_string(), u64_to_sql(nonce)?];
161,239✔
2086
        query_row(conn, &sql, args)
161,239✔
2087
    }
161,239✔
2088

2089
    /// Are the given fully-qualified blocks, identified by their (consensus-hash, block-header-hash) pairs, in the same fork?
2090
    /// That is, is one block an ancestor of another?
2091
    /// TODO: Nakamoto-ize
2092
    fn are_blocks_in_same_fork(
5,303✔
2093
        chainstate: &mut StacksChainState,
5,303✔
2094
        first_consensus_hash: &ConsensusHash,
5,303✔
2095
        first_stacks_block: &BlockHeaderHash,
5,303✔
2096
        second_consensus_hash: &ConsensusHash,
5,303✔
2097
        second_stacks_block: &BlockHeaderHash,
5,303✔
2098
    ) -> Result<bool, db_error> {
5,303✔
2099
        let first_block = StacksBlockId::new(first_consensus_hash, first_stacks_block);
5,303✔
2100
        let second_block = StacksBlockId::new(second_consensus_hash, second_stacks_block);
5,303✔
2101
        // short circuit equality
2102
        if second_block == first_block {
5,303✔
2103
            return Ok(true);
5,301✔
2104
        }
2✔
2105

2106
        let headers_conn = &chainstate.index_conn();
2✔
2107
        let height_of_first_with_second_tip =
2✔
2108
            headers_conn.get_ancestor_block_height(&second_block, &first_block)?;
2✔
2109
        let height_of_second_with_first_tip =
2✔
2110
            headers_conn.get_ancestor_block_height(&first_block, &second_block)?;
2✔
2111

2112
        match (
2113
            height_of_first_with_second_tip,
2✔
2114
            height_of_second_with_first_tip,
2✔
2115
        ) {
2116
            (None, None) => Ok(false),
×
2117
            (_, _) => Ok(true),
2✔
2118
        }
2119
    }
5,303✔
2120

2121
    /// Add a transaction to the mempool.  If it already exists, then replace it if the given fee
2122
    /// is higher than the one that's already there.
2123
    /// Carry out the mempool admission test before adding.
2124
    ///
2125
    /// `tip_consensus_hash`, `tip_block_header_hash`, and `coinbase_height` describe the fork that
2126
    /// was canonical when this transaction is added.  While `coinbase_height` would be derived
2127
    /// from these first two fields, it is supplied independently to facilitate testing.
2128
    ///
2129
    /// If this is called in the Nakamoto epoch -- i.e. if `tip_consensus_hash` is in the Nakamoto
2130
    /// epoch -- then these tip hashes will be resolved to the tenure-start hashes first.  This is
2131
    /// because in Nakamoto, we index transactions by tenure-start blocks since they directly
2132
    /// correspond to epoch 2.x Stacks blocks (meaning, the semantics of mempool sync are preserved
2133
    /// across epoch 2.x and Nakamoto as long as we treat transactions this way).  In both epochs,
2134
    /// transactions arrive during a miner's tenure, not during a particular block's status as
2135
    /// the canonical chain tip.
2136
    ///
2137
    /// The tenure resolution behavior can be short-circuited with `resolve_tenure = false`.
2138
    /// However, this is only used in testing.
2139
    ///
2140
    /// Don't call directly; use submit().
2141
    pub(crate) fn try_add_tx(
80,634✔
2142
        tx: &mut MemPoolTx,
80,634✔
2143
        chainstate: &mut StacksChainState,
80,634✔
2144
        tip_consensus_hash: &ConsensusHash,
80,634✔
2145
        tip_block_header_hash: &BlockHeaderHash,
80,634✔
2146
        resolve_tenure: bool,
80,634✔
2147
        txid: &Txid,
80,634✔
2148
        tx_bytes: Vec<u8>,
80,634✔
2149
        tx_fee: u64,
80,634✔
2150
        coinbase_height: u64,
80,634✔
2151
        origin_address: &StacksAddress,
80,634✔
2152
        origin_nonce: u64,
80,634✔
2153
        sponsor_address: &StacksAddress,
80,634✔
2154
        sponsor_nonce: u64,
80,634✔
2155
        event_observer: Option<&dyn MemPoolEventDispatcher>,
80,634✔
2156
    ) -> Result<(), MemPoolRejection> {
80,634✔
2157
        let length = tx_bytes.len() as u64;
80,634✔
2158

2159
        // this transaction is said to arrive during this _tenure_, not during this _block_.
2160
        // In epoch 2.x, these are the same as `tip_consensus_hash` and `tip_block_header_hash`.
2161
        // In Nakamoto, they may be different.
2162
        //
2163
        // The only exception to this rule is if `tip_consensus_hash` and `tip_block_header_hash`
2164
        // are `FIRST_BURNCHAIN_CONSENSUS_HASH` and `FIRST_STACKS_BLOCK_HASH` -- in this case,
2165
        // there's no need to find the tenure-start header
2166
        let (consensus_hash, block_header_hash) = if resolve_tenure {
80,634✔
2167
            let tenure_start_header = NakamotoChainState::get_tenure_start_block_header(
43,222✔
2168
                &mut chainstate.index_conn(),
43,222✔
2169
                &StacksBlockId::new(tip_consensus_hash, tip_block_header_hash),
43,222✔
2170
                tip_consensus_hash,
43,222✔
2171
            )
2172
            .map_err(MemPoolRejection::FailedToValidate)?
43,222✔
2173
            .ok_or(MemPoolRejection::NoSuchChainTip(
43,222✔
2174
                tip_consensus_hash.clone(),
43,222✔
2175
                tip_block_header_hash.clone(),
43,222✔
2176
            ))?;
43,222✔
2177

2178
            let consensus_hash = tenure_start_header.consensus_hash;
43,222✔
2179
            let block_header_hash = tenure_start_header.anchored_header.block_hash();
43,222✔
2180
            (consensus_hash, block_header_hash)
43,222✔
2181
        } else {
2182
            (tip_consensus_hash.clone(), tip_block_header_hash.clone())
37,412✔
2183
        };
2184

2185
        // do we already have txs with either the same origin nonce or sponsor nonce ?
2186
        let prior_tx = {
80,634✔
2187
            match MemPoolDB::get_tx_metadata_by_address(tx, true, origin_address, origin_nonce)? {
80,634✔
2188
                Some(prior_tx) => Some(prior_tx),
10,615✔
2189
                None => MemPoolDB::get_tx_metadata_by_address(
70,019✔
2190
                    tx,
70,019✔
2191
                    false,
2192
                    sponsor_address,
70,019✔
2193
                    sponsor_nonce,
70,019✔
2194
                )?,
×
2195
            }
2196
        };
2197

2198
        let mut replace_reason = MemPoolDropReason::REPLACE_BY_FEE;
80,634✔
2199

2200
        // if so, is this a replace-by-fee? or a replace-in-chain-tip?
2201
        let add_tx = if let Some(ref prior_tx) = prior_tx {
80,634✔
2202
            if tx_fee > prior_tx.tx_fee {
10,615✔
2203
                // is this a replace-by-fee ?
2204
                debug!(
5,312✔
2205
                    "Can replace {} with {} for {},{} by fee ({} < {})",
2206
                    &prior_tx.txid, &txid, origin_address, origin_nonce, &prior_tx.tx_fee, &tx_fee
×
2207
                );
2208
                replace_reason = MemPoolDropReason::REPLACE_BY_FEE;
5,312✔
2209
                true
5,312✔
2210
            } else if !MemPoolDB::are_blocks_in_same_fork(
5,303✔
2211
                chainstate,
5,303✔
2212
                &prior_tx.tenure_consensus_hash,
5,303✔
2213
                &prior_tx.tenure_block_header_hash,
5,303✔
2214
                &consensus_hash,
5,303✔
2215
                &block_header_hash,
5,303✔
2216
            )? {
×
2217
                // is this a replace-across-fork ?
2218
                debug!(
×
2219
                    "Can replace {} with {} for {},{} across fork",
2220
                    &prior_tx.txid, &txid, origin_address, origin_nonce
×
2221
                );
2222
                replace_reason = MemPoolDropReason::REPLACE_ACROSS_FORK;
×
2223
                true
×
2224
            } else {
2225
                // there's a >= fee tx in this fork, cannot add
2226
                info!("TX conflicts with sponsor/origin nonce in same fork with >= fee";
5,303✔
2227
                      "new_txid" => %txid,
2228
                      "old_txid" => %prior_tx.txid,
2229
                      "origin_addr" => %origin_address,
2230
                      "origin_nonce" => origin_nonce,
5,303✔
2231
                      "sponsor_addr" => %sponsor_address,
2232
                      "sponsor_nonce" => sponsor_nonce,
5,303✔
2233
                      "new_fee" => tx_fee,
5,303✔
2234
                      "old_fee" => prior_tx.tx_fee);
5,303✔
2235
                false
5,303✔
2236
            }
2237
        } else {
2238
            // no conflicting TX with this origin/sponsor, go ahead and add
2239
            true
70,019✔
2240
        };
2241

2242
        if !add_tx {
80,634✔
2243
            return Err(MemPoolRejection::ConflictingNonceInMempool);
5,303✔
2244
        }
75,331✔
2245

2246
        tx.update_bloom_counter(coinbase_height, txid, prior_tx.as_ref().map(|tx| &tx.txid))?;
75,331✔
2247

2248
        let sql = "INSERT OR REPLACE INTO mempool (
75,331✔
2249
            txid,
75,331✔
2250
            origin_address,
75,331✔
2251
            origin_nonce,
75,331✔
2252
            sponsor_address,
75,331✔
2253
            sponsor_nonce,
75,331✔
2254
            tx_fee,
75,331✔
2255
            length,
75,331✔
2256
            consensus_hash,
75,331✔
2257
            block_header_hash,
75,331✔
2258
            height,
75,331✔
2259
            accept_time,
75,331✔
2260
            tx)
75,331✔
2261
            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)";
75,331✔
2262

2263
        let args = params![
75,331✔
2264
            txid,
2265
            origin_address.to_string(),
75,331✔
2266
            u64_to_sql(origin_nonce)?,
75,331✔
2267
            sponsor_address.to_string(),
75,331✔
2268
            u64_to_sql(sponsor_nonce)?,
75,331✔
2269
            u64_to_sql(tx_fee)?,
75,331✔
2270
            u64_to_sql(length)?,
75,331✔
2271
            consensus_hash,
2272
            block_header_hash,
2273
            u64_to_sql(coinbase_height)?,
75,331✔
2274
            u64_to_sql(get_epoch_time_secs())?,
75,331✔
2275
            tx_bytes,
2276
        ];
2277

2278
        tx.execute(sql, args)
75,331✔
2279
            .map_err(|e| MemPoolRejection::DBError(db_error::SqliteError(e)))?;
75,331✔
2280

2281
        tx.update_mempool_pager(txid)?;
75,331✔
2282

2283
        // broadcast drop event if a tx is being replaced
2284
        if let (Some(prior_tx), Some(event_observer)) = (prior_tx, event_observer) {
75,331✔
2285
            event_observer.mempool_txs_dropped(
18✔
2286
                vec![prior_tx.txid],
18✔
2287
                Some(txid.clone()),
18✔
2288
                replace_reason,
18✔
2289
            );
18✔
2290
        };
75,313✔
2291

2292
        Ok(())
75,331✔
2293
    }
80,634✔
2294

2295
    /// Garbage-collect the mempool according to the behavior specified in `behavior`.
2296
    pub fn garbage_collect(
7,478,208✔
2297
        &mut self,
7,478,208✔
2298
        chain_height: u64,
7,478,208✔
2299
        behavior: &MempoolCollectionBehavior,
7,478,208✔
2300
        event_observer: Option<&dyn MemPoolEventDispatcher>,
7,478,208✔
2301
    ) -> Result<(), db_error> {
7,478,208✔
2302
        let tx = self.tx_begin()?;
7,478,208✔
2303
        match behavior {
7,478,208✔
2304
            MempoolCollectionBehavior::ByStacksHeight => {
2305
                // NOTE: this is the epoch2x behavior, so `chain_height` is 1-to-1 with coinbase
2306
                // height.  This will not be true in Nakamoto!
2307
                let Some(min_height) = chain_height.checked_sub(MEMPOOL_MAX_TRANSACTION_AGE) else {
5,324,229✔
2308
                    return Ok(());
5,324,229✔
2309
                };
2310
                Self::garbage_collect_by_coinbase_height(&tx, min_height, event_observer)?;
×
2311
            }
2312
            MempoolCollectionBehavior::ByReceiveTime => {
2313
                Self::garbage_collect_by_time(
2,153,979✔
2314
                    &tx,
2,153,979✔
2315
                    &MEMPOOL_NAKAMOTO_MAX_TRANSACTION_AGE,
2,153,979✔
2316
                    event_observer,
2,153,979✔
2317
                )?;
×
2318
            }
2319
        };
2320
        tx.commit()
2,153,979✔
2321
    }
7,478,208✔
2322

2323
    /// Garbage-collect the mempool. Remove transactions that were accepted more than `age` ago.
2324
    /// The granularity of this check is in seconds.
2325
    pub fn garbage_collect_by_time(
2,153,980✔
2326
        tx: &MemPoolTx,
2,153,980✔
2327
        age: &Duration,
2,153,980✔
2328
        event_observer: Option<&dyn MemPoolEventDispatcher>,
2,153,980✔
2329
    ) -> Result<(), db_error> {
2,153,980✔
2330
        let threshold_time = get_epoch_time_secs().saturating_sub(age.as_secs());
2,153,980✔
2331
        let args = params![u64_to_sql(threshold_time)?];
2,153,980✔
2332
        if let Some(event_observer) = event_observer {
2,153,980✔
2333
            let sql = "SELECT txid FROM mempool WHERE accept_time < ?1";
2,153,178✔
2334
            let txids = query_rows(tx, sql, args)?;
2,153,178✔
2335
            event_observer.mempool_txs_dropped(txids, None, MemPoolDropReason::STALE_COLLECT);
2,153,178✔
2336
        }
802✔
2337

2338
        let sql = "DELETE FROM mempool WHERE accept_time < ?1";
2,153,980✔
2339

2340
        tx.execute(sql, args)?;
2,153,980✔
2341
        increment_stx_mempool_gc();
2,153,980✔
2342
        Ok(())
2,153,980✔
2343
    }
2,153,980✔
2344

2345
    /// Garbage-collect the mempool.  Remove transactions that were received `min_coinbase_height`
2346
    ///  blocks ago.
2347
    pub fn garbage_collect_by_coinbase_height(
2✔
2348
        tx: &MemPoolTx,
2✔
2349
        min_coinbase_height: u64,
2✔
2350
        event_observer: Option<&dyn MemPoolEventDispatcher>,
2✔
2351
    ) -> Result<(), db_error> {
2✔
2352
        let args = params![u64_to_sql(min_coinbase_height)?];
2✔
2353

2354
        if let Some(event_observer) = event_observer {
2✔
2355
            let sql = "SELECT txid FROM mempool WHERE height < ?1";
×
2356
            let txids = query_rows(tx, sql, args)?;
×
2357
            event_observer.mempool_txs_dropped(txids, None, MemPoolDropReason::STALE_COLLECT);
×
2358
        }
2✔
2359

2360
        let sql = "DELETE FROM mempool WHERE height < ?1";
2✔
2361

2362
        tx.execute(sql, args)?;
2✔
2363
        increment_stx_mempool_gc();
2✔
2364
        Ok(())
2✔
2365
    }
2✔
2366

2367
    #[cfg(test)]
2368
    pub fn clear_before_coinbase_height(
1✔
2369
        &mut self,
1✔
2370
        min_coinbase_height: u64,
1✔
2371
    ) -> Result<(), db_error> {
1✔
2372
        let tx = self.tx_begin()?;
1✔
2373
        MemPoolDB::garbage_collect_by_coinbase_height(&tx, min_coinbase_height, None)?;
1✔
2374
        tx.commit()
1✔
2375
    }
1✔
2376

2377
    /// Submit a transaction to the mempool at a particular chain tip.
2378
    fn tx_submit(
39,436✔
2379
        mempool_tx: &mut MemPoolTx,
39,436✔
2380
        chainstate: &mut StacksChainState,
39,436✔
2381
        sortdb: &SortitionDB,
39,436✔
2382
        consensus_hash: &ConsensusHash,
39,436✔
2383
        block_hash: &BlockHeaderHash,
39,436✔
2384
        tx: &StacksTransaction,
39,436✔
2385
        do_admission_checks: bool,
39,436✔
2386
        event_observer: Option<&dyn MemPoolEventDispatcher>,
39,436✔
2387
        fee_rate_estimate: Option<f64>,
39,436✔
2388
    ) -> Result<(), MemPoolRejection> {
39,436✔
2389
        test_debug!(
39,436✔
2390
            "Mempool submit {} at {}/{}",
2391
            tx.txid(),
×
2392
            consensus_hash,
2393
            block_hash
2394
        );
2395

2396
        let block_id = StacksBlockId::new(consensus_hash, block_hash);
39,436✔
2397
        let coinbase_height = match NakamotoChainState::get_block_header(chainstate.db(), &block_id)
39,436✔
2398
        {
2399
            Ok(Some(header)) => header.stacks_block_height,
39,436✔
2400
            Ok(None) => {
2401
                if *consensus_hash == FIRST_BURNCHAIN_CONSENSUS_HASH {
×
2402
                    0
×
2403
                } else {
2404
                    return Err(MemPoolRejection::NoSuchChainTip(
×
2405
                        consensus_hash.clone(),
×
2406
                        block_hash.clone(),
×
2407
                    ));
×
2408
                }
2409
            }
2410
            Err(e) => {
×
2411
                return Err(MemPoolRejection::Other(format!(
×
2412
                    "Failed to load chain tip: {:?}",
×
2413
                    &e
×
2414
                )));
×
2415
            }
2416
        };
2417

2418
        let txid = tx.txid();
39,436✔
2419
        let mut tx_data = vec![];
39,436✔
2420
        tx.consensus_serialize(&mut tx_data)
39,436✔
2421
            .map_err(MemPoolRejection::SerializationFailure)?;
39,436✔
2422

2423
        let len = tx_data.len() as u64;
39,436✔
2424
        let tx_fee = tx.get_tx_fee();
39,436✔
2425
        let origin_address = tx.origin_address();
39,436✔
2426
        let origin_nonce = tx.get_origin_nonce();
39,436✔
2427
        let (sponsor_address, sponsor_nonce) =
39,436✔
2428
            if let (Some(addr), Some(nonce)) = (tx.sponsor_address(), tx.get_sponsor_nonce()) {
39,436✔
2429
                (addr, nonce)
9✔
2430
            } else {
2431
                (origin_address.clone(), origin_nonce)
39,427✔
2432
            };
2433

2434
        if do_admission_checks {
39,436✔
2435
            mempool_tx
38,878✔
2436
                .admitter
38,878✔
2437
                .set_block(block_hash, (*consensus_hash).clone());
38,878✔
2438
            mempool_tx
38,878✔
2439
                .admitter
38,878✔
2440
                .will_admit_tx(chainstate, sortdb, tx, len)?;
38,878✔
2441
        }
558✔
2442

2443
        MemPoolDB::try_add_tx(
39,272✔
2444
            mempool_tx,
39,272✔
2445
            chainstate,
39,272✔
2446
            consensus_hash,
39,272✔
2447
            block_hash,
39,272✔
2448
            true,
2449
            &txid,
39,272✔
2450
            tx_data,
39,272✔
2451
            tx_fee,
39,272✔
2452
            coinbase_height,
39,272✔
2453
            &origin_address,
39,272✔
2454
            origin_nonce,
39,272✔
2455
            &sponsor_address,
39,272✔
2456
            sponsor_nonce,
39,272✔
2457
            event_observer,
39,272✔
2458
        )?;
9✔
2459

2460
        mempool_tx
39,263✔
2461
            .execute(
39,263✔
2462
                "UPDATE mempool SET fee_rate = ? WHERE txid = ?",
39,263✔
2463
                params![fee_rate_estimate, txid],
39,263✔
2464
            )
39,263✔
2465
            .map_err(db_error::from)?;
39,263✔
2466

2467
        if let Err(e) = monitoring::mempool_accepted(&txid, &chainstate.root_path) {
39,263✔
2468
            warn!("Failed to monitor TX receive: {:?}", e; "txid" => %txid);
×
2469
        }
39,263✔
2470

2471
        Ok(())
39,263✔
2472
    }
39,436✔
2473

2474
    /// One-shot transaction submit.
2475
    ///
2476
    /// Transactions are indexed relative to a chain tip, identified by `consensus_hash` and
2477
    /// `block_hash`.  These fields have slightly different interpretations depending on what epoch
2478
    /// we're in:
2479
    /// * In epoch 2.x, these are the Stacks chain tip.
2480
    /// * In Nakamoto, these will be resolved to the tenure-start block of the tenure in which this
2481
    /// Stacks block lies.  The reason for this is because of how the mempool performs
2482
    /// garbage collection in its DB and bloom filter -- the latter of which is used for mempool
2483
    /// sync.
2484
    ///
2485
    /// No action is required by te caller to handle this discrepancy; the caller should just submit
2486
    /// the canonical Stacks tip.  If the current epoch is a Nakamoto epoch, it will be resolved to
2487
    /// the tenure-start block internally.
2488
    pub fn submit(
39,390✔
2489
        &mut self,
39,390✔
2490
        chainstate: &mut StacksChainState,
39,390✔
2491
        sortdb: &SortitionDB,
39,390✔
2492
        consensus_hash: &ConsensusHash,
39,390✔
2493
        block_hash: &BlockHeaderHash,
39,390✔
2494
        tx: &StacksTransaction,
39,390✔
2495
        event_observer: Option<&dyn MemPoolEventDispatcher>,
39,390✔
2496
        block_limit: &ExecutionCost,
39,390✔
2497
        stacks_epoch_id: &StacksEpochId,
39,390✔
2498
    ) -> Result<(), MemPoolRejection> {
39,390✔
2499
        if self.is_tx_blacklisted(&tx.txid())? {
39,390✔
2500
            // don't re-store this transaction
2501
            test_debug!("Transaction {} is temporarily blacklisted", &tx.txid());
512✔
2502
            return Err(MemPoolRejection::TemporarilyBlacklisted);
512✔
2503
        }
38,878✔
2504

2505
        let estimator_result = cost_estimates::estimate_fee_rate(
38,878✔
2506
            tx,
38,878✔
2507
            self.cost_estimator.as_ref(),
38,878✔
2508
            self.metric.as_ref(),
38,878✔
2509
            block_limit,
38,878✔
2510
            stacks_epoch_id,
38,878✔
2511
        );
2512

2513
        let mut mempool_tx = self.tx_begin().map_err(MemPoolRejection::DBError)?;
38,878✔
2514

2515
        let fee_rate = match estimator_result {
38,878✔
2516
            Ok(x) => Some(x),
26,395✔
2517
            Err(EstimatorError::NoEstimateAvailable) => None,
12,483✔
2518
            Err(e) => {
×
2519
                warn!("Error while estimating mempool tx rate";
×
2520
                      "txid" => %tx.txid(),
×
2521
                      "error" => ?e);
2522
                return Err(MemPoolRejection::EstimatorError(e));
×
2523
            }
2524
        };
2525

2526
        MemPoolDB::tx_submit(
38,878✔
2527
            &mut mempool_tx,
38,878✔
2528
            chainstate,
38,878✔
2529
            sortdb,
38,878✔
2530
            consensus_hash,
38,878✔
2531
            block_hash,
38,878✔
2532
            tx,
38,878✔
2533
            true,
2534
            event_observer,
38,878✔
2535
            fee_rate,
38,878✔
2536
        )?;
173✔
2537
        mempool_tx.commit().map_err(MemPoolRejection::DBError)?;
38,705✔
2538
        Ok(())
38,705✔
2539
    }
39,390✔
2540

2541
    /// Miner-driven submit (e.g. for poison microblocks), where no checks are performed
2542
    pub fn miner_submit(
×
2543
        &mut self,
×
2544
        chainstate: &mut StacksChainState,
×
2545
        sortdb: &SortitionDB,
×
2546
        consensus_hash: &ConsensusHash,
×
2547
        block_hash: &BlockHeaderHash,
×
2548
        tx: &StacksTransaction,
×
2549
        event_observer: Option<&dyn MemPoolEventDispatcher>,
×
2550
        miner_estimate: f64,
×
2551
    ) -> Result<(), MemPoolRejection> {
×
2552
        let mut mempool_tx = self.tx_begin().map_err(MemPoolRejection::DBError)?;
×
2553

2554
        let fee_estimate = Some(miner_estimate);
×
2555

2556
        MemPoolDB::tx_submit(
×
2557
            &mut mempool_tx,
×
2558
            chainstate,
×
2559
            sortdb,
×
2560
            consensus_hash,
×
2561
            block_hash,
×
2562
            tx,
×
2563
            false,
2564
            event_observer,
×
2565
            fee_estimate,
×
2566
        )?;
×
2567
        mempool_tx.commit().map_err(MemPoolRejection::DBError)?;
×
2568
        Ok(())
×
2569
    }
×
2570

2571
    /// Directly submit to the mempool, and don't do any admissions checks.
2572
    #[cfg(any(test, feature = "testing"))]
2573
    pub fn submit_raw(
558✔
2574
        &mut self,
558✔
2575
        chainstate: &mut StacksChainState,
558✔
2576
        sortdb: &SortitionDB,
558✔
2577
        consensus_hash: &ConsensusHash,
558✔
2578
        block_hash: &BlockHeaderHash,
558✔
2579
        tx_bytes: Vec<u8>,
558✔
2580
        block_limit: &ExecutionCost,
558✔
2581
        stacks_epoch_id: &StacksEpochId,
558✔
2582
    ) -> Result<(), MemPoolRejection> {
558✔
2583
        let tx = StacksTransaction::consensus_deserialize(&mut &tx_bytes[..])
558✔
2584
            .map_err(MemPoolRejection::DeserializationFailure)?;
558✔
2585

2586
        if self.is_tx_blacklisted(&tx.txid())? {
558✔
2587
            // don't re-store this transaction
2588
            test_debug!("Transaction {} is temporarily blacklisted", &tx.txid());
×
2589
            return Err(MemPoolRejection::TemporarilyBlacklisted);
×
2590
        }
558✔
2591

2592
        let estimator_result = cost_estimates::estimate_fee_rate(
558✔
2593
            &tx,
558✔
2594
            self.cost_estimator.as_ref(),
558✔
2595
            self.metric.as_ref(),
558✔
2596
            block_limit,
558✔
2597
            stacks_epoch_id,
558✔
2598
        );
2599

2600
        let mut mempool_tx = self.tx_begin().map_err(MemPoolRejection::DBError)?;
558✔
2601

2602
        let fee_rate = match estimator_result {
558✔
2603
            Ok(x) => Some(x),
423✔
2604
            Err(EstimatorError::NoEstimateAvailable) => None,
135✔
2605
            Err(e) => {
×
2606
                warn!("Error while estimating mempool tx rate";
×
2607
                      "txid" => %tx.txid(),
×
2608
                      "error" => ?e);
2609
                return Err(MemPoolRejection::Other(
×
2610
                    "Failed to estimate mempool tx rate".into(),
×
2611
                ));
×
2612
            }
2613
        };
2614

2615
        MemPoolDB::tx_submit(
558✔
2616
            &mut mempool_tx,
558✔
2617
            chainstate,
558✔
2618
            sortdb,
558✔
2619
            consensus_hash,
558✔
2620
            block_hash,
558✔
2621
            &tx,
558✔
2622
            false,
2623
            None,
558✔
2624
            fee_rate,
558✔
2625
        )?;
×
2626
        mempool_tx.commit().map_err(MemPoolRejection::DBError)?;
558✔
2627
        Ok(())
558✔
2628
    }
558✔
2629

2630
    /// Blacklist transactions from the mempool
2631
    /// Do not call directly; it's `pub` only for testing
2632
    pub fn inner_blacklist_txs(tx: &DBTx<'_>, txids: &[Txid], now: u64) -> Result<(), db_error> {
72✔
2633
        for txid in txids {
610✔
2634
            let sql = "INSERT OR REPLACE INTO tx_blacklist (txid, arrival_time) VALUES (?1, ?2)";
610✔
2635
            let args = params![txid, &u64_to_sql(now)?];
610✔
2636
            tx.execute(sql, args)?;
610✔
2637
        }
2638
        Ok(())
72✔
2639
    }
72✔
2640

2641
    /// garbage-collect the tx blacklist -- delete any transactions whose blacklist timeout has
2642
    /// been exceeded
2643
    pub fn garbage_collect_tx_blacklist(
72✔
2644
        tx: &DBTx<'_>,
72✔
2645
        now: u64,
72✔
2646
        timeout: u64,
72✔
2647
        max_size: u64,
72✔
2648
    ) -> Result<(), db_error> {
72✔
2649
        let sql = "DELETE FROM tx_blacklist WHERE arrival_time + ?1 < ?2";
72✔
2650
        let args = params![u64_to_sql(timeout)?, u64_to_sql(now)?];
72✔
2651
        tx.execute(sql, args)?;
72✔
2652

2653
        // if we get too big, then drop some txs at random
2654
        let sql = "SELECT size FROM tx_blacklist_size";
72✔
2655
        let sz = query_int(tx, sql, NO_PARAMS)? as u64;
72✔
2656
        if sz > max_size {
72✔
2657
            let to_delete = sz - max_size;
1✔
2658
            let txids: Vec<Txid> = query_rows(
1✔
2659
                tx,
1✔
2660
                "SELECT txid FROM tx_blacklist ORDER BY RANDOM() LIMIT ?1",
1✔
2661
                params![u64_to_sql(to_delete)?],
1✔
2662
            )?;
×
2663
            for txid in txids.into_iter() {
5✔
2664
                tx.execute("DELETE FROM tx_blacklist WHERE txid = ?1", params![txid])?;
5✔
2665
            }
2666
        }
71✔
2667
        Ok(())
72✔
2668
    }
72✔
2669

2670
    /// when was a tx blacklisted?
2671
    fn get_blacklisted_tx_arrival_time(
40,028✔
2672
        conn: &DBConn,
40,028✔
2673
        txid: &Txid,
40,028✔
2674
    ) -> Result<Option<u64>, db_error> {
40,028✔
2675
        let sql = "SELECT arrival_time FROM tx_blacklist WHERE txid = ?1";
40,028✔
2676
        let args = params![txid];
40,028✔
2677
        query_row(conn, sql, args)
40,028✔
2678
    }
40,028✔
2679

2680
    /// is a tx blacklisted as of the given timestamp?
2681
    fn inner_is_tx_blacklisted(
40,028✔
2682
        conn: &DBConn,
40,028✔
2683
        txid: &Txid,
40,028✔
2684
        now: u64,
40,028✔
2685
        timeout: u64,
40,028✔
2686
    ) -> Result<bool, db_error> {
40,028✔
2687
        match MemPoolDB::get_blacklisted_tx_arrival_time(conn, txid)? {
40,028✔
2688
            None => Ok(false),
39,481✔
2689
            Some(arrival_time) => Ok(now < arrival_time + timeout),
547✔
2690
        }
2691
    }
40,028✔
2692

2693
    /// is a tx blacklisted?
2694
    pub fn is_tx_blacklisted(&self, txid: &Txid) -> Result<bool, db_error> {
40,028✔
2695
        MemPoolDB::inner_is_tx_blacklisted(
40,028✔
2696
            self.conn(),
40,028✔
2697
            txid,
40,028✔
2698
            get_epoch_time_secs(),
40,028✔
2699
            self.blacklist_timeout,
40,028✔
2700
        )
2701
    }
40,028✔
2702

2703
    /// Inner code body for dropping transactions.
2704
    /// Note that the bloom filter will *NOT* be updated.  That's the caller's job, if desired.
2705
    fn inner_drop_txs(tx: &DBTx<'_>, txids: &[Txid]) -> Result<(), db_error> {
386,442✔
2706
        let sql = "DELETE FROM mempool WHERE txid = ?";
386,442✔
2707
        for txid in txids.iter() {
386,442✔
2708
            tx.execute(sql, &[txid])?;
96✔
2709
        }
2710
        Ok(())
386,442✔
2711
    }
386,442✔
2712

2713
    /// Drop transactions from the mempool.  Does not update the bloom filter, thereby ensuring that
2714
    /// these transactions will still show up as present to the mempool sync logic.
2715
    pub fn drop_txs(&mut self, txids: &[Txid]) -> Result<(), db_error> {
386,373✔
2716
        let mempool_tx = self.tx_begin()?;
386,373✔
2717
        MemPoolDB::inner_drop_txs(&mempool_tx, txids)?;
386,373✔
2718
        mempool_tx.commit()?;
386,373✔
2719
        Ok(())
386,373✔
2720
    }
386,373✔
2721

2722
    /// Update the time estimates for the supplied txs in the mempool db
2723
    pub fn update_tx_time_estimates(&mut self, txs: &[(Txid, u64)]) -> Result<(), db_error> {
11,739✔
2724
        let sql = "UPDATE mempool SET time_estimate_ms = ? WHERE txid = ?";
11,739✔
2725
        let mempool_tx = self.tx_begin()?;
11,739✔
2726
        for (txid, time_estimate_ms) in txs.iter() {
1,132,976✔
2727
            mempool_tx
1,132,976✔
2728
                .tx
1,132,976✔
2729
                .execute(sql, params![time_estimate_ms, txid])?;
1,132,976✔
2730
        }
2731
        mempool_tx.commit()?;
11,739✔
2732

2733
        Ok(())
11,739✔
2734
    }
11,739✔
2735

2736
    /// Drop and blacklist transactions, so we don't re-broadcast them or re-fetch them.
2737
    /// Do *NOT* remove them from the bloom filter.  This will cause them to continue to be
2738
    /// reported as present, which is exactly what we want because we don't want these transactions
2739
    /// to be seen again (so we don't want anyone accidentally "helpfully" pushing them to us, nor
2740
    /// do we want the mempool sync logic to "helpfully" re-discover and re-download them).
2741
    pub fn drop_and_blacklist_txs(&mut self, txids: &[Txid]) -> Result<(), db_error> {
69✔
2742
        let now = get_epoch_time_secs();
69✔
2743
        let blacklist_timeout = self.blacklist_timeout;
69✔
2744
        let blacklist_max_size = self.blacklist_max_size;
69✔
2745

2746
        let mempool_tx = self.tx_begin()?;
69✔
2747
        MemPoolDB::inner_drop_txs(&mempool_tx, txids)?;
69✔
2748
        MemPoolDB::inner_blacklist_txs(&mempool_tx, txids, now)?;
69✔
2749
        MemPoolDB::garbage_collect_tx_blacklist(
69✔
2750
            &mempool_tx,
69✔
2751
            now,
69✔
2752
            blacklist_timeout,
69✔
2753
            blacklist_max_size,
69✔
2754
        )?;
×
2755
        mempool_tx.commit()?;
69✔
2756

2757
        Ok(())
69✔
2758
    }
69✔
2759

2760
    #[cfg(test)]
2761
    pub fn dump_txs(&self) {
×
2762
        let sql = "SELECT * FROM mempool";
×
2763
        let txs: Vec<MemPoolTxMetadata> = query_rows(&self.db, sql, NO_PARAMS).unwrap();
×
2764

2765
        eprintln!("{:#?}", txs);
×
2766
    }
×
2767

2768
    /// Do we have a transaction?
2769
    pub fn has_tx(&self, txid: &Txid) -> bool {
42,047✔
2770
        match MemPoolDB::db_has_tx(self.conn(), txid) {
42,047✔
2771
            Ok(b) => {
42,047✔
2772
                if b {
42,047✔
2773
                    test_debug!("Mempool tx already present: {}", txid);
3,575✔
2774
                }
38,472✔
2775
                b
42,047✔
2776
            }
2777
            Err(e) => {
×
2778
                warn!("Failed to query txid: {:?}", &e);
×
2779
                false
×
2780
            }
2781
        }
2782
    }
42,047✔
2783

2784
    /// Get the bloom filter that represents the set of recent transactions we have
2785
    pub fn get_txid_bloom_filter(&self) -> Result<BloomFilter<BloomNodeHasher>, db_error> {
58✔
2786
        self.bloom_counter.to_bloom_filter(self.conn())
58✔
2787
    }
58✔
2788

2789
    /// Find maximum Stacks coinbase height represented in the mempool.
2790
    pub fn get_max_coinbase_height(conn: &DBConn) -> Result<Option<u64>, db_error> {
89,290✔
2791
        let sql = "SELECT 1 FROM mempool WHERE height >= 0";
89,290✔
2792
        let count = query_rows::<i64, _>(conn, sql, NO_PARAMS)?.len();
89,290✔
2793
        if count == 0 {
89,290✔
2794
            Ok(None)
8,022✔
2795
        } else {
2796
            let sql = "SELECT MAX(height) FROM mempool";
81,268✔
2797
            Ok(Some(query_int(conn, sql, NO_PARAMS)? as u64))
81,268✔
2798
        }
2799
    }
89,290✔
2800

2801
    /// Get the transaction ID list that represents the set of transactions that are represented in
2802
    /// the bloom counter.
2803
    pub fn get_bloom_txids(&self) -> Result<Vec<Txid>, db_error> {
6,981✔
2804
        let max_height = match MemPoolDB::get_max_coinbase_height(self.conn())? {
6,981✔
2805
            Some(h) => h,
4,170✔
2806
            None => {
2807
                // mempool is empty
2808
                return Ok(vec![]);
2,811✔
2809
            }
2810
        };
2811
        let min_height = max_height.saturating_sub(BLOOM_COUNTER_DEPTH as u64);
4,170✔
2812
        let sql = "SELECT mempool.txid FROM mempool WHERE height > ?1 AND height <= ?2 AND NOT EXISTS (SELECT 1 FROM removed_txids WHERE txid = mempool.txid)";
4,170✔
2813
        let args = params![u64_to_sql(min_height)?, u64_to_sql(max_height)?];
4,170✔
2814
        query_rows(self.conn(), sql, args)
4,170✔
2815
    }
6,981✔
2816

2817
    /// Get the transaction tag list that represents the set of recent transactions we have.
2818
    /// Generate them with our node-local seed so that our txtag list is different from anyone
2819
    /// else's, with high probability.
2820
    pub fn get_txtags(&self, seed: &[u8]) -> Result<Vec<TxTag>, db_error> {
6,981✔
2821
        self.get_bloom_txids().map(|txid_list| {
6,981✔
2822
            txid_list
6,981✔
2823
                .iter()
6,981✔
2824
                .map(|txid| TxTag::from(seed, txid))
125,129✔
2825
                .collect()
6,981✔
2826
        })
6,981✔
2827
    }
6,981✔
2828

2829
    /// How many recent transactions are there -- i.e. within BLOOM_COUNTER_DEPTH coinbase heights of
2830
    /// the chain tip?
2831
    pub fn get_num_recent_txs(conn: &DBConn) -> Result<u64, db_error> {
82,309✔
2832
        let max_height = match MemPoolDB::get_max_coinbase_height(conn)? {
82,309✔
2833
            Some(h) => h,
77,098✔
2834
            None => {
2835
                // mempool is empty
2836
                return Ok(0);
5,211✔
2837
            }
2838
        };
2839
        let min_height = max_height.saturating_sub(BLOOM_COUNTER_DEPTH as u64);
77,098✔
2840
        let sql = "SELECT COUNT(txid) FROM mempool WHERE height > ?1 AND height <= ?2";
77,098✔
2841
        let args = params![u64_to_sql(min_height)?, u64_to_sql(max_height)?];
77,098✔
2842
        query_int(conn, sql, args).map(|cnt| cnt as u64)
77,098✔
2843
    }
82,309✔
2844

2845
    /// Make a mempool sync request.
2846
    /// If sufficiently sparse, use a MemPoolSyncData::TxTags variant
2847
    /// Otherwise, use a MemPoolSyncData::BloomFilter variant
2848
    pub fn make_mempool_sync_data(&self) -> Result<MemPoolSyncData, db_error> {
6,960✔
2849
        let num_tags = MemPoolDB::get_num_recent_txs(self.conn())?;
6,960✔
2850
        if num_tags < u64::from(self.max_tx_tags) {
6,960✔
2851
            let seed = *self.bloom_counter.get_seed();
6,960✔
2852
            let tags = self.get_txtags(&seed)?;
6,960✔
2853
            Ok(MemPoolSyncData::TxTags(seed, tags))
6,960✔
2854
        } else {
2855
            Ok(MemPoolSyncData::BloomFilter(self.get_txid_bloom_filter()?))
×
2856
        }
2857
    }
6,960✔
2858

2859
    /// Get the hashed txid for a txid
2860
    pub fn get_randomized_txid(&self, txid: &Txid) -> Result<Option<Txid>, db_error> {
128✔
2861
        let sql = "SELECT hashed_txid FROM randomized_txids WHERE txid = ?1 LIMIT 1";
128✔
2862
        let args = params![txid];
128✔
2863
        query_row(self.conn(), sql, args)
128✔
2864
    }
128✔
2865

2866
    pub fn find_next_missing_transactions(
136✔
2867
        &self,
136✔
2868
        data: &MemPoolSyncData,
136✔
2869
        coinbase_height: u64,
136✔
2870
        last_randomized_txid: &Txid,
136✔
2871
        max_txs: u64,
136✔
2872
        max_run: u64,
136✔
2873
    ) -> Result<(Vec<StacksTransaction>, Option<Txid>, u64), db_error> {
136✔
2874
        Self::static_find_next_missing_transactions(
136✔
2875
            self.conn(),
136✔
2876
            data,
136✔
2877
            coinbase_height,
136✔
2878
            last_randomized_txid,
136✔
2879
            max_txs,
136✔
2880
            max_run,
136✔
2881
        )
2882
    }
136✔
2883

2884
    /// Get the next batch of transactions from our mempool that are *not* represented in the given
2885
    /// MemPoolSyncData.  Transactions are ordered lexicographically by randomized_txids.hashed_txid, since this allows us
2886
    /// to use the txid as a cursor while ensuring that each node returns txids in a deterministic random order
2887
    /// (so if some nodes are configured to return fewer than MAX_BLOOM_COUNTER_TXS transactions,
2888
    /// a requesting node will still have a good chance of getting something useful).
2889
    /// Also, return the next value to pass for `last_randomized_txid` to load the next page.
2890
    /// Also, return the number of rows considered.
2891
    pub fn static_find_next_missing_transactions(
13,920✔
2892
        conn: &DBConn,
13,920✔
2893
        data: &MemPoolSyncData,
13,920✔
2894
        coinbase_height: u64,
13,920✔
2895
        last_randomized_txid: &Txid,
13,920✔
2896
        max_txs: u64,
13,920✔
2897
        max_run: u64,
13,920✔
2898
    ) -> Result<(Vec<StacksTransaction>, Option<Txid>, u64), db_error> {
13,920✔
2899
        let mut ret = vec![];
13,920✔
2900
        let sql = "SELECT mempool.txid AS txid, mempool.tx AS tx, randomized_txids.hashed_txid AS hashed_txid \
13,920✔
2901
                   FROM mempool JOIN randomized_txids \
13,920✔
2902
                   ON mempool.txid = randomized_txids.txid \
13,920✔
2903
                   WHERE randomized_txids.hashed_txid > ?1 \
13,920✔
2904
                   AND mempool.height > ?2 \
13,920✔
2905
                   AND NOT EXISTS \
13,920✔
2906
                        (SELECT 1 FROM removed_txids WHERE txid = mempool.txid) \
13,920✔
2907
                   ORDER BY randomized_txids.hashed_txid ASC LIMIT ?3";
13,920✔
2908

2909
        let args = params![
13,920✔
2910
            last_randomized_txid,
2911
            u64_to_sql(coinbase_height.saturating_sub(BLOOM_COUNTER_DEPTH as u64))?,
13,920✔
2912
            u64_to_sql(max_run)?,
13,920✔
2913
        ];
2914

2915
        let mut tags_table = HashSet::new();
13,920✔
2916
        if let MemPoolSyncData::TxTags(_, ref tags) = data {
13,920✔
2917
            for tag in tags.iter() {
1,274,190✔
2918
                tags_table.insert(tag.clone());
1,273,261✔
2919
            }
1,273,261✔
2920
        }
68✔
2921

2922
        let mut stmt = conn.prepare(sql)?;
13,920✔
2923
        let mut rows = stmt.query(args)?;
13,920✔
2924
        let mut num_rows_visited = 0;
13,920✔
2925
        let mut next_page = None;
13,920✔
2926
        while let Some(row) = rows.next()? {
72,464✔
2927
            if num_rows_visited >= max_run {
65,438✔
2928
                break;
×
2929
            }
65,438✔
2930

2931
            let txid = Txid::from_column(row, "txid")?;
65,438✔
2932
            num_rows_visited += 1;
65,438✔
2933

2934
            let hashed_txid = Txid::from_column(row, "hashed_txid")?;
65,438✔
2935
            test_debug!(
65,438✔
2936
                "Consider txid {} ({}) at or after {}",
2937
                &txid,
×
2938
                &hashed_txid,
×
2939
                last_randomized_txid
2940
            );
2941
            next_page = Some(hashed_txid);
65,438✔
2942

2943
            let contains = match data {
65,438✔
2944
                MemPoolSyncData::BloomFilter(ref bf) => bf.contains_raw(&txid.0),
24,576✔
2945
                MemPoolSyncData::TxTags(ref seed, ..) => {
40,862✔
2946
                    tags_table.contains(&TxTag::from(seed, &txid))
40,862✔
2947
                }
2948
            };
2949
            if contains {
65,438✔
2950
                // remote peer already has this one
2951
                continue;
25,776✔
2952
            }
39,662✔
2953

2954
            let tx_bytes: Vec<u8> = row.get_unwrap("tx");
39,662✔
2955
            let tx = StacksTransaction::consensus_deserialize(&mut &tx_bytes[..])
39,662✔
2956
                .map_err(|_e| db_error::ParseError)?;
39,662✔
2957

2958
            test_debug!("Returning txid {}", &txid);
39,662✔
2959
            ret.push(tx);
39,662✔
2960
            if (ret.len() as u64) >= max_txs {
39,662✔
2961
                break;
6,894✔
2962
            }
32,768✔
2963
        }
2964

2965
        Ok((ret, next_page, num_rows_visited))
13,920✔
2966
    }
13,920✔
2967
}
2968

2969
/// Flush the considered transaction IDs to the DB.
2970
/// Do not return until successful. After a successful flush, clear the vector.
2971
pub fn flush_considered_txs(conn: &mut DBConn, considered_txs: &mut Vec<Txid>) {
134,308✔
2972
    const MAX_BACKOFF: Duration = Duration::from_secs(30);
2973
    let mut backoff = Duration::from_millis(rand::thread_rng().gen_range(50..200));
134,308✔
2974

2975
    loop {
2976
        // Pass a slice to the try function.
2977
        let result = try_flush_considered_txs(conn, considered_txs.as_slice());
134,308✔
2978

2979
        match result {
134,308✔
2980
            Ok(_) => {
2981
                // On success, clear the vector so that it’s empty.
2982
                considered_txs.clear();
134,308✔
2983
                return;
134,308✔
2984
            }
2985
            Err(e) => {
×
2986
                warn!("Considered txid flush failed: {e}. Retrying in {backoff:?}");
×
2987
                thread::sleep(backoff);
×
2988
                if backoff < MAX_BACKOFF {
×
2989
                    backoff =
×
2990
                        backoff * 2 + Duration::from_millis(rand::thread_rng().gen_range(50..200));
×
2991
                }
×
2992
            }
2993
        }
2994
    }
2995
}
134,308✔
2996

2997
/// Try to flush the considered transaction IDs to the DB.
2998
pub fn try_flush_considered_txs(
134,308✔
2999
    conn: &mut DBConn,
134,308✔
3000
    considered_txs: &[Txid],
134,308✔
3001
) -> Result<(), db_error> {
134,308✔
3002
    let sql = "INSERT OR IGNORE INTO considered_txs (txid) VALUES (?1)";
134,308✔
3003

3004
    let db_tx = conn.transaction()?;
134,308✔
3005

3006
    for txid in considered_txs {
241,884✔
3007
        match db_tx.execute(sql, params![txid]) {
141,369✔
3008
            Ok(_) => {}
141,369✔
3009
            Err(rusqlite::Error::SqliteFailure(err, _))
×
3010
                if err.code == rusqlite::ErrorCode::ConstraintViolation =>
×
3011
            {
3012
                // Ignore constraint violations (e.g., foreign key failure)
3013
                // This can happen if the txid was removed from the mempool DB
3014
                // before we could flush it to the considered_txs table.
3015
                continue;
×
3016
            }
3017
            Err(e) => return Err(e.into()),
×
3018
        }
3019
    }
3020

3021
    db_tx.commit()?;
134,308✔
3022
    Ok(())
134,308✔
3023
}
134,308✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc